use crate::{
    backfill::{BackfillAction, BackfillSyncState},
    chain::FromOrchestrator,
    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
    persistence::PersistenceHandle,
    tree::{
        cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
    },
};
use alloy_consensus::BlockHeader;
use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash};
use alloy_primitives::{
    map::{HashMap, HashSet},
    BlockNumber, B256, U256,
};
use alloy_rpc_types_engine::{
    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
use payload_processor::sparse_trie::StateRootComputeOutcome;
use persistence_state::CurrentPersistenceAction;
use reth_chain_state::{
    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
    MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, FullConsensus};
pub use reth_engine_primitives::InvalidBlockHook;
use reth_engine_primitives::{
    BeaconConsensusEngineEvent, BeaconEngineMessage, BeaconOnNewPayloadError, EngineTypes,
    EngineValidator, ExecutionPayload, ForkchoiceStateTracker, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes};
use reth_primitives_traits::{
    Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
};
use reth_provider::{
    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
    ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
    StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use std::{
    cmp::Ordering,
    collections::{btree_map, hash_map, BTreeMap, VecDeque},
    fmt::Debug,
    ops::Bound,
    sync::{
        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
        Arc,
    },
    time::Instant,
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot::{self, error::TryRecvError},
};
use tracing::*;

mod block_buffer;
mod cached_state;
pub mod error;
mod invalid_block_hook;
mod invalid_headers;
mod metrics;
mod payload_processor;
mod persistence_state;
#[allow(unused)]
mod trie_updates;

use crate::tree::error::AdvancePersistenceError;
pub use block_buffer::BlockBuffer;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use invalid_headers::InvalidHeaderCache;
pub use payload_processor::*;
pub use persistence_state::PersistenceState;
pub use reth_engine_primitives::TreeConfig;
use reth_evm::execute::BlockExecutionOutput;

pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// Keeps track of the executed blocks and their relationships that sit on top of the persisted
/// database state.
#[derive(Debug, Default)]
pub struct TreeState<N: NodePrimitives = EthPrimitives> {
    /// All executed blocks, keyed by block hash.
    blocks_by_hash: HashMap<B256, ExecutedBlockWithTrieUpdates<N>>,
    /// Executed blocks grouped by block number.
    blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlockWithTrieUpdates<N>>>,
    /// Maps a parent block hash to the hashes of its tracked children.
    parent_to_child: HashMap<B256, HashSet<B256>>,
    /// Trie updates of canonical blocks that were already removed from the tree, retained for
    /// reorg handling.
    persisted_trie_updates: HashMap<B256, (BlockNumber, Arc<TrieUpdates>)>,
    /// Currently tracked canonical head of the chain.
    current_canonical_head: BlockNumHash,
}

impl<N: NodePrimitives> TreeState<N> {
    /// Returns a new, empty tree state that points to the given canonical head.
    fn new(current_canonical_head: BlockNumHash) -> Self {
        Self {
            blocks_by_hash: HashMap::default(),
            blocks_by_number: BTreeMap::new(),
            current_canonical_head,
            parent_to_child: HashMap::default(),
            persisted_trie_updates: HashMap::default(),
        }
    }

    /// Resets the state to a new, empty tree with the given canonical head.
    fn reset(&mut self, current_canonical_head: BlockNumHash) {
        *self = Self::new(current_canonical_head);
    }

    /// Returns the number of executed blocks stored in the tree.
    fn block_count(&self) -> usize {
        self.blocks_by_hash.len()
    }

    /// Returns the executed block by hash, if it is tracked by the tree.
    fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlockWithTrieUpdates<N>> {
        self.blocks_by_hash.get(&hash)
    }

    /// Returns the sealed block by hash, if it is tracked by the tree.
    fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock<N::Block>>> {
        self.blocks_by_hash.get(&hash).map(|b| Arc::new(b.recovered_block().sealed_block().clone()))
    }

    /// Returns all executed blocks from the given hash back to the oldest tracked ancestor,
    /// together with the parent hash of that oldest block.
    fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlockWithTrieUpdates<N>>)> {
        let block = self.blocks_by_hash.get(&hash).cloned()?;
        let mut parent_hash = block.recovered_block().parent_hash();
        let mut blocks = vec![block];
        while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
            parent_hash = executed.recovered_block().parent_hash();
            blocks.push(executed.clone());
        }

        Some((parent_hash, blocks))
    }

    /// Inserts an executed block into the tree, indexing it by hash, number and parent.
    fn insert_executed(&mut self, executed: ExecutedBlockWithTrieUpdates<N>) {
        let hash = executed.recovered_block().hash();
        let parent_hash = executed.recovered_block().parent_hash();
        let block_number = executed.recovered_block().number();

        if self.blocks_by_hash.contains_key(&hash) {
            return;
        }

        self.blocks_by_hash.insert(hash, executed.clone());

        self.blocks_by_number.entry(block_number).or_default().push(executed);

        self.parent_to_child.entry(parent_hash).or_default().insert(hash);

        // Only keep child entries that still point at tracked blocks.
        for children in self.parent_to_child.values_mut() {
            children.retain(|child| self.blocks_by_hash.contains_key(child));
        }
    }

    /// Removes the block with the given hash, returning it together with the hashes of its
    /// direct children, if it was tracked.
    fn remove_by_hash(
        &mut self,
        hash: B256,
    ) -> Option<(ExecutedBlockWithTrieUpdates<N>, HashSet<B256>)> {
        let executed = self.blocks_by_hash.remove(&hash)?;

        // Remove this block from the children set of its parent.
        let parent_entry = self.parent_to_child.entry(executed.recovered_block().parent_hash());
        if let hash_map::Entry::Occupied(mut entry) = parent_entry {
            entry.get_mut().remove(&hash);

            if entry.get().is_empty() {
                entry.remove();
            }
        }

        // Detach the children of the removed block.
        let children = self.parent_to_child.remove(&hash).unwrap_or_default();

        // Remove the block from the by-number index.
        let block_number_entry = self.blocks_by_number.entry(executed.recovered_block().number());
        if let btree_map::Entry::Occupied(mut entry) = block_number_entry {
            if let Some(index) = entry.get().iter().position(|b| b.recovered_block().hash() == hash)
            {
                entry.get_mut().swap_remove(index);

                if entry.get().is_empty() {
                    entry.remove();
                }
            }
        }

        Some((executed, children))
    }

    /// Returns whether the block with the given hash is on the in-memory canonical chain.
    pub(crate) fn is_canonical(&self, hash: B256) -> bool {
        let mut current_block = self.current_canonical_head.hash;
        if current_block == hash {
            return true
        }

        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
            current_block = executed.recovered_block().parent_hash();
            if current_block == hash {
                return true
            }
        }

        false
    }

    /// Removes canonical blocks up to and including the given block number, walking back from the
    /// current canonical head, and retains their trie updates for later reorg handling.
    pub(crate) fn remove_canonical_until(
        &mut self,
        upper_bound: BlockNumber,
        last_persisted_hash: B256,
    ) {
        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");

        if !self.is_canonical(last_persisted_hash) {
            return
        }

        let mut current_block = self.current_canonical_head.hash;
        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
            current_block = executed.recovered_block().parent_hash();
            if executed.recovered_block().number() <= upper_bound {
                debug!(target: "engine::tree", num_hash=?executed.recovered_block().num_hash(), "Attempting to remove block walking back from the head");
                if let Some((removed, _)) = self.remove_by_hash(executed.recovered_block().hash()) {
                    debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed block walking back from the head");
                    self.persisted_trie_updates.insert(
                        removed.recovered_block().hash(),
                        (removed.recovered_block().number(), removed.trie),
                    );
                }
            }
        }
        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removed canonical blocks from the tree");
    }

    /// Removes all blocks below the finalized block as well as all non-canonical siblings of the
    /// finalized block and their descendants.
    pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
        let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;

        // Remove all blocks strictly below the finalized block number.
        let blocks_to_remove = self
            .blocks_by_number
            .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
            .flat_map(|(_, blocks)| blocks.iter().map(|b| b.recovered_block().hash()))
            .collect::<Vec<_>>();
        for hash in blocks_to_remove {
            if let Some((removed, _)) = self.remove_by_hash(hash) {
                debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain block");
            }
        }

        self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num > finalized_num);

        // At the finalized height itself, keep only the finalized block.
        let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();

        if let Some(position) =
            blocks_to_remove.iter().position(|b| b.recovered_block().hash() == finalized_hash)
        {
            let finalized_block = blocks_to_remove.swap_remove(position);
            self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
        }

        // Remove the remaining siblings together with all of their descendants.
        let mut blocks_to_remove = blocks_to_remove
            .into_iter()
            .map(|e| e.recovered_block().hash())
            .collect::<VecDeque<_>>();
        while let Some(block) = blocks_to_remove.pop_front() {
            if let Some((removed, children)) = self.remove_by_hash(block) {
                debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain child block");
                blocks_to_remove.extend(children);
            }
        }
    }

    /// Removes blocks up to the given upper bound: canonical blocks are removed walking back from
    /// the head, and finalized sidechains are pruned if a finalized block is provided.
    pub(crate) fn remove_until(
        &mut self,
        upper_bound: BlockNumHash,
        last_persisted_hash: B256,
        finalized_num_hash: Option<BlockNumHash>,
    ) {
        debug!(target: "engine::tree", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");

        // Never prune past the upper bound, even if the finalized block is ahead of it.
        let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
            if upper_bound.number < finalized.number {
                finalized = upper_bound;
                debug!(target: "engine::tree", ?finalized, "Adjusted upper bound");
            }
            finalized
        });

        self.remove_canonical_until(upper_bound.number, last_persisted_hash);

        if let Some(finalized_num_hash) = finalized_num_hash {
            self.prune_finalized_sidechains(finalized_num_hash);
        }
    }

    /// Returns whether the block described by `second` is a descendant of the `first` block.
    fn is_descendant(&self, first: BlockNumHash, second: &N::BlockHeader) -> bool {
        // A direct child is trivially a descendant.
        if second.parent_hash() == first.hash {
            return true
        }

        // A descendant must have a higher block number.
        if second.number() <= first.number {
            return false
        }

        // Walk back from `second` until reaching the height directly above `first`.
        let Some(mut current_block) = self.block_by_hash(second.parent_hash()) else {
            return false
        };

        while current_block.number() > first.number + 1 {
            let Some(block) = self.block_by_hash(current_block.header().parent_hash()) else {
                return false
            };

            current_block = block;
        }

        current_block.parent_hash() == first.hash
    }

    /// Updates the canonical head to the given block.
    fn set_canonical_head(&mut self, new_head: BlockNumHash) {
        self.current_canonical_head = new_head;
    }

    /// Returns the current canonical head.
    const fn canonical_head(&self) -> &BlockNumHash {
        &self.current_canonical_head
    }

    /// Returns the hash of the current canonical head.
    const fn canonical_block_hash(&self) -> B256 {
        self.canonical_head().hash
    }

    /// Returns the number of the current canonical head.
    const fn canonical_block_number(&self) -> BlockNumber {
        self.canonical_head().number
    }
}

/// A builder for creating state providers that can be used across threads.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The in-memory blocks whose state should be overlaid on top of the historical state.
    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new builder from the provider factory, historical block hash and optional
    /// overlaid blocks.
    pub fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
where
    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
{
    /// Creates a new state provider from this builder.
    pub fn build(&self) -> ProviderResult<StateProviderBox> {
        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
        if let Some(overlay) = self.overlay.clone() {
            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
        }
        Ok(provider)
    }
}
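
// A minimal usage sketch, not part of the original API surface: the builder captures a
// historical anchor block plus an optional in-memory overlay once, and can then be cloned into
// tasks that each build their own `StateProviderBox` on demand. The helper name below is
// illustrative only.
#[allow(dead_code)]
fn example_build_state_provider<N, P>(
    factory: P,
    historical: B256,
    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
) -> ProviderResult<StateProviderBox>
where
    N: NodePrimitives,
    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
{
    // Same bounds as `StateProviderBuilder::build` above.
    StateProviderBuilder::new(factory, historical, overlay).build()
}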

/// Container type that bundles all in-memory state the engine API handler needs on top of the
/// persisted database state.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the executed blocks in the tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received from the consensus layer.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of blocks that are disconnected from the canonical chain.
    buffer: BlockBuffer<N::Block>,
    /// Cache of headers that were reported as invalid.
    invalid_headers: InvalidHeaderCache,
}

impl<N: NodePrimitives> EngineApiTreeState<N> {
    fn new(
        block_buffer_limit: u32,
        max_invalid_header_cache_length: u32,
        canonical_block: BlockNumHash,
    ) -> Self {
        Self {
            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
            buffer: BlockBuffer::new(block_buffer_limit),
            tree_state: TreeState::new(canonical_block),
            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
        }
    }
}

/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The result of the operation.
    pub outcome: T,
    /// An optional follow-up event for the caller to act on.
    pub event: Option<TreeEvent>,
}

impl<T> TreeOutcome<T> {
    /// Creates a new tree outcome without an event.
    pub const fn new(outcome: T) -> Self {
        Self { outcome, event: None }
    }

    /// Attaches an event to the outcome.
    pub fn with_event(mut self, event: TreeEvent) -> Self {
        self.event = Some(event);
        self
    }
}

/// Events that can be emitted by tree operations.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action on the tree itself is needed.
    TreeAction(TreeAction),
    /// A backfill sync action is needed.
    BackfillAction(BackfillAction),
    /// A block download is needed.
    Download(DownloadRequest),
}

impl TreeEvent {
    /// Returns true if the event is a backfill action.
    const fn is_backfill_action(&self) -> bool {
        matches!(self, Self::BackfillAction(_))
    }
}

/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the target block canonical.
    MakeCanonical {
        /// The sync target head hash.
        sync_target_head: B256,
    },
}
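
// Illustrative sketch only (the helper name is hypothetical): a `TreeOutcome` pairs the
// caller-facing result with an optional follow-up `TreeEvent` that the handler processes itself,
// for example making the just-validated block canonical.
#[allow(dead_code)]
fn example_valid_outcome(status: PayloadStatus, head: B256) -> TreeOutcome<PayloadStatus> {
    TreeOutcome::new(status)
        .with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head: head }))
}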

/// The engine API tree handler.
///
/// This type processes incoming engine API requests, maintains the in-memory tree of executed
/// blocks on top of the persisted state, and emits events for the orchestrator.
pub struct EngineApiTreeHandler<N, P, E, T, V, C>
where
    N: NodePrimitives,
    T: EngineTypes,
{
    provider: P,
    executor_provider: E,
    evm_config: C,
    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The sending half for messages to this handler.
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events emitted to the caller.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle used to communicate with the persistence task.
    persistence: PersistenceHandle<N>,
    /// Tracks the state of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the canonical chain that is not persisted yet.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that receives payload attributes for valid forkchoice
    /// updates.
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings for the tree.
    config: TreeConfig,
    /// Metrics for the engine API.
    metrics: EngineApiMetrics,
    /// Hook invoked when an invalid block is encountered.
    invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
    /// The engine API variant of this handler.
    engine_kind: EngineApiKind,
    /// The type responsible for processing new payloads.
    payload_processor: PayloadProcessor<N, C>,
}

impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug, C: Debug> std::fmt::Debug
    for EngineApiTreeHandler<N, P, E, T, V, C>
where
    N: NodePrimitives,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("evm_config", &self.evm_config)
            .field("executor_provider", &self.executor_provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
            .field("engine_kind", &self.engine_kind)
            .finish()
    }
}

639impl<N, P, E, T, V, C> EngineApiTreeHandler<N, P, E, T, V, C>
640where
641 N: NodePrimitives,
642 P: DatabaseProviderFactory
643 + BlockReader<Block = N::Block, Header = N::BlockHeader>
644 + StateProviderFactory
645 + StateReader<Receipt = N::Receipt>
646 + StateCommitmentProvider
647 + HashedPostStateProvider
648 + Clone
649 + 'static,
650 <P as DatabaseProviderFactory>::Provider:
651 BlockReader<Block = N::Block, Header = N::BlockHeader>,
652 E: BlockExecutorProvider<Primitives = N>,
653 C: ConfigureEvm<Primitives = N> + 'static,
654 T: EngineTypes,
655 V: EngineValidator<T, Block = N::Block>,
656{
657 #[expect(clippy::too_many_arguments)]
659 pub fn new(
660 provider: P,
661 executor_provider: E,
662 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
663 payload_validator: V,
664 outgoing: UnboundedSender<EngineApiEvent<N>>,
665 state: EngineApiTreeState<N>,
666 canonical_in_memory_state: CanonicalInMemoryState<N>,
667 persistence: PersistenceHandle<N>,
668 persistence_state: PersistenceState,
669 payload_builder: PayloadBuilderHandle<T>,
670 config: TreeConfig,
671 engine_kind: EngineApiKind,
672 evm_config: C,
673 ) -> Self {
674 let (incoming_tx, incoming) = std::sync::mpsc::channel();
675
676 let payload_processor =
677 PayloadProcessor::new(WorkloadExecutor::default(), evm_config.clone(), &config);
678
679 Self {
680 provider,
681 executor_provider,
682 evm_config,
683 consensus,
684 payload_validator,
685 incoming,
686 outgoing,
687 persistence,
688 persistence_state,
689 backfill_sync_state: BackfillSyncState::Idle,
690 state,
691 canonical_in_memory_state,
692 payload_builder,
693 config,
694 metrics: Default::default(),
695 incoming_tx,
696 invalid_block_hook: Box::new(NoopInvalidBlockHook),
697 engine_kind,
698 payload_processor,
699 }
700 }
701
702 fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
704 self.invalid_block_hook = invalid_block_hook;
705 }
706
707 #[expect(clippy::complexity)]
713 pub fn spawn_new(
714 provider: P,
715 executor_provider: E,
716 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
717 payload_validator: V,
718 persistence: PersistenceHandle<N>,
719 payload_builder: PayloadBuilderHandle<T>,
720 canonical_in_memory_state: CanonicalInMemoryState<N>,
721 config: TreeConfig,
722 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
723 kind: EngineApiKind,
724 evm_config: C,
725 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
726 {
727 let best_block_number = provider.best_block_number().unwrap_or(0);
728 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
729
730 let persistence_state = PersistenceState {
731 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
732 rx: None,
733 remove_above_state: VecDeque::new(),
734 };
735
736 let (tx, outgoing) = unbounded_channel();
737 let state = EngineApiTreeState::new(
738 config.block_buffer_limit(),
739 config.max_invalid_header_cache_length(),
740 header.num_hash(),
741 );
742
743 let mut task = Self::new(
744 provider,
745 executor_provider,
746 consensus,
747 payload_validator,
748 tx,
749 state,
750 canonical_in_memory_state,
751 persistence,
752 persistence_state,
753 payload_builder,
754 config,
755 kind,
756 evm_config,
757 );
758 task.set_invalid_block_hook(invalid_block_hook);
759 let incoming = task.incoming_tx.clone();
760 std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
761 (incoming, outgoing)
762 }
763
764 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
766 self.incoming_tx.clone()
767 }
768
769 pub fn run(mut self) {
773 loop {
774 match self.try_recv_engine_message() {
775 Ok(Some(msg)) => {
776 debug!(target: "engine::tree", %msg, "received new engine message");
777 if let Err(fatal) = self.on_engine_message(msg) {
778 error!(target: "engine::tree", %fatal, "insert block fatal error");
779 return
780 }
781 }
782 Ok(None) => {
783 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
784 }
785 Err(_err) => {
786 error!(target: "engine::tree", "Engine channel disconnected");
787 return
788 }
789 }
790
791 if let Err(err) = self.advance_persistence() {
792 error!(target: "engine::tree", %err, "Advancing persistence failed");
793 return
794 }
795 }
796 }
797
798 fn on_downloaded(
804 &mut self,
805 mut blocks: Vec<RecoveredBlock<N::Block>>,
806 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
807 if blocks.is_empty() {
808 return Ok(None)
810 }
811
812 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
813 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
814 for block in blocks.drain(..batch) {
815 if let Some(event) = self.on_downloaded_block(block)? {
816 let needs_backfill = event.is_backfill_action();
817 self.on_tree_event(event)?;
818 if needs_backfill {
819 return Ok(None)
821 }
822 }
823 }
824
825 if !blocks.is_empty() {
827 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
828 }
829
830 Ok(None)
831 }
832
833 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
846 fn on_new_payload(
847 &mut self,
848 payload: T::ExecutionData,
849 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
850 trace!(target: "engine::tree", "invoked new payload");
851 self.metrics.engine.new_payload_messages.increment(1);
852
853 let parent_hash = payload.parent_hash();
879 let block = match self.payload_validator.ensure_well_formed_payload(payload) {
880 Ok(block) => block,
881 Err(error) => {
882 error!(target: "engine::tree", %error, "Invalid payload");
883 let latest_valid_hash =
886 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
887 None
891 } else {
892 self.latest_valid_hash_for_invalid_payload(parent_hash)?
893 };
894
895 let status = PayloadStatusEnum::from(error);
896 return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
897 }
898 };
899
900 let block_hash = block.hash();
901 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
902 if lowest_buffered_ancestor == block_hash {
903 lowest_buffered_ancestor = block.parent_hash();
904 }
905
906 if let Some(status) =
908 self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?
909 {
910 return Ok(TreeOutcome::new(status))
911 }
912
913 let status = if self.backfill_sync_state.is_idle() {
914 let mut latest_valid_hash = None;
915 let num_hash = block.num_hash();
916 match self.insert_block(block) {
917 Ok(status) => {
918 let status = match status {
919 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
920 latest_valid_hash = Some(block_hash);
921 self.try_connect_buffered_blocks(num_hash)?;
922 PayloadStatusEnum::Valid
923 }
924 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
925 latest_valid_hash = Some(block_hash);
926 PayloadStatusEnum::Valid
927 }
928 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
929 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
930 PayloadStatusEnum::Syncing
932 }
933 };
934
935 PayloadStatus::new(status, latest_valid_hash)
936 }
937 Err(error) => self.on_insert_block_error(error)?,
938 }
939 } else if let Err(error) = self.buffer_block(block) {
940 self.on_insert_block_error(error)?
941 } else {
942 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
943 };
944
945 let mut outcome = TreeOutcome::new(status);
946 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
948 if self.state.tree_state.canonical_block_hash() != block_hash {
950 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
951 sync_target_head: block_hash,
952 }));
953 }
954 }
955
956 Ok(outcome)
957 }
958
959 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
966 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
968 return Ok(None)
969 };
970
971 let new_head_number = new_head_block.recovered_block().number();
972 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
973
974 let mut new_chain = vec![new_head_block.clone()];
975 let mut current_hash = new_head_block.recovered_block().parent_hash();
976 let mut current_number = new_head_number - 1;
977
978 while current_number > current_canonical_number {
983 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
984 {
985 current_hash = block.recovered_block().parent_hash();
986 current_number -= 1;
987 new_chain.push(block);
988 } else {
989 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
990 return Ok(None);
993 }
994 }
995
996 if current_hash == self.state.tree_state.current_canonical_head.hash {
999 new_chain.reverse();
1000
1001 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
1003 }
1004
1005 let mut old_chain = Vec::new();
1007 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
1008
1009 while current_canonical_number > current_number {
1012 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1013 old_chain.push(block.clone());
1014 old_hash = block.recovered_block().parent_hash();
1015 current_canonical_number -= 1;
1016 } else {
1017 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1019 return Ok(None);
1020 }
1021 }
1022
1023 debug_assert_eq!(current_number, current_canonical_number);
1025
1026 while old_hash != current_hash {
1029 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1030 old_hash = block.recovered_block().parent_hash();
1031 old_chain.push(block);
1032 } else {
1033 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1035 return Ok(None);
1036 }
1037
1038 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
1039 {
1040 current_hash = block.recovered_block().parent_hash();
1041 new_chain.push(block);
1042 } else {
1043 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
1045 return Ok(None);
1046 }
1047 }
1048 new_chain.reverse();
1049 old_chain.reverse();
1050
1051 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
1052 }
1053
1054 fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
1061 let canonical_head = self.state.tree_state.canonical_head();
1063 let mut current_hash = target_hash;
1064 while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
1065 if current_block.hash() == canonical_head.hash {
1066 return Ok(false)
1067 }
1068 if current_block.number() <= canonical_head.number {
1070 break
1071 }
1072 current_hash = current_block.parent_hash();
1073 }
1074
1075 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
1077 return Ok(false)
1078 }
1079
1080 if self.provider.block_number(target_hash)?.is_some() {
1082 return Ok(false)
1083 }
1084
1085 Ok(true)
1086 }
1087
1088 fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
1090 let Some(action) = self.persistence_state.current_action() else {
1092 return PersistingKind::NotPersisting
1093 };
1094 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
1096 return PersistingKind::PersistingNotDescendant
1097 };
1098
1099 if block.number() > highest.number && self.state.tree_state.is_descendant(*highest, block) {
1102 return PersistingKind::PersistingDescendant
1103 }
1104
1105 PersistingKind::PersistingNotDescendant
1107 }
1108
    #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
1118 fn on_forkchoice_updated(
1119 &mut self,
1120 state: ForkchoiceState,
1121 attrs: Option<T::PayloadAttributes>,
1122 version: EngineApiMessageVersion,
1123 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1124 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1125 self.metrics.engine.forkchoice_updated_messages.increment(1);
1126 self.canonical_in_memory_state.on_forkchoice_update_received();
1127
1128 if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1129 return Ok(TreeOutcome::new(on_updated))
1130 }
1131
1132 let valid_outcome = |head| {
1133 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1134 PayloadStatusEnum::Valid,
1135 Some(head),
1136 )))
1137 };
1138
1139 if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1155 trace!(target: "engine::tree", "fcu head hash is already canonical");
1156
1157 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1159 return Ok(TreeOutcome::new(outcome))
1161 }
1162
1163 if let Some(attr) = attrs {
1165 let tip = self
1166 .block_by_hash(self.state.tree_state.canonical_block_hash())?
1167 .ok_or_else(|| {
1168 ProviderError::HeaderNotFound(state.head_block_hash.into())
1171 })?;
1172 let updated = self.process_payload_attributes(attr, tip.header(), state, version);
1173 return Ok(TreeOutcome::new(updated))
1174 }
1175
1176 return Ok(valid_outcome(state.head_block_hash))
1178 }
1179
1180 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1182 let tip = chain_update.tip().clone_sealed_header();
1183 self.on_canonical_chain_update(chain_update);
1184
1185 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1187 return Ok(TreeOutcome::new(outcome))
1189 }
1190
1191 if let Some(attr) = attrs {
1192 let updated = self.process_payload_attributes(attr, &tip, state, version);
1193 return Ok(TreeOutcome::new(updated))
1194 }
1195
1196 return Ok(valid_outcome(state.head_block_hash))
1197 }
1198
1199 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1201 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1202
1203 if self.engine_kind.is_opstack() {
1206 if let Some(attr) = attrs {
1207 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1208 let updated =
1209 self.process_payload_attributes(attr, &canonical_header, state, version);
1210 return Ok(TreeOutcome::new(updated))
1211 }
1212 }
1213
1214 return Ok(valid_outcome(state.head_block_hash))
1224 }
1225
1226 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1233 !state.safe_block_hash.is_zero() &&
1235 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1236 {
1237 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1238 state.safe_block_hash
1239 } else {
1240 state.head_block_hash
1241 };
1242
1243 let target = self.lowest_buffered_ancestor_or(target);
1244 trace!(target: "engine::tree", %target, "downloading missing block");
1245
1246 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1247 PayloadStatusEnum::Syncing,
1248 )))
1249 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1250 }
1251
1252 #[expect(clippy::type_complexity)]
1261 fn try_recv_engine_message(
1262 &self,
1263 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1264 if self.persistence_state.in_progress() {
1265 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1267 Ok(msg) => Ok(Some(msg)),
1268 Err(err) => match err {
1269 RecvTimeoutError::Timeout => Ok(None),
1270 RecvTimeoutError::Disconnected => Err(RecvError),
1271 },
1272 }
1273 } else {
1274 self.incoming.recv().map(Some)
1275 }
1276 }
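    // Note: the 500ms receive timeout above is what keeps the main loop responsive while a
    // persistence task is running: even when no engine messages arrive, control returns to
    // `run`, which then calls `advance_persistence` to poll the task's completion channel.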
1277
1278 fn remove_blocks(&mut self, new_tip_num: u64) {
1281 debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1282 if new_tip_num < self.persistence_state.last_persisted_block.number {
1283 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1284 let (tx, rx) = oneshot::channel();
1285 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1286 self.persistence_state.start_remove(new_tip_num, rx);
1287 }
1288 }
1289
1290 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1293 if blocks_to_persist.is_empty() {
1294 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1295 return
1296 }
1297
1298 let highest_num_hash = blocks_to_persist
1300 .iter()
1301 .max_by_key(|block| block.recovered_block().number())
1302 .map(|b| b.recovered_block().num_hash())
1303 .expect("Checked non-empty persisting blocks");
1304
1305 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1306 let (tx, rx) = oneshot::channel();
1307 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1308
1309 self.persistence_state.start_save(highest_num_hash, rx);
1310 }
1311
1312 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1317 if self.persistence_state.in_progress() {
1318 let (mut rx, start_time, current_action) = self
1319 .persistence_state
1320 .rx
1321 .take()
1322 .expect("if a persistence task is in progress Receiver must be Some");
1323 match rx.try_recv() {
1325 Ok(last_persisted_hash_num) => {
1326 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1327 let Some(BlockNumHash {
1328 hash: last_persisted_block_hash,
1329 number: last_persisted_block_number,
1330 }) = last_persisted_hash_num
1331 else {
1332 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1335 return Ok(())
1336 };
1337
1338 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1339 self.persistence_state
1340 .finish(last_persisted_block_hash, last_persisted_block_number);
1341 self.on_new_persisted_block()?;
1342 }
1343 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1344 Err(TryRecvError::Empty) => {
1345 self.persistence_state.rx = Some((rx, start_time, current_action))
1346 }
1347 }
1348 }
1349
1350 if !self.persistence_state.in_progress() {
1351 if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
1352 self.remove_blocks(new_tip_num)
1353 } else if self.should_persist() {
1354 let blocks_to_persist = self.get_canonical_blocks_to_persist();
1355 self.persist_blocks(blocks_to_persist);
1356 }
1357 }
1358
1359 Ok(())
1360 }
1361
1362 fn on_engine_message(
1364 &mut self,
1365 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1366 ) -> Result<(), InsertBlockFatalError> {
1367 match msg {
1368 FromEngine::Event(event) => match event {
1369 FromOrchestrator::BackfillSyncStarted => {
1370 debug!(target: "engine::tree", "received backfill sync started event");
1371 self.backfill_sync_state = BackfillSyncState::Active;
1372 }
1373 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1374 self.on_backfill_sync_finished(ctrl)?;
1375 }
1376 },
1377 FromEngine::Request(request) => {
1378 match request {
1379 EngineApiRequest::InsertExecutedBlock(block) => {
1380 let block_num_hash = block.recovered_block().num_hash();
1381 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1382 let now = Instant::now();
1383
1384 if self.state.tree_state.canonical_block_hash() ==
1387 block.recovered_block().parent_hash()
1388 {
1389 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1390 self.canonical_in_memory_state.set_pending_block(block.clone());
1391 }
1392
1393 self.state.tree_state.insert_executed(block.clone());
1394 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1395 self.emit_event(EngineApiEvent::BeaconConsensus(
1396 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1397 ));
1398 }
1399 EngineApiRequest::Beacon(request) => {
1400 match request {
1401 BeaconEngineMessage::ForkchoiceUpdated {
1402 state,
1403 payload_attrs,
1404 tx,
1405 version,
1406 } => {
1407 let mut output =
1408 self.on_forkchoice_updated(state, payload_attrs, version);
1409
1410 if let Ok(res) = &mut output {
1411 self.state
1413 .forkchoice_state_tracker
1414 .set_latest(state, res.outcome.forkchoice_status());
1415
1416 self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1418 state,
1419 res.outcome.forkchoice_status(),
1420 ));
1421
1422 self.on_maybe_tree_event(res.event.take())?;
1424 }
1425
1426 if let Err(err) =
1427 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1428 {
1429 self.metrics
1430 .engine
1431 .failed_forkchoice_updated_response_deliveries
1432 .increment(1);
1433 error!(target: "engine::tree", "Failed to send event: {err:?}");
1434 }
1435 }
1436 BeaconEngineMessage::NewPayload { payload, tx } => {
1437 let mut output = self.on_new_payload(payload);
1438
1439 let maybe_event =
1440 output.as_mut().ok().and_then(|out| out.event.take());
1441
1442 if let Err(err) =
1444 tx.send(output.map(|o| o.outcome).map_err(|e| {
1445 BeaconOnNewPayloadError::Internal(Box::new(e))
1446 }))
1447 {
1448 error!(target: "engine::tree", "Failed to send event: {err:?}");
1449 self.metrics
1450 .engine
1451 .failed_new_payload_response_deliveries
1452 .increment(1);
1453 }
1454
1455 self.on_maybe_tree_event(maybe_event)?;
1457 }
1458 BeaconEngineMessage::TransitionConfigurationExchanged => {
1459 self.canonical_in_memory_state
1462 .on_transition_configuration_exchanged();
1463 }
1464 }
1465 }
1466 }
1467 }
1468 FromEngine::DownloadedBlocks(blocks) => {
1469 if let Some(event) = self.on_downloaded(blocks)? {
1470 self.on_tree_event(event)?;
1471 }
1472 }
1473 }
1474 Ok(())
1475 }
1476
1477 fn on_backfill_sync_finished(
1491 &mut self,
1492 ctrl: ControlFlow,
1493 ) -> Result<(), InsertBlockFatalError> {
1494 debug!(target: "engine::tree", "received backfill sync finished event");
1495 self.backfill_sync_state = BackfillSyncState::Idle;
1496
1497 let mut backfill_height = ctrl.block_number();
1499
1500 if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1502 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1503 self.state.invalid_headers.insert(**bad_block);
1505
1506 backfill_height = Some(*target);
1508 }
1509
1510 let Some(backfill_height) = backfill_height else { return Ok(()) };
1512
1513 let Some(backfill_num_hash) = self
1519 .provider
1520 .block_hash(backfill_height)?
1521 .map(|hash| BlockNumHash { hash, number: backfill_height })
1522 else {
1523 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1524 return Ok(())
1525 };
1526
1527 if ctrl.is_unwind() {
1528 self.state.tree_state.reset(backfill_num_hash)
1531 } else {
1532 self.state.tree_state.remove_until(
1533 backfill_num_hash,
1534 self.persistence_state.last_persisted_block.hash,
1535 Some(backfill_num_hash),
1536 );
1537 }
1538
1539 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1540 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1541
1542 self.state.buffer.remove_old_blocks(backfill_height);
1544 self.canonical_in_memory_state.clear_state();
1547
1548 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1549 self.state.tree_state.set_canonical_head(new_head.num_hash());
1552 self.persistence_state.finish(new_head.hash(), new_head.number());
1553
1554 self.canonical_in_memory_state.set_canonical_head(new_head);
1556 }
1557
1558 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1561 else {
1562 return Ok(())
1563 };
1564 if sync_target_state.finalized_block_hash.is_zero() {
1565 return Ok(())
1567 }
1568 let newest_finalized = self
1570 .state
1571 .buffer
1572 .block(&sync_target_state.finalized_block_hash)
1573 .map(|block| block.number());
1574
1575 if let Some(backfill_target) =
1581 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1582 self.backfill_sync_target(progress, finalized_number, None)
1585 })
1586 {
1587 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1589 backfill_target.into(),
1590 )));
1591 return Ok(())
1592 };
1593
1594 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1596 }
1597
1598 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1602 if let Some(chain_update) = self.on_new_head(target)? {
1603 self.on_canonical_chain_update(chain_update);
1604 }
1605
1606 Ok(())
1607 }
1608
1609 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1611 if let Some(event) = event {
1612 self.on_tree_event(event)?;
1613 }
1614
1615 Ok(())
1616 }
1617
1618 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1622 match event {
1623 TreeEvent::TreeAction(action) => match action {
1624 TreeAction::MakeCanonical { sync_target_head } => {
1625 self.make_canonical(sync_target_head)?;
1626 }
1627 },
1628 TreeEvent::BackfillAction(action) => {
1629 self.emit_event(EngineApiEvent::BackfillAction(action));
1630 }
1631 TreeEvent::Download(action) => {
1632 self.emit_event(EngineApiEvent::Download(action));
1633 }
1634 }
1635
1636 Ok(())
1637 }
1638
1639 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1641 let event = event.into();
1642
1643 if event.is_backfill_action() {
1644 debug_assert_eq!(
1645 self.backfill_sync_state,
1646 BackfillSyncState::Idle,
1647 "backfill action should only be emitted when backfill is idle"
1648 );
1649
1650 if self.persistence_state.in_progress() {
                debug!(target: "engine::tree", "skipping backfill run while persistence task is active");
1654 return
1655 }
1656
1657 self.backfill_sync_state = BackfillSyncState::Pending;
1658 self.metrics.engine.pipeline_runs.increment(1);
1659 debug!(target: "engine::tree", "emitting backfill action event");
1660 }
1661
1662 let _ = self.outgoing.send(event).inspect_err(
1663 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1664 );
1665 }
1666
1667 pub const fn should_persist(&self) -> bool {
1671 if !self.backfill_sync_state.is_idle() {
1672 return false
1674 }
1675
1676 let min_block = self.persistence_state.last_persisted_block.number;
1677 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1678 self.config.persistence_threshold()
1679 }
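    // Example of the check above (numbers illustrative, not defaults): with the last persisted
    // block at 100, a canonical head at 165 and a persistence threshold of 64, the backlog of 65
    // exceeds the threshold and a save run is triggered; at head 164 it is not.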
1680
1681 fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
1685 let mut blocks_to_persist = Vec::new();
1686 let mut current_hash = self.state.tree_state.canonical_block_hash();
1687 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1688
1689 let canonical_head_number = self.state.tree_state.canonical_block_number();
1690
1691 let target_number =
1692 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1693
1694 debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1696 if block.recovered_block().number() <= last_persisted_number {
1697 break;
1698 }
1699
1700 if block.recovered_block().number() <= target_number {
1701 blocks_to_persist.push(block.clone());
1702 }
1703
1704 current_hash = block.recovered_block().parent_hash();
1705 }
1706
1707 blocks_to_persist.reverse();
1709
1710 blocks_to_persist
1711 }
1712
1713 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1721 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1722 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1723 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1724 number: self.persistence_state.last_persisted_block.number,
1725 hash: self.persistence_state.last_persisted_block.hash,
1726 });
1727 Ok(())
1728 }
1729
1730 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1738 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1739 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
1741 return Ok(Some(block.block))
1742 }
1743
1744 let (block, senders) = self
1745 .provider
1746 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1747 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1748 .split_sealed();
1749 let execution_output = self
1750 .provider
1751 .get_state(block.header().number())?
1752 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1753 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1754
1755 Ok(Some(ExecutedBlock {
1756 recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1757 execution_output: Arc::new(execution_output),
1758 hashed_state: Arc::new(hashed_state),
1759 }))
1760 }
1761
1762 fn sealed_header_by_hash(
1764 &self,
1765 hash: B256,
1766 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1767 let block = self
1769 .state
1770 .tree_state
1771 .block_by_hash(hash)
1772 .map(|block| block.as_ref().clone_sealed_header());
1773
1774 if block.is_some() {
1775 Ok(block)
1776 } else {
1777 self.provider.sealed_header_by_hash(hash)
1778 }
1779 }
1780
1781 fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1783 let mut block = self.provider.block_by_hash(hash)?;
1785 if block.is_none() {
1786 block = self
1789 .state
1790 .tree_state
1791 .block_by_hash(hash)
1792 .map(|block| block.as_ref().clone().into_block());
1794 }
1795 Ok(block)
1796 }
1797
1798 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1805 self.state
1806 .buffer
1807 .lowest_ancestor(&hash)
1808 .map(|block| block.parent_hash())
1809 .unwrap_or_else(|| hash)
1810 }
1811
1812 fn latest_valid_hash_for_invalid_payload(
1823 &mut self,
1824 parent_hash: B256,
1825 ) -> ProviderResult<Option<B256>> {
1826 if self.block_by_hash(parent_hash)?.is_some() {
1828 return Ok(Some(parent_hash))
1829 }
1830
1831 let mut current_hash = parent_hash;
        let mut current_block = self.state.invalid_headers.get(&current_hash);
        while let Some(block_with_parent) = current_block {
            current_hash = block_with_parent.parent;
            current_block = self.state.invalid_headers.get(&current_hash);
1838
1839 if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1842 return Ok(Some(current_hash))
1843 }
1844 }
1845 Ok(None)
1846 }
1847
1848 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1852 if let Some(parent) = self.block_by_hash(parent_hash)? {
1855 if !parent.header().difficulty().is_zero() {
1856 parent_hash = B256::ZERO;
1857 }
1858 }
1859
1860 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1861 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1862 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1863 })
1864 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1865 }
1866
1867 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1871 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1872 return target.head_block_hash == block_hash
1873 }
1874 false
1875 }
1876
1877 fn check_invalid_ancestor_with_head(
1883 &mut self,
1884 check: B256,
1885 head: &SealedBlock<N::Block>,
1886 ) -> ProviderResult<Option<PayloadStatus>> {
1887 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1889
1890 let status = self.prepare_invalid_response(header.parent)?;
1892
1893 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), header);
1895 self.emit_event(BeaconConsensusEngineEvent::InvalidBlock(Box::new(head.clone())));
1896
1897 Ok(Some(status))
1898 }
1899
1900 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1903 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1905 Ok(Some(self.prepare_invalid_response(header.parent)?))
1907 }
1908
1909 fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1912 if let Err(e) =
1913 self.consensus.validate_header_with_total_difficulty(block.header(), U256::MAX)
1914 {
1915 error!(
1916 target: "engine::tree",
1917 ?block,
1918 "Failed to validate total difficulty for block {}: {e}",
1919 block.hash()
1920 );
1921 return Err(e)
1922 }
1923
1924 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1925 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1926 return Err(e)
1927 }
1928
1929 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1930 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1931 return Err(e)
1932 }
1933
1934 Ok(())
1935 }
1936
1937 #[instrument(level = "trace", skip(self), target = "engine::tree")]
1939 fn try_connect_buffered_blocks(
1940 &mut self,
1941 parent: BlockNumHash,
1942 ) -> Result<(), InsertBlockFatalError> {
1943 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1944
1945 if blocks.is_empty() {
1946 return Ok(())
1948 }
1949
1950 let now = Instant::now();
1951 let block_count = blocks.len();
1952 for child in blocks {
1953 let child_num_hash = child.num_hash();
1954 match self.insert_block(child) {
1955 Ok(res) => {
1956 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1957 if self.is_sync_target_head(child_num_hash.hash) &&
1958 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1959 {
1960 self.make_canonical(child_num_hash.hash)?;
1961 }
1962 }
1963 Err(err) => {
1964 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1965 if let Err(fatal) = self.on_insert_block_error(err) {
1966 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1967 return Err(fatal)
1968 }
1969 }
1970 }
1971 }
1972
1973 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1974 Ok(())
1975 }
1976
1977 fn buffer_block(
1979 &mut self,
1980 block: RecoveredBlock<N::Block>,
1981 ) -> Result<(), InsertBlockError<N::Block>> {
1982 if let Err(err) = self.validate_block(&block) {
1983 return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1984 }
1985 self.state.buffer.insert_block(block);
1986 Ok(())
1987 }
1988
1989 #[inline]
1994 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1995 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1996 }
1997
1998 #[inline]
2001 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2002 if block > local_tip {
2003 Some(block - local_tip)
2004 } else {
2005 None
2006 }
2007 }
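    // Example for the two helpers above: with a local tip at 1_000 and a target block at 1_040,
    // the distance is Some(40), which also exceeds MIN_BLOCKS_FOR_PIPELINE_RUN (EPOCH_SLOTS = 32),
    // so backfill sync is preferred over downloading and executing the range live.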
2008
2009 fn backfill_sync_target(
2016 &self,
2017 canonical_tip_num: u64,
2018 target_block_number: u64,
2019 downloaded_block: Option<BlockNumHash>,
2020 ) -> Option<B256> {
2021 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2022
2023 let mut exceeds_backfill_threshold =
2025 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
2026
2027 if let Some(buffered_finalized) = sync_target_state
2029 .as_ref()
2030 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2031 {
2032 exceeds_backfill_threshold =
2035 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
2036 }
2037
2038 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
2041 if downloaded_block.hash == state.finalized_block_hash {
2042 exceeds_backfill_threshold =
2044 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
2045 }
2046 }
2047
2048 if exceeds_backfill_threshold {
2050 if let Some(state) = sync_target_state {
2051 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2053 Err(err) => {
2054 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2055 }
2056 Ok(None) => {
2057 if !state.finalized_block_hash.is_zero() {
2059 return Some(state.finalized_block_hash)
2062 }
2063
2064 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2077 return Some(state.head_block_hash)
2078 }
2079 Ok(Some(_)) => {
2080 }
2082 }
2083 }
2084 }
2085
2086 None
2087 }
2088
2089 fn find_disk_reorg(&self, chain_update: &NewCanonicalChain<N>) -> Option<u64> {
2101 let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
2102
2103 let BlockNumHash { number: new_num, hash: new_hash } =
2104 new.first().map(|block| block.recovered_block().num_hash())?;
2105
2106 match new_num.cmp(&self.persistence_state.last_persisted_block.number) {
2107 Ordering::Greater => {
2108 None
2111 }
2112 Ordering::Equal => {
2113 (self.persistence_state.last_persisted_block.hash != new_hash).then_some(new_num)
2116 }
2117 Ordering::Less => {
2118 Some(new_num)
2120 }
2121 }
2122 }
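    // When this returns Some(height), the reorged chain reaches at or below the last persisted
    // block, so `on_canonical_chain_update` schedules removal of persisted blocks above
    // `height - 1` before the new chain can become canonical.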
2123
2124 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2128 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2129 let start = Instant::now();
2130
2131 if let Some(height) = self.find_disk_reorg(&chain_update) {
2133 let new_tip_num = height.saturating_sub(1);
2135 self.persistence_state.schedule_removal(new_tip_num);
2136 }
2137
2138 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2140
2141 let tip = chain_update.tip().clone_sealed_header();
2142 let notification = chain_update.to_chain_notification();
2143
2144 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2146 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2147 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2148 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2149
2150 self.update_reorg_metrics(old.len());
2151 self.reinsert_reorged_blocks(new.clone());
2152 let old = old
2155 .iter()
2156 .filter_map(|block| {
2157 let (_, trie) = self
2158 .state
2159 .tree_state
2160 .persisted_trie_updates
2161 .get(&block.recovered_block.hash())
2162 .cloned()?;
2163 Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
2164 })
2165 .collect::<Vec<_>>();
2166 self.reinsert_reorged_blocks(old);
2167 }
2168
2169 self.canonical_in_memory_state.update_chain(chain_update);
2171 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2172
2173 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2175
2176 self.canonical_in_memory_state.notify_canon_state(notification);
2178
2179 self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
2181 Box::new(tip),
2182 start.elapsed(),
2183 ));
2184 }
2185
2186 fn update_reorg_metrics(&self, old_chain_length: usize) {
2188 self.metrics.tree.reorgs.increment(1);
2189 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2190 }
2191
2192 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2194 for block in new_chain {
2195 if self
2196 .state
2197 .tree_state
2198 .executed_block_by_hash(block.recovered_block().hash())
2199 .is_none()
2200 {
2201 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2202 self.state.tree_state.insert_executed(block);
2203 }
2204 }
2205 }
2206
2207 fn on_invalid_block(
2209 &mut self,
2210 parent_header: &SealedHeader<N::BlockHeader>,
2211 block: &RecoveredBlock<N::Block>,
2212 output: &BlockExecutionOutput<N::Receipt>,
2213 trie_updates: Option<(&TrieUpdates, B256)>,
2214 ) {
2215 if self.state.invalid_headers.get(&block.hash()).is_some() {
2216 return;
2218 }
2219 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
2220 }
2221
2222 fn on_disconnected_downloaded_block(
2227 &self,
2228 downloaded_block: BlockNumHash,
2229 missing_parent: BlockNumHash,
2230 head: BlockNumHash,
2231 ) -> Option<TreeEvent> {
2232 if let Some(target) =
2234 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2235 {
2236 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2237 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2238 }
2239
2240 let request = if let Some(distance) =
2250 self.distance_from_local_tip(head.number, missing_parent.number)
2251 {
2252 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2253 DownloadRequest::BlockRange(missing_parent.hash, distance)
2254 } else {
2255 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2256 DownloadRequest::single_block(missing_parent.hash)
2259 };
2260
2261 Some(TreeEvent::Download(request))
2262 }
2263
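/// Handles a block downloaded from the network: skips it if it has a known invalid ancestor or
/// while backfill sync is active, otherwise inserts it into the tree and returns a follow-up
/// event (make canonical, download, or backfill) if further action is needed.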
2264 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2270 fn on_downloaded_block(
2271 &mut self,
2272 block: RecoveredBlock<N::Block>,
2273 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2274 let block_num_hash = block.num_hash();
2275 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2276 if self
2277 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2278 .is_some()
2279 {
2280 return Ok(None)
2281 }
2282
2283 if !self.backfill_sync_state.is_idle() {
2284 return Ok(None)
2285 }
2286
2287 match self.insert_block(block) {
2289 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2290 if self.is_sync_target_head(block_num_hash.hash) {
2291 trace!(target: "engine::tree", "appended downloaded sync target block");
2292
2293 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2296 sync_target_head: block_num_hash.hash,
2297 })))
2298 }
2299 trace!(target: "engine::tree", "appended downloaded block");
2300 self.try_connect_buffered_blocks(block_num_hash)?;
2301 }
2302 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2303 return Ok(self.on_disconnected_downloaded_block(
2306 block_num_hash,
2307 missing_ancestor,
2308 head,
2309 ))
2310 }
2311 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2312 trace!(target: "engine::tree", "downloaded block already executed");
2313 }
2314 Err(err) => {
2315 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2316 if let Err(fatal) = self.on_insert_block_error(err) {
2317 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2318 return Err(fatal)
2319 }
2320 }
2321 }
2322 Ok(None)
2323 }
2324
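/// Inserts the block into the tree, attaching the offending block to any insertion error.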
2325 fn insert_block(
2326 &mut self,
2327 block: RecoveredBlock<N::Block>,
2328 ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
2329 self.insert_block_inner(block.clone())
2330 .map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind))
2331 }
2332
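/// Validates, executes and inserts the given block on top of its parent state: runs consensus
/// checks, executes the block, computes the state root (using the background state root task
/// or the parallel algorithm where possible, falling back to the synchronous computation
/// otherwise), and inserts the executed block into the tree state.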
2333 fn insert_block_inner(
2334 &mut self,
2335 block: RecoveredBlock<N::Block>,
2336 ) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
2337 let block_num_hash = block.num_hash();
2338 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
2339
2340 if self.block_by_hash(block.hash())?.is_some() {
2341 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2342 }
2343
2344 let start = Instant::now();
2345
2346 trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
2347
2348 self.validate_block(&block)?;
2350
2351 trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
2352 let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else {
2353 let missing_ancestor = self
2356 .state
2357 .buffer
2358 .lowest_ancestor(&block.parent_hash())
2359 .map(|block| block.parent_num_hash())
2360 .unwrap_or_else(|| block.parent_num_hash());
2361
2362 self.state.buffer.insert_block(block);
2363
2364 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2365 head: self.state.tree_state.current_canonical_head,
2366 missing_ancestor,
2367 }))
2368 };
2369
2370 let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
2372 InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
2373 block.parent_hash().into(),
2374 ))
2375 })?;
2376 if let Err(e) =
2377 self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
2378 {
2379 warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
2380 return Err(e.into())
2381 }
2382
2383 let state_provider = provider_builder.build()?;
2384
2385 let persisting_kind = self.persisting_kind_for(block.header());
2395 let run_parallel_state_root = persisting_kind.can_run_parallel_state_root();
2396
2397 let header = block.clone_sealed_header();
2399 let txs = block.clone_transactions_recovered().collect();
2400 let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
2401 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2403
2404 let trie_input_start = Instant::now();
2406 let trie_input = self
2407 .compute_trie_input(
2408 persisting_kind,
2409 consistent_view.clone(),
2410 block.header().parent_hash(),
2411 )
2412 .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
2413
2414 self.metrics
2415 .block_validation
2416 .trie_input_duration
2417 .record(trie_input_start.elapsed().as_secs_f64());
2418
2419 self.payload_processor.spawn(header, txs, provider_builder, consistent_view, trie_input)
2420 } else {
2421 self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
2422 };
2423
2424 let state_provider = CachedStateProvider::new_with_caches(
2427 state_provider,
2428 handle.caches(),
2429 handle.cache_metrics(),
2430 );
2431
2432 trace!(target: "engine::tree", block=?block_num_hash, "Executing block");
2433
2434 let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
2435 let execution_start = Instant::now();
2436 let output = self.metrics.executor.execute_metered(
2437 executor,
2438 &block,
2439 Box::new(handle.state_hook()),
2440 )?;
2441 let execution_finish = Instant::now();
2442 let execution_time = execution_finish.duration_since(execution_start);
2443 trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");
2444
2445 handle.stop_prewarming_execution();
2447
2448 if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
2449 self.on_invalid_block(&parent_block, &block, &output, None);
2451 return Err(err.into())
2452 }
2453
2454 let hashed_state = self.provider.hashed_post_state(&output.state);
2455
2456 if let Err(err) = self
2457 .payload_validator
2458 .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
2459 {
2460 self.on_invalid_block(&parent_block, &block, &output, None);
2462 return Err(err.into())
2463 }
2464
2465 trace!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
2466
2467 let root_time = Instant::now();
2468
2469 let mut maybe_state_root = None;
2470
2471 if run_parallel_state_root {
2472 if self.config.use_state_root_task() {
2475 match handle.state_root() {
2476 Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
2477 let elapsed = execution_finish.elapsed();
2478 info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
2479 if state_root == block.header().state_root() {
2481 maybe_state_root = Some((state_root, trie_updates, elapsed))
2482 } else {
2483 warn!(
2484 target: "engine::tree",
2485 ?state_root,
2486 block_state_root = ?block.header().state_root(),
2487 "State root task returned incorrect state root"
2488 );
2489 }
2490 }
2491 Err(error) => {
2492 debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
2493 }
2494 }
2495 } else {
2496 match self.compute_state_root_parallel(
2497 persisting_kind,
2498 block.header().parent_hash(),
2499 &hashed_state,
2500 ) {
2501 Ok(result) => {
2502 info!(
2503 target: "engine::tree",
2504 block = ?block_num_hash,
2505 regular_state_root = ?result.0,
2506 "Regular root task finished"
2507 );
2508 maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
2509 }
2510 Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2511 debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
2512 }
2513 Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))),
2514 }
2515 }
2516 }
2517
2518 let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
2519 maybe_state_root
2520 {
2521 maybe_state_root
2522 } else {
2523 debug!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
2525 let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
2526 (root, updates, root_time.elapsed())
2527 };
2528
2529 self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2530 debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
2531
2532 if state_root != block.header().state_root() {
2534 self.on_invalid_block(&parent_block, &block, &output, Some((&trie_output, state_root)));
2536 return Err(ConsensusError::BodyStateRootDiff(
2537 GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2538 )
2539 .into())
2540 }
2541
2542 handle.terminate_caching(Some(output.state.clone()));
2544
2545 let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
2546 block: ExecutedBlock {
2547 recovered_block: Arc::new(block),
2548 execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
2549 hashed_state: Arc::new(hashed_state),
2550 },
2551 trie: Arc::new(trie_output),
2552 };
2553
2554 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2556 {
2557 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2558 self.canonical_in_memory_state.set_pending_block(executed.clone());
2559 }
2560
2561 self.state.tree_state.insert_executed(executed.clone());
2562 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2563
2564 let elapsed = start.elapsed();
2566 let engine_event = if self.is_fork(block_num_hash.hash)? {
2567 BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2568 } else {
2569 BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2570 };
2571 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2572
2573 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2574 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2575 }
2576
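/// Computes the state root for the given hashed post-state in parallel, on top of the state of
/// the given parent block, backed by a consistent view of the database.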
2577 fn compute_state_root_parallel(
2586 &self,
2587 persisting_kind: PersistingKind,
2588 parent_hash: B256,
2589 hashed_state: &HashedPostState,
2590 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2591 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2592
2593 let mut input =
2594 self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
2595 input.append_ref(hashed_state);
2597
2598 ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2599 }
2600
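/// Builds the [`TrieInput`] for the given parent hash: collects the in-memory ancestor blocks
/// (dropping those already persisted when persisting a descendant), adds the revert state from
/// the database down to the chosen historical block, and layers the in-memory blocks' trie
/// updates and hashed state on top.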
2601 fn compute_trie_input(
2617 &self,
2618 persisting_kind: PersistingKind,
2619 consistent_view: ConsistentDbView<P>,
2620 parent_hash: B256,
2621 ) -> Result<TrieInput, ParallelStateRootError> {
2622 let mut input = TrieInput::default();
2623
2624 let provider = consistent_view.provider_ro()?;
2625 let best_block_number = provider.best_block_number()?;
2626
2627 let (mut historical, mut blocks) = self
2628 .state
2629 .tree_state
2630 .blocks_by_hash(parent_hash)
2631 .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2632
2633 if persisting_kind.is_descendant() {
2636 while let Some(block) = blocks.last() {
2638 let recovered_block = block.recovered_block();
2639 if recovered_block.number() <= best_block_number {
2640 blocks.pop();
2643 } else {
2644 break
2647 }
2648 }
2649
2650 historical = if let Some(block) = blocks.last() {
2651 (block.recovered_block().number() - 1).into()
2654 } else {
2655 parent_hash.into()
2657 };
2658 }
2659
2660 if blocks.is_empty() {
2661 debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2662 } else {
2663 debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2664 }
2665
2666 let block_number = provider
2668 .convert_hash_or_number(historical)?
2669 .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2670
2671 let revert_state = if block_number == best_block_number {
2673 debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2676 HashedPostState::default()
2677 } else {
2678 let revert_state = HashedPostState::from_reverts::<
2679 <P::StateCommitment as StateCommitment>::KeyHasher,
2680 >(provider.tx_ref(), block_number + 1)
2681 .map_err(ProviderError::from)?;
2682 debug!(
2683 target: "engine::tree",
2684 block_number,
2685 best_block_number,
2686 accounts = revert_state.accounts.len(),
2687 storages = revert_state.storages.len(),
2688 "Non-empty revert state"
2689 );
2690 revert_state
2691 };
2692 input.append(revert_state);
2693
2694 for block in blocks.iter().rev() {
2696 input.append_cached_ref(block.trie_updates(), block.hashed_state())
2697 }
2698
2699 Ok(input)
2700 }
2701
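/// Converts a block insertion error into a [`PayloadStatus`]: internal (non-validation) errors
/// are returned as fatal, while validation errors mark the block as invalid, emit an
/// `InvalidBlock` event and produce an `Invalid` payload status with the latest valid hash.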
2702 fn on_insert_block_error(
2708 &mut self,
2709 error: InsertBlockError<N::Block>,
2710 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2711 let (block, error) = error.split();
2712
2713 let validation_err = error.ensure_validation_error()?;
2716
2717 warn!(
2721 target: "engine::tree",
2722 invalid_hash=%block.hash(),
2723 invalid_number=block.number(),
2724 %validation_err,
2725 "Invalid block error on new payload",
2726 );
2727 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2728
2729 self.state.invalid_headers.insert(block.block_with_parent());
2731 self.emit_event(EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
2732 Box::new(block),
2733 )));
2734 Ok(PayloadStatus::new(
2735 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2736 latest_valid_hash,
2737 ))
2738 }
2739
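/// Attempts to find a canonical header by hash, first in the in-memory canonical state and
/// then in the database.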
2740 pub fn find_canonical_header(
2742 &self,
2743 hash: B256,
2744 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2745 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2746
2747 if canonical.is_none() {
2748 canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2749 }
2750
2751 Ok(canonical)
2752 }
2753
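/// Updates the tracked finalized block if the given hash is non-zero and corresponds to a
/// canonical header; returns an invalid forkchoice state error if the header is unknown.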
2754 fn update_finalized_block(
2756 &self,
2757 finalized_block_hash: B256,
2758 ) -> Result<(), OnForkChoiceUpdated> {
2759 if finalized_block_hash.is_zero() {
2760 return Ok(())
2761 }
2762
2763 match self.find_canonical_header(finalized_block_hash) {
2764 Ok(None) => {
2765 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2766 return Err(OnForkChoiceUpdated::invalid_state())
2768 }
2769 Ok(Some(finalized)) => {
2770 if Some(finalized.num_hash()) !=
2771 self.canonical_in_memory_state.get_finalized_num_hash()
2772 {
2773 let _ = self.persistence.save_finalized_block_number(finalized.number());
2776 self.canonical_in_memory_state.set_finalized(finalized);
2777 }
2778 }
2779 Err(err) => {
2780 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2781 }
2782 }
2783
2784 Ok(())
2785 }
2786
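/// Updates the tracked safe block if the given hash is non-zero and corresponds to a canonical
/// header; returns an invalid forkchoice state error if the header is unknown.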
2787 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2789 if safe_block_hash.is_zero() {
2790 return Ok(())
2791 }
2792
2793 match self.find_canonical_header(safe_block_hash) {
2794 Ok(None) => {
2795 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2796 return Err(OnForkChoiceUpdated::invalid_state())
2798 }
2799 Ok(Some(safe)) => {
2800 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2801 let _ = self.persistence.save_safe_block_number(safe.number());
2804 self.canonical_in_memory_state.set_safe(safe);
2805 }
2806 }
2807 Err(err) => {
2808 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2809 }
2810 }
2811
2812 Ok(())
2813 }
2814
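/// Ensures that the finalized and safe blocks referenced by the forkchoice state are tracked
/// and canonical.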
2815 fn ensure_consistent_forkchoice_state(
2824 &self,
2825 state: ForkchoiceState,
2826 ) -> Result<(), OnForkChoiceUpdated> {
2827 self.update_finalized_block(state.finalized_block_hash)?;
2833
2834 self.update_safe_block(state.safe_block_hash)
2840 }
2841
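/// Performs the pre-validation checks for a forkchoice update: a zero head hash is invalid, a
/// head with a known invalid ancestor short-circuits with that status, and updates received
/// while backfill sync is active return a syncing status. Returns `None` if the update should
/// be processed further.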
2842 fn pre_validate_forkchoice_update(
2847 &mut self,
2848 state: ForkchoiceState,
2849 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2850 if state.head_block_hash.is_zero() {
2851 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2852 }
2853
2854 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2857 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2858 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2859 }
2860
2861 if !self.backfill_sync_state.is_idle() {
2862 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2865 return Ok(Some(OnForkChoiceUpdated::syncing()))
2866 }
2867
2868 Ok(None)
2869 }
2870
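/// Validates the payload attributes against the forkchoice head and, if valid, forwards them
/// to the payload builder, returning a response that carries the pending payload id.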
2871 fn process_payload_attributes(
2876 &self,
2877 attrs: T::PayloadAttributes,
2878 head: &N::BlockHeader,
2879 state: ForkchoiceState,
2880 version: EngineApiMessageVersion,
2881 ) -> OnForkChoiceUpdated {
2882 if let Err(err) =
2883 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2884 {
2885 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2886 return OnForkChoiceUpdated::invalid_payload_attributes()
2887 }
2888
2889 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2894 state.head_block_hash,
2895 attrs,
2896 version as u8,
2897 ) {
2898 Ok(attributes) => {
2899 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2902
2903 OnForkChoiceUpdated::updated_with_pending_payload_id(
2915 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2916 pending_payload_id,
2917 )
2918 }
2919 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2920 }
2921 }
2922
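/// Removes canonical in-memory blocks below the given upper bound from the tree state, taking
/// the last persisted block and the optional finalized hash into account.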
2923 pub(crate) fn remove_before(
2930 &mut self,
2931 upper_bound: BlockNumHash,
2932 finalized_hash: Option<B256>,
2933 ) -> ProviderResult<()> {
2934 let num = if let Some(hash) = finalized_hash {
2937 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2938 } else {
2939 None
2940 };
2941
2942 self.state.tree_state.remove_until(
2943 upper_bound,
2944 self.persistence_state.last_persisted_block.hash,
2945 num,
2946 );
2947 Ok(())
2948 }
2949
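/// Returns a [`StateProviderBuilder`] for the state at the given block hash: an in-memory
/// overlay on top of the historical state if the block is tracked in memory, the database
/// state if the block is persisted, or `None` if the hash is unknown.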
2950 pub fn state_provider_builder(
2955 &self,
2956 hash: B256,
2957 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2958 where
2959 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
2960 {
2961 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2962 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2963 return Ok(Some(StateProviderBuilder::new(
2965 self.provider.clone(),
2966 historical,
2967 Some(blocks),
2968 )))
2969 }
2970
2971 if let Some(header) = self.provider.header(&hash)? {
2973 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2974 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2977 }
2978
2979 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2980 Ok(None)
2981 }
2982}
2983
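/// The status of a block after an attempt to insert it into the tree.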
2984#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2990pub enum BlockStatus {
2991 Valid,
2993 Disconnected {
2995 head: BlockNumHash,
2997 missing_ancestor: BlockNumHash,
2999 },
3000}
3001
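/// The outcome of inserting a payload or block into the tree.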
3002#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3007pub enum InsertPayloadOk {
3008 AlreadySeen(BlockStatus),
3010 Inserted(BlockStatus),
3012}
3013
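/// How the block being inserted relates to any persistence task that is currently in progress.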
3014#[derive(Debug, Clone, Copy)]
3016pub enum PersistingKind {
3017 NotPersisting,
3019 PersistingNotDescendant,
3021 PersistingDescendant,
3023}
3024
3025impl PersistingKind {
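/// Returns `true` if the parallel state root can be run: either nothing is being persisted, or
/// the block is a descendant of the blocks currently being persisted.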
3026 pub fn can_run_parallel_state_root(&self) -> bool {
3031 matches!(self, Self::NotPersisting | Self::PersistingDescendant)
3032 }
3033
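/// Returns `true` if the block being inserted is a descendant of the blocks currently being
/// persisted.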
3034 pub fn is_descendant(&self) -> bool {
3037 matches!(self, Self::PersistingDescendant)
3038 }
3039}
3040
3041#[cfg(test)]
3042mod tests {
3043 use super::*;
3044 use crate::persistence::PersistenceAction;
3045 use alloy_consensus::Header;
3046 use alloy_primitives::Bytes;
3047 use alloy_rlp::Decodable;
3048 use alloy_rpc_types_engine::{
3049 CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
3050 ExecutionPayloadV3,
3051 };
3052 use assert_matches::assert_matches;
3053 use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
3054 use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
3055 use reth_engine_primitives::ForkchoiceStatus;
3056 use reth_ethereum_consensus::EthBeaconConsensus;
3057 use reth_ethereum_engine_primitives::EthEngineTypes;
3058 use reth_ethereum_primitives::{Block, EthPrimitives};
3059 use reth_evm::test_utils::MockExecutorProvider;
3060 use reth_evm_ethereum::EthEvmConfig;
3061 use reth_node_ethereum::EthereumEngineValidator;
3062 use reth_primitives_traits::Block as _;
3063 use reth_provider::test_utils::MockEthProvider;
3064 use reth_trie::{updates::TrieUpdates, HashedPostState};
3065 use std::{
3066 str::FromStr,
3067 sync::mpsc::{channel, Sender},
3068 };
3069
3070 #[allow(dead_code)]
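/// A channel that buffers messages and only forwards them once [`TestChannelHandle::release`]
/// is called, so tests can control when the wrapped receiver observes them.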
3074 struct TestChannel<T> {
3075 release: Receiver<()>,
3077 tx: Sender<T>,
3079 rx: Receiver<T>,
3081 }
3082
3083 impl<T: Send + 'static> TestChannel<T> {
3084 #[allow(dead_code)]
3086 fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
3087 let (original_tx, original_rx) = channel();
3088 let (wrapped_tx, wrapped_rx) = channel();
3089 let (release_tx, release_rx) = channel();
3090 let handle = TestChannelHandle::new(release_tx);
3091 let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
3092 std::thread::spawn(move || test_channel.intercept_loop());
3094 (original_tx, wrapped_rx, handle)
3095 }
3096
3097 fn intercept_loop(&self) {
3099 while self.release.recv() == Ok(()) {
3100 let Ok(value) = self.rx.recv() else { return };
3101
3102 let _ = self.tx.send(value);
3103 }
3104 }
3105 }
3106
3107 struct TestChannelHandle {
3108 release: Sender<()>,
3110 }
3111
3112 impl TestChannelHandle {
3113 const fn new(release: Sender<()>) -> Self {
3115 Self { release }
3116 }
3117
3118 #[allow(dead_code)]
3120 fn release(&self) {
3121 let _ = self.release.send(());
3122 }
3123 }
3124
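/// Test harness wiring an `EngineApiTreeHandler` together with mock provider, executor and
/// persistence channels.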
3125 struct TestHarness {
3126 tree: EngineApiTreeHandler<
3127 EthPrimitives,
3128 MockEthProvider,
3129 MockExecutorProvider,
3130 EthEngineTypes,
3131 EthereumEngineValidator,
3132 EthEvmConfig,
3133 >,
3134 to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
3135 from_tree_rx: UnboundedReceiver<EngineApiEvent>,
3136 blocks: Vec<ExecutedBlockWithTrieUpdates>,
3137 action_rx: Receiver<PersistenceAction>,
3138 executor_provider: MockExecutorProvider,
3139 block_builder: TestBlockBuilder,
3140 provider: MockEthProvider,
3141 }
3142
3143 impl TestHarness {
3144 fn new(chain_spec: Arc<ChainSpec>) -> Self {
3145 let (action_tx, action_rx) = channel();
3146 Self::with_persistence_channel(chain_spec, action_tx, action_rx)
3147 }
3148
3149 #[allow(dead_code)]
3150 fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
3151 let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
3152 (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
3153 }
3154
3155 fn with_persistence_channel(
3156 chain_spec: Arc<ChainSpec>,
3157 action_tx: Sender<PersistenceAction>,
3158 action_rx: Receiver<PersistenceAction>,
3159 ) -> Self {
3160 let persistence_handle = PersistenceHandle::new(action_tx);
3161
3162 let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
3163
3164 let provider = MockEthProvider::default();
3165 let executor_provider = MockExecutorProvider::default();
3166
3167 let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
3168
3169 let (from_tree_tx, from_tree_rx) = unbounded_channel();
3170
3171 let header = chain_spec.genesis_header().clone();
3172 let header = SealedHeader::seal_slow(header);
3173 let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
3174 let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
3175
3176 let (to_payload_service, _payload_command_rx) = unbounded_channel();
3177 let payload_builder = PayloadBuilderHandle::new(to_payload_service);
3178
3179 let evm_config = EthEvmConfig::new(chain_spec.clone());
3180
3181 let tree = EngineApiTreeHandler::new(
3182 provider.clone(),
3183 executor_provider.clone(),
3184 consensus,
3185 payload_validator,
3186 from_tree_tx,
3187 engine_api_tree_state,
3188 canonical_in_memory_state,
3189 persistence_handle,
3190 PersistenceState::default(),
3191 payload_builder,
3192 TreeConfig::default()
3195 .with_legacy_state_root(true)
3196 .with_has_enough_parallelism(true),
3197 EngineApiKind::Ethereum,
3198 evm_config,
3199 );
3200
3201 let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3202 Self {
3203 to_tree_tx: tree.incoming_tx.clone(),
3204 tree,
3205 from_tree_rx,
3206 blocks: vec![],
3207 action_rx,
3208 executor_provider,
3209 block_builder,
3210 provider,
3211 }
3212 }
3213
3214 fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
3215 let mut blocks_by_hash = HashMap::default();
3216 let mut blocks_by_number = BTreeMap::new();
3217 let mut state_by_hash = HashMap::default();
3218 let mut hash_by_number = BTreeMap::new();
3219 let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
3220 let mut parent_hash = B256::ZERO;
3221
3222 for block in &blocks {
3223 let sealed_block = block.recovered_block();
3224 let hash = sealed_block.hash();
3225 let number = sealed_block.number;
3226 blocks_by_hash.insert(hash, block.clone());
3227 blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
3228 state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
3229 hash_by_number.insert(number, hash);
3230 parent_to_child.entry(parent_hash).or_default().insert(hash);
3231 parent_hash = hash;
3232 }
3233
3234 self.tree.state.tree_state = TreeState {
3235 blocks_by_hash,
3236 blocks_by_number,
3237 current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
3238 parent_to_child,
3239 persisted_trie_updates: HashMap::default(),
3240 };
3241
3242 let last_executed_block = blocks.last().unwrap().clone();
3243 let pending = Some(BlockState::new(last_executed_block));
3244 self.tree.canonical_in_memory_state =
3245 CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
3246
3247 self.blocks = blocks.clone();
3248
3249 let recovered_blocks =
3250 blocks.iter().map(|b| b.recovered_block().clone()).collect::<Vec<_>>();
3251
3252 self.persist_blocks(recovered_blocks);
3253
3254 self
3255 }
3256
3257 const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
3258 self.tree.backfill_sync_state = state;
3259 self
3260 }
3261
3262 fn extend_execution_outcome(
3263 &self,
3264 execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
3265 ) {
3266 self.executor_provider.extend(execution_outcomes);
3267 }
3268
3269 fn insert_block(
3270 &mut self,
3271 block: RecoveredBlock<reth_ethereum_primitives::Block>,
3272 ) -> Result<InsertPayloadOk, InsertBlockError<Block>> {
3273 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3274 self.extend_execution_outcome([execution_outcome]);
3275 self.tree.provider.add_state_root(block.state_root);
3276 self.tree.insert_block(block)
3277 }
3278
3279 async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3280 let fcu_status = fcu_status.into();
3281
3282 self.send_fcu(block_hash, fcu_status).await;
3283
3284 self.check_fcu(block_hash, fcu_status).await;
3285 }
3286
3287 async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3288 let fcu_state = self.fcu_state(block_hash);
3289
3290 let (tx, rx) = oneshot::channel();
3291 self.tree
3292 .on_engine_message(FromEngine::Request(
3293 BeaconEngineMessage::ForkchoiceUpdated {
3294 state: fcu_state,
3295 payload_attrs: None,
3296 tx,
3297 version: EngineApiMessageVersion::default(),
3298 }
3299 .into(),
3300 ))
3301 .unwrap();
3302
3303 let response = rx.await.unwrap().unwrap().await.unwrap();
3304 match fcu_status.into() {
3305 ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
3306 ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
3307 ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
3308 }
3309 }
3310
3311 async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3312 let fcu_state = self.fcu_state(block_hash);
3313
3314 let event = self.from_tree_rx.recv().await.unwrap();
3316 match event {
3317 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
3318 state,
3319 status,
3320 )) => {
3321 assert_eq!(state, fcu_state);
3322 assert_eq!(status, fcu_status.into());
3323 }
3324 _ => panic!("Unexpected event: {:#?}", event),
3325 }
3326 }
3327
3328 const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
3329 ForkchoiceState {
3330 head_block_hash: block_hash,
3331 safe_block_hash: block_hash,
3332 finalized_block_hash: block_hash,
3333 }
3334 }
3335
3336 async fn send_new_payload(
3337 &mut self,
3338 block: RecoveredBlock<reth_ethereum_primitives::Block>,
3339 ) {
3340 let payload = ExecutionPayloadV3::from_block_unchecked(
3341 block.hash(),
3342 &block.clone_sealed_block().into_block(),
3343 );
3344 self.tree
3345 .on_new_payload(ExecutionData {
3346 payload: payload.into(),
3347 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
3348 parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
3349 versioned_hashes: vec![],
3350 }),
3351 })
3352 .unwrap();
3353 }
3354
3355 async fn insert_chain(
3356 &mut self,
3357 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3358 ) {
3359 for block in chain.clone() {
3360 self.insert_block(block.clone()).unwrap();
3361 }
3362 self.check_canon_chain_insertion(chain).await;
3363 }
3364
3365 async fn check_canon_commit(&mut self, hash: B256) {
3366 let event = self.from_tree_rx.recv().await.unwrap();
3367 match event {
3368 EngineApiEvent::BeaconConsensus(
3369 BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3370 ) => {
3371 assert_eq!(header.hash(), hash);
3372 }
3373 _ => panic!("Unexpected event: {:#?}", event),
3374 }
3375 }
3376
3377 async fn check_fork_chain_insertion(
3378 &mut self,
3379 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3380 ) {
3381 for block in chain {
3382 self.check_fork_block_added(block.hash()).await;
3383 }
3384 }
3385
3386 async fn check_canon_chain_insertion(
3387 &mut self,
3388 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3389 ) {
3390 for block in chain.clone() {
3391 self.check_canon_block_added(block.hash()).await;
3392 }
3393 }
3394
3395 async fn check_canon_block_added(&mut self, expected_hash: B256) {
3396 let event = self.from_tree_rx.recv().await.unwrap();
3397 match event {
3398 EngineApiEvent::BeaconConsensus(
3399 BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
3400 ) => {
3401 assert_eq!(executed.recovered_block.hash(), expected_hash);
3402 }
3403 _ => panic!("Unexpected event: {:#?}", event),
3404 }
3405 }
3406
3407 async fn check_fork_block_added(&mut self, expected_hash: B256) {
3408 let event = self.from_tree_rx.recv().await.unwrap();
3409 match event {
3410 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3411 executed,
3412 _,
3413 )) => {
3414 assert_eq!(executed.recovered_block.hash(), expected_hash);
3415 }
3416 _ => panic!("Unexpected event: {:#?}", event),
3417 }
3418 }
3419
3420 async fn check_invalid_block(&mut self, expected_hash: B256) {
3421 let event = self.from_tree_rx.recv().await.unwrap();
3422 match event {
3423 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
3424 block,
3425 )) => {
3426 assert_eq!(block.hash(), expected_hash);
3427 }
3428 _ => panic!("Unexpected event: {:#?}", event),
3429 }
3430 }
3431
3432 fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
3433 let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3434 let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3435
3436 for block in &blocks {
3437 block_data.push((block.hash(), block.clone_block()));
3438 headers_data.push((block.hash(), block.header().clone()));
3439 }
3440
3441 self.provider.extend_blocks(block_data);
3442 self.provider.extend_headers(headers_data);
3443 }
3444
3445 fn setup_range_insertion_for_valid_chain(
3446 &mut self,
3447 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3448 ) {
3449 self.setup_range_insertion_for_chain(chain, None)
3450 }
3451
3452 fn setup_range_insertion_for_invalid_chain(
3453 &mut self,
3454 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3455 index: usize,
3456 ) {
3457 self.setup_range_insertion_for_chain(chain, Some(index))
3458 }
3459
3460 fn setup_range_insertion_for_chain(
3461 &mut self,
3462 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3463 invalid_index: Option<usize>,
3464 ) {
3465 let mut chain_rev = chain;
3468 chain_rev.reverse();
3469
3470 let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3471 for (index, block) in chain_rev.iter().enumerate() {
3472 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3473 let state_root = if invalid_index == Some(index) {
3474 B256::random()
3475 } else {
3476 block.state_root
3477 };
3478 self.tree.provider.add_state_root(state_root);
3479 execution_outcomes.push(execution_outcome);
3480 }
3481 self.extend_execution_outcome(execution_outcomes);
3482 }
3483
3484 fn check_canon_head(&self, head_hash: B256) {
3485 assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3486 }
3487 }
3488
3489 #[test]
3490 fn test_tree_persist_block_batch() {
3491 let tree_config = TreeConfig::default();
3492 let chain_spec = MAINNET.clone();
3493 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3494
3495 let blocks: Vec<_> = test_block_builder
3498 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3499 .collect();
3500 let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3501
3502 let mut blocks = vec![];
3503 for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3504 blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3505 }
3506
3507 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3508
3509 let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3511 test_harness.tree.on_engine_message(msg).unwrap();
3512
3513 let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3515 match msg {
3516 FromEngine::DownloadedBlocks(blocks) => {
3517 assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3518 }
3519 _ => panic!("unexpected message: {:#?}", msg),
3520 }
3521 }
3522
3523 #[tokio::test]
3524 async fn test_tree_persist_blocks() {
3525 let tree_config = TreeConfig::default();
3526 let chain_spec = MAINNET.clone();
3527 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3528
3529 let blocks: Vec<_> = test_block_builder
3532 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3533 .collect();
3534 let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3535 std::thread::Builder::new()
3536 .name("Tree Task".to_string())
3537 .spawn(|| test_harness.tree.run())
3538 .unwrap();
3539
3540 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3542
3543 let received_action =
3544 test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3545 if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3546 let expected_persist_len =
3549 blocks.len() - tree_config.memory_block_buffer_target() as usize;
3550 assert_eq!(saved_blocks.len(), expected_persist_len);
3551 assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3552 } else {
3553 panic!("unexpected action received {received_action:?}");
3554 }
3555 }
3556
3557 #[tokio::test]
3558 async fn test_in_memory_state_trait_impl() {
3559 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect();
3560 let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3561
3562 for executed_block in blocks {
3563 let sealed_block = executed_block.recovered_block();
3564
3565 let expected_state = BlockState::new(executed_block.clone());
3566
3567 let actual_state_by_hash = test_harness
3568 .tree
3569 .canonical_in_memory_state
3570 .state_by_hash(sealed_block.hash())
3571 .unwrap();
3572 assert_eq!(expected_state, *actual_state_by_hash);
3573
3574 let actual_state_by_number = test_harness
3575 .tree
3576 .canonical_in_memory_state
3577 .state_by_number(sealed_block.number)
3578 .unwrap();
3579 assert_eq!(expected_state, *actual_state_by_number);
3580 }
3581 }
3582
3583 #[tokio::test]
3584 async fn test_engine_request_during_backfill() {
3585 let tree_config = TreeConfig::default();
3586 let blocks: Vec<_> = TestBlockBuilder::eth()
3587 .get_executed_blocks(0..tree_config.persistence_threshold())
3588 .collect();
3589 let mut test_harness = TestHarness::new(MAINNET.clone())
3590 .with_blocks(blocks)
3591 .with_backfill_state(BackfillSyncState::Active);
3592
3593 let (tx, rx) = oneshot::channel();
3594 test_harness
3595 .tree
3596 .on_engine_message(FromEngine::Request(
3597 BeaconEngineMessage::ForkchoiceUpdated {
3598 state: ForkchoiceState {
3599 head_block_hash: B256::random(),
3600 safe_block_hash: B256::random(),
3601 finalized_block_hash: B256::random(),
3602 },
3603 payload_attrs: None,
3604 tx,
3605 version: EngineApiMessageVersion::default(),
3606 }
3607 .into(),
3608 ))
3609 .unwrap();
3610
3611 let resp = rx.await.unwrap().unwrap().await.unwrap();
3612 assert!(resp.payload_status.is_syncing());
3613 }
3614
3615 #[test]
3616 fn test_disconnected_payload() {
3617 let s = include_str!("../../test-data/holesky/2.rlp");
3618 let data = Bytes::from_str(s).unwrap();
3619 let block = Block::decode(&mut data.as_ref()).unwrap();
3620 let sealed = block.seal_slow();
3621 let hash = sealed.hash();
3622 let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
3623
3624 let mut test_harness = TestHarness::new(HOLESKY.clone());
3625
3626 let outcome = test_harness
3627 .tree
3628 .on_new_payload(ExecutionData {
3629 payload: payload.into(),
3630 sidecar: ExecutionPayloadSidecar::none(),
3631 })
3632 .unwrap();
3633 assert!(outcome.outcome.is_syncing());
3634
3635 let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3637 assert_eq!(buffered.clone_sealed_block(), sealed);
3638 }
3639
3640 #[test]
3641 fn test_disconnected_block() {
3642 let s = include_str!("../../test-data/holesky/2.rlp");
3643 let data = Bytes::from_str(s).unwrap();
3644 let block = Block::decode(&mut data.as_ref()).unwrap();
3645 let sealed = block.seal_slow().try_recover().unwrap();
3646
3647 let mut test_harness = TestHarness::new(HOLESKY.clone());
3648
3649 let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap();
3650 assert_eq!(
3651 outcome,
3652 InsertPayloadOk::Inserted(BlockStatus::Disconnected {
3653 head: test_harness.tree.state.tree_state.current_canonical_head,
3654 missing_ancestor: sealed.parent_num_hash()
3655 })
3656 );
3657 }
3658
3659 #[tokio::test]
3660 async fn test_holesky_payload() {
3661 let s = include_str!("../../test-data/holesky/1.rlp");
3662 let data = Bytes::from_str(s).unwrap();
3663 let block: Block = Block::decode(&mut data.as_ref()).unwrap();
3664 let sealed = block.seal_slow();
3665 let payload =
3666 ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
3667
3668 let mut test_harness =
3669 TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3670
3671 let (tx, rx) = oneshot::channel();
3672 test_harness
3673 .tree
3674 .on_engine_message(FromEngine::Request(
3675 BeaconEngineMessage::NewPayload {
3676 payload: ExecutionData {
3677 payload: payload.clone().into(),
3678 sidecar: ExecutionPayloadSidecar::none(),
3679 },
3680 tx,
3681 }
3682 .into(),
3683 ))
3684 .unwrap();
3685
3686 let resp = rx.await.unwrap().unwrap();
3687 assert!(resp.is_syncing());
3688 }
3689
3690 #[test]
3691 fn test_tree_state_normal_descendant() {
3692 let mut tree_state = TreeState::new(BlockNumHash::default());
3693 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3694
3695 tree_state.insert_executed(blocks[0].clone());
3696 assert!(tree_state.is_descendant(
3697 blocks[0].recovered_block().num_hash(),
3698 blocks[1].recovered_block().header()
3699 ));
3700
3701 tree_state.insert_executed(blocks[1].clone());
3702
3703 assert!(tree_state.is_descendant(
3704 blocks[0].recovered_block().num_hash(),
3705 blocks[2].recovered_block().header()
3706 ));
3707 assert!(tree_state.is_descendant(
3708 blocks[1].recovered_block().num_hash(),
3709 blocks[2].recovered_block().header()
3710 ));
3711 }
3712
3713 #[tokio::test]
3714 async fn test_tree_state_insert_executed() {
3715 let mut tree_state = TreeState::new(BlockNumHash::default());
3716 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3717
3718 tree_state.insert_executed(blocks[0].clone());
3719 tree_state.insert_executed(blocks[1].clone());
3720
3721 assert_eq!(
3722 tree_state.parent_to_child.get(&blocks[0].recovered_block().hash()),
3723 Some(&HashSet::from_iter([blocks[1].recovered_block().hash()]))
3724 );
3725
3726 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3727
3728 tree_state.insert_executed(blocks[2].clone());
3729
3730 assert_eq!(
3731 tree_state.parent_to_child.get(&blocks[1].recovered_block().hash()),
3732 Some(&HashSet::from_iter([blocks[2].recovered_block().hash()]))
3733 );
3734 assert!(tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3735
3736 assert!(!tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3737 }
3738
3739 #[tokio::test]
3740 async fn test_tree_state_insert_executed_with_reorg() {
3741 let mut tree_state = TreeState::new(BlockNumHash::default());
3742 let mut test_block_builder = TestBlockBuilder::eth();
3743 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3744
3745 for block in &blocks {
3746 tree_state.insert_executed(block.clone());
3747 }
3748 assert_eq!(tree_state.blocks_by_hash.len(), 5);
3749
3750 let fork_block_3 = test_block_builder
3751 .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3752 let fork_block_4 = test_block_builder
3753 .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3754 let fork_block_5 = test_block_builder
3755 .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3756
3757 tree_state.insert_executed(fork_block_3.clone());
3758 tree_state.insert_executed(fork_block_4.clone());
3759 tree_state.insert_executed(fork_block_5.clone());
3760
3761 assert_eq!(tree_state.blocks_by_hash.len(), 8);
assert_eq!(tree_state.blocks_by_number[&3].len(), 2);
assert_eq!(tree_state.parent_to_child[&blocks[1].recovered_block().hash()].len(), 2);

tree_state.insert_executed(fork_block_4.clone());
3767 assert_eq!(tree_state.blocks_by_hash.len(), 8);
3768
3769 assert!(tree_state.parent_to_child[&fork_block_3.recovered_block().hash()]
3770 .contains(&fork_block_4.recovered_block().hash()));
3771 assert!(tree_state.parent_to_child[&fork_block_4.recovered_block().hash()]
3772 .contains(&fork_block_5.recovered_block().hash()));
3773
3774 assert_eq!(tree_state.blocks_by_number[&4].len(), 2);
3775 assert_eq!(tree_state.blocks_by_number[&5].len(), 2);
3776 }
3777
3778 #[tokio::test]
3779 async fn test_tree_state_remove_before() {
3780 let start_num_hash = BlockNumHash::default();
3781 let mut tree_state = TreeState::new(start_num_hash);
3782 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3783
3784 for block in &blocks {
3785 tree_state.insert_executed(block.clone());
3786 }
3787
3788 let last = blocks.last().unwrap();
3789
3790 tree_state.set_canonical_head(last.recovered_block().num_hash());
3792
3793 tree_state.remove_until(
3795 BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3796 start_num_hash.hash,
3797 Some(blocks[1].recovered_block().num_hash()),
3798 );
3799
3800 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3801 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3802 assert!(!tree_state.blocks_by_number.contains_key(&1));
3803 assert!(!tree_state.blocks_by_number.contains_key(&2));
3804
3805 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3806 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3807 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3808 assert!(tree_state.blocks_by_number.contains_key(&3));
3809 assert!(tree_state.blocks_by_number.contains_key(&4));
3810 assert!(tree_state.blocks_by_number.contains_key(&5));
3811
3812 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3813 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3814 assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3815 assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3816 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3817
3818 assert_eq!(
3819 tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3820 Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3821 );
3822 assert_eq!(
3823 tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3824 Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3825 );
3826 }
3827
3828 #[tokio::test]
3829 async fn test_tree_state_remove_before_finalized() {
3830 let start_num_hash = BlockNumHash::default();
3831 let mut tree_state = TreeState::new(start_num_hash);
3832 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3833
3834 for block in &blocks {
3835 tree_state.insert_executed(block.clone());
3836 }
3837
3838 let last = blocks.last().unwrap();
3839
3840 tree_state.set_canonical_head(last.recovered_block().num_hash());
3842
3843 tree_state.remove_until(
3845 BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3846 start_num_hash.hash,
3847 None,
3848 );
3849
3850 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3851 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3852 assert!(!tree_state.blocks_by_number.contains_key(&1));
3853 assert!(!tree_state.blocks_by_number.contains_key(&2));
3854
3855 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3856 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3857 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3858 assert!(tree_state.blocks_by_number.contains_key(&3));
3859 assert!(tree_state.blocks_by_number.contains_key(&4));
3860 assert!(tree_state.blocks_by_number.contains_key(&5));
3861
3862 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3863 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3864 assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3865 assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3866 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3867
3868 assert_eq!(
3869 tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3870 Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3871 );
3872 assert_eq!(
3873 tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3874 Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3875 );
3876 }
3877
3878 #[tokio::test]
3879 async fn test_tree_state_remove_before_lower_finalized() {
3880 let start_num_hash = BlockNumHash::default();
3881 let mut tree_state = TreeState::new(start_num_hash);
3882 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3883
3884 for block in &blocks {
3885 tree_state.insert_executed(block.clone());
3886 }
3887
3888 let last = blocks.last().unwrap();
3889
3890 tree_state.set_canonical_head(last.recovered_block().num_hash());
3892
3893 tree_state.remove_until(
3895 BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3896 start_num_hash.hash,
3897 Some(blocks[0].recovered_block().num_hash()),
3898 );
3899
3900 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3901 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3902 assert!(!tree_state.blocks_by_number.contains_key(&1));
3903 assert!(!tree_state.blocks_by_number.contains_key(&2));
3904
3905 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3906 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3907 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3908 assert!(tree_state.blocks_by_number.contains_key(&3));
3909 assert!(tree_state.blocks_by_number.contains_key(&4));
3910 assert!(tree_state.blocks_by_number.contains_key(&5));
3911
3912 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3913 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3914 assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3915 assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3916 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3917
3918 assert_eq!(
3919 tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3920 Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3921 );
3922 assert_eq!(
3923 tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3924 Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3925 );
3926 }
3927
3928 #[tokio::test]
3929 async fn test_tree_state_on_new_head_reorg() {
3930 reth_tracing::init_test_tracing();
3931 let chain_spec = MAINNET.clone();
3932
3933 let mut test_harness = TestHarness::new(chain_spec);
3935 test_harness.tree.config = test_harness
3936 .tree
3937 .config
3938 .with_persistence_threshold(1)
3939 .with_memory_block_buffer_target(1);
3940 let mut test_block_builder = TestBlockBuilder::eth();
3941 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3942
3943 for block in &blocks {
3944 test_harness.tree.state.tree_state.insert_executed(block.clone());
3945 }
3946
3947 test_harness
3949 .tree
3950 .state
3951 .tree_state
3952 .set_canonical_head(blocks[2].recovered_block().num_hash());
3953
3954 let fork_block_3 = test_block_builder
3956 .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3957 let fork_block_4 = test_block_builder
3958 .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3959 let fork_block_5 = test_block_builder
3960 .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3961
3962 test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3963 test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3964 test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3965
3966 let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap();
3968 assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3969 if let Some(NewCanonicalChain::Commit { new }) = result {
3970 assert_eq!(new.len(), 2);
3971 assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash());
3972 assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
3973 }
3974
3975 let current_action = test_harness.tree.persistence_state.current_action();
3977 assert_eq!(current_action, None);
3978
3979 test_harness.tree.advance_persistence().unwrap();
3984 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3985 assert_eq!(
3986 current_action,
3987 Some(CurrentPersistenceAction::SavingBlocks {
3988 highest: blocks[1].recovered_block().num_hash()
3989 })
3990 );
3991
3992 let received_action = test_harness.action_rx.recv().unwrap();
3994 let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
3995 panic!("received wrong action");
3996 };
3997 assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
3998
3999 sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
4001
4002 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4004 assert_eq!(
4005 current_action,
4006 Some(CurrentPersistenceAction::SavingBlocks {
4007 highest: blocks[1].recovered_block().num_hash()
4008 })
4009 );
4010
4011 test_harness.tree.advance_persistence().unwrap();
4013 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4014 assert_eq!(current_action, None);
4015
4016 let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
4018 assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
4019
4020 if let Some(NewCanonicalChain::Reorg { new, old }) = result {
4021 assert_eq!(new.len(), 3);
4022 assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
4023 assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash());
4024 assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash());
4025
4026 assert_eq!(old.len(), 1);
4027 assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
4028 }
4029
4030 test_harness.tree.advance_persistence().unwrap();
4032 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4033 assert_eq!(current_action, None);
4034
4035 test_harness
4037 .tree
4038 .state
4039 .tree_state
4040 .set_canonical_head(fork_block_5.recovered_block().num_hash());
4041
4042 test_harness.tree.advance_persistence().unwrap();
4045 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4046 assert_eq!(
4047 current_action,
4048 Some(CurrentPersistenceAction::SavingBlocks {
4049 highest: fork_block_4.recovered_block().num_hash()
4050 })
4051 );
4052 }
4053
    #[test]
    fn test_tree_state_on_new_head_deep_fork() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec);
        let mut test_block_builder = TestBlockBuilder::eth();

        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();

        for block in &blocks {
            test_harness.tree.state.tree_state.insert_executed(block.clone());
        }

        let last_block = blocks.last().unwrap().recovered_block().clone();

        test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());

        let chain_a = test_block_builder.create_fork(&last_block, 10);
        let chain_b = test_block_builder.create_fork(&last_block, 10);

        for block in &chain_a {
            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
                block: ExecutedBlock {
                    recovered_block: Arc::new(block.clone()),
                    execution_output: Arc::new(ExecutionOutcome::default()),
                    hashed_state: Arc::new(HashedPostState::default()),
                },
                trie: Arc::new(TrieUpdates::default()),
            });
        }
        test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());

        for block in &chain_b {
            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
                block: ExecutedBlock {
                    recovered_block: Arc::new(block.clone()),
                    execution_output: Arc::new(ExecutionOutcome::default()),
                    hashed_state: Arc::new(HashedPostState::default()),
                },
                trie: Arc::new(TrieUpdates::default()),
            });
        }

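        // switching the head to each chain_b block should reorg out all of chain_a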
        let mut expected_new = Vec::new();
        for block in &chain_b {
            let result = test_harness.tree.on_new_head(block.hash()).unwrap();
            assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));

            expected_new.push(block);
            if let Some(NewCanonicalChain::Reorg { new, old }) = result {
                assert_eq!(new.len(), expected_new.len());
                for (index, block) in expected_new.iter().enumerate() {
                    assert_eq!(new[index].recovered_block().hash(), block.hash());
                }

                assert_eq!(old.len(), chain_a.len());
                for (index, block) in chain_a.iter().enumerate() {
                    assert_eq!(old[index].recovered_block().hash(), block.hash());
                }
            }

            test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
        }
    }

    #[tokio::test]
    async fn test_get_canonical_blocks_to_persist() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec);
        let mut test_block_builder = TestBlockBuilder::eth();

        let canonical_head_number = 9;
        let blocks: Vec<_> =
            test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
        test_harness = test_harness.with_blocks(blocks.clone());

        let last_persisted_block_number = 3;
        test_harness.tree.persistence_state.last_persisted_block.number =
            last_persisted_block_number;

        let persistence_threshold = 4;
        let memory_block_buffer_target = 3;
        test_harness.tree.config = TreeConfig::default()
            .with_persistence_threshold(persistence_threshold)
            .with_memory_block_buffer_target(memory_block_buffer_target);

        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();

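        // head = 9, buffer target = 3, last persisted = 3, so blocks 4..=6 (3 blocks) are expected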
        let expected_blocks_to_persist_length: usize =
            (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
                .try_into()
                .unwrap();

        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
        for (i, item) in
            blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
        {
            assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
        }

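        // insert a non-canonical block at height 4; it must never be selected for persistence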
        let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
        let fork_block_hash = fork_block.recovered_block().hash();
        test_harness.tree.state.tree_state.insert_executed(fork_block);

        assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());

        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);

        assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));

        assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
            b.recovered_block().hash() == blocks[4].recovered_block().hash()));

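        // kicking off persistence should target the highest canonical block selected above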
        test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
        assert_eq!(
            test_harness.tree.persistence_state.current_action().cloned(),
            Some(CurrentPersistenceAction::SavingBlocks {
                highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
            })
        );
    }

    #[tokio::test]
    async fn test_engine_tree_fcu_missing_head() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let mut test_block_builder =
            TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());

        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
        test_harness = test_harness.with_blocks(blocks);

        let missing_block = test_block_builder
            .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash());

        test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;

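        // the new head is unknown locally, so the tree should emit a download request for it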
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
                let expected_block_set = HashSet::from_iter([missing_block.hash()]);
                assert_eq!(actual_block_set, expected_block_set);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }
    }

    #[tokio::test]
    async fn test_engine_tree_fcu_canon_chain_insertion() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        let main_chain =
            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3);

        test_harness.insert_chain(main_chain).await;
    }

    #[tokio::test]
    async fn test_engine_tree_fcu_reorg_with_all_blocks() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
        test_harness = test_harness.with_blocks(main_chain.clone());

        let fork_chain =
            test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3);
        let fork_chain_last_hash = fork_chain.last().unwrap().hash();

        for block in &fork_chain {
            test_harness.insert_block(block.clone()).unwrap();
        }

        test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;

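        // the fork blocks should be inserted and the fork tip should become the canonical head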
        test_harness.check_fork_chain_insertion(fork_chain.clone()).await;

        test_harness.check_canon_commit(fork_chain_last_hash).await;

        test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_head(fork_chain_last_hash);
    }

    #[tokio::test]
    async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        let main_chain = test_harness
            .block_builder
            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

        let main_chain_last_hash = main_chain.last().unwrap().hash();
        test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
        let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
            block_number: backfill_finished_block_number,
        });

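        // make the backfill tip available via the provider before signalling that backfill finished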
        let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
        test_harness
            .provider
            .add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
        test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();

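        // the FCU target is still missing locally, so a single-block download is requested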
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
                assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
                .last()
                .unwrap()
                .clone()]))
            .unwrap();

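        // with the target downloaded, the tree should request the remaining range by parent hash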
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
                assert_eq!(
                    total_blocks,
                    (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
                );
                assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }
    }

    #[tokio::test]
    async fn test_engine_tree_live_sync_transition_eventually_canonical() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());
        test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        let main_chain = test_harness
            .block_builder
            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

        let main_chain_last = main_chain.last().unwrap();
        let main_chain_last_hash = main_chain_last.hash();
        let main_chain_backfill_target =
            main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
        let main_chain_backfill_target_hash = main_chain_backfill_target.hash();

        test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
        test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;

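        // the backfill target is unknown locally, so the tree first asks to download it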
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
                assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![
                main_chain_backfill_target.clone()
            ]))
            .unwrap();

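        // the downloaded target is far enough ahead to trigger a backfill run towards it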
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::BackfillAction(BackfillAction::Start(
                reth_stages::PipelineTarget::Sync(target_hash),
            )) => {
                assert_eq!(target_hash, main_chain_backfill_target_hash);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

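        // simulate the pipeline having persisted the first MIN_BLOCKS_FOR_PIPELINE_RUN + 1 blocks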
        let backfilled_chain: Vec<_> =
            main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
        test_harness.persist_blocks(backfilled_chain.clone());

        test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);

        test_harness
            .tree
            .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
                ControlFlow::Continue { block_number: main_chain_backfill_target.number },
            )))
            .unwrap();

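        // FCU to the chain tip: it is still unknown, so another single-block download is requested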
        test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
                assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
            .unwrap();

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
                assert_eq!(
                    total_blocks,
                    (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
                );
                assert_eq!(initial_hash, main_chain_last.parent_hash);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

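        // feeding the remaining blocks should make the chain canonical up to the tip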
        let remaining: Vec<_> = main_chain
            .clone()
            .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
            .collect();

        test_harness.setup_range_insertion_for_valid_chain(remaining.clone());

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
            .unwrap();

        test_harness.check_canon_chain_insertion(remaining).await;

        test_harness.check_canon_commit(main_chain_last_hash).await;

        test_harness.check_canon_head(main_chain_last_hash);
    }

    #[tokio::test]
    async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        let main_chain =
            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10);
        let target = main_chain.get(5).unwrap();
        let target_hash = target.hash();
        let main_last = main_chain.last().unwrap();
        let main_last_hash = main_last.hash();

        test_harness.insert_chain(main_chain).await;

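        // make an intermediate block canonical first, then extend the canonical chain to the tip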
        test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(target_hash).await;
        test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;

        test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(main_last_hash).await;
        test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
        test_harness.check_canon_head(main_last_hash);
    }

    #[tokio::test]
    async fn test_engine_tree_valid_forks_with_older_canonical_head() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let old_head = base_chain.first().unwrap().recovered_block();

        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
        let fork_block = extension_chain.last().unwrap().clone_sealed_block();

        test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
        test_harness.insert_chain(extension_chain).await;

        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;

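        // build two competing forks on top of the extension chain and submit both as payloads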
        let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
        let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);

        test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
        for block in &chain_a {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness.check_canon_chain_insertion(chain_a.clone()).await;

        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
        for block in &chain_b {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness.check_canon_chain_insertion(chain_b.clone()).await;

        let chain_b_tip_hash = chain_b.last().unwrap().hash();
        test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(chain_b_tip_hash).await;

        test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_head(chain_b_tip_hash);

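        // the chain_a tip should still be tracked as a fork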
        assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
    }

    #[tokio::test]
    async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let side_chain = test_harness
            .block_builder
            .create_fork(base_chain.last().unwrap().recovered_block(), 2);

        let buffered_block = side_chain.last().unwrap();
        let buffered_block_hash = buffered_block.hash();

        test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
        test_harness.send_new_payload(buffered_block.clone()).await;

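        // the child arrived before its parent, so it should be held in the block buffer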
        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());

        let non_buffered_block = side_chain.first().unwrap();
        let non_buffered_block_hash = non_buffered_block.hash();

        test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
        test_harness.send_new_payload(non_buffered_block.clone()).await;
        assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());

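        // inserting the parent should connect the buffered child and both should become canonical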
        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());

        test_harness.check_canon_block_added(non_buffered_block_hash).await;
        test_harness.check_canon_block_added(buffered_block_hash).await;
    }

    #[tokio::test]
    async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let old_head = base_chain.first().unwrap().recovered_block();

        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
        test_harness.insert_chain(extension_chain).await;

        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;

        let total_fork_elements = 10;
        let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
        let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);

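        // make chain_b canonical block by block via newPayload + forkchoiceUpdated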
        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
        for block in &chain_b {
            test_harness.send_new_payload(block.clone()).await;
            test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
            test_harness.check_canon_block_added(block.hash()).await;
            test_harness.check_canon_commit(block.hash()).await;
            test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
        }

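        // submit chain_a, which is configured to become invalid partway through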
        let invalid_index = 3;
        test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
        for block in &chain_a {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness
            .check_fork_chain_insertion(
                chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
            )
            .await;
        for block in &chain_a[chain_a.len() - invalid_index - 1..] {
            test_harness.check_invalid_block(block.hash()).await;
        }

        let chain_a_tip_hash = chain_a.last().unwrap().hash();
        test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;

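        // the canonical head should remain the chain_b tip after the FCU to the invalid tip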
        let chain_b_tip_hash = chain_b.last().unwrap().hash();

        test_harness.check_canon_head(chain_b_tip_hash);

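        // the canonical head should still be unchanged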
        test_harness.check_canon_head(chain_b_tip_hash);
    }

    #[tokio::test]
    async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
        reth_tracing::init_test_tracing();
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let side_chain = test_harness
            .block_builder
            .create_fork(base_chain.last().unwrap().recovered_block(), 15);
        let invalid_index = 9;

        test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);

        for (index, block) in side_chain.iter().enumerate() {
            test_harness.send_new_payload(block.clone()).await;

            if index < side_chain.len() - invalid_index - 1 {
                test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
            }
        }

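        // an FCU to the fork tip should be reported as invalid because of the bad ancestor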
        let fork_tip_hash = side_chain.last().unwrap().hash();
        test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
    }
}