1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{
7 cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
8 },
9};
10use alloy_consensus::BlockHeader;
11use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, NumHash};
12use alloy_primitives::{
13 map::{HashMap, HashSet},
14 BlockNumber, B256, U256,
15};
16use alloy_rpc_types_engine::{
17 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
18};
19use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
20use payload_processor::sparse_trie::StateRootComputeOutcome;
21use persistence_state::CurrentPersistenceAction;
22use reth_chain_state::{
23 CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
24 MemoryOverlayStateProvider, NewCanonicalChain,
25};
26use reth_consensus::{Consensus, FullConsensus};
27pub use reth_engine_primitives::InvalidBlockHook;
28use reth_engine_primitives::{
29 BeaconConsensusEngineEvent, BeaconEngineMessage, BeaconOnNewPayloadError, EngineValidator,
30 ExecutionPayload, ForkchoiceStateTracker, OnForkChoiceUpdated,
31};
32use reth_errors::{ConsensusError, ProviderResult};
33use reth_ethereum_primitives::EthPrimitives;
34use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
35use reth_payload_builder::PayloadBuilderHandle;
36use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes, PayloadTypes};
37use reth_primitives_traits::{
38 Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
39};
40use reth_provider::{
41 providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
42 ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
43 StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
44};
45use reth_revm::database::StateProviderDatabase;
46use reth_stages_api::ControlFlow;
47use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
48use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
49use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
50use std::{
51 collections::{btree_map, hash_map, BTreeMap, VecDeque},
52 fmt::Debug,
53 ops::Bound,
54 sync::{
55 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
56 Arc,
57 },
58 time::Instant,
59};
60use tokio::sync::{
61 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
62 oneshot::{self, error::TryRecvError},
63};
64use tracing::*;
65
66mod block_buffer;
67mod cached_state;
68pub mod error;
69mod invalid_block_hook;
70mod invalid_headers;
71mod metrics;
72mod payload_processor;
73mod persistence_state;
74#[expect(unused)]
76mod trie_updates;
77
78use crate::tree::error::AdvancePersistenceError;
79pub use block_buffer::BlockBuffer;
80pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
81pub use invalid_headers::InvalidHeaderCache;
82pub use payload_processor::*;
83pub use persistence_state::PersistenceState;
84pub use reth_engine_primitives::TreeConfig;
85use reth_evm::execute::BlockExecutionOutput;
86
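/// Minimum distance (in blocks) from the local tip at which a backfill (pipeline) run is
/// preferred over downloading and executing blocks individually; set to one epoch worth of
/// slots.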
87pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
93
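/// In-memory tree of executed blocks that are not yet persisted to disk.
///
/// Tracks executed blocks by hash and number, the parent-to-child links between them, and the
/// current canonical head. Blocks are inserted after execution and removed once they are
/// persisted or pruned as finalized sidechains.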
94#[derive(Debug, Default)]
101pub struct TreeState<N: NodePrimitives = EthPrimitives> {
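/// All executed blocks currently kept in memory, keyed by block hash.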
102 blocks_by_hash: HashMap<B256, ExecutedBlockWithTrieUpdates<N>>,
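/// Executed blocks grouped by block number; a number maps to multiple blocks when sidechains
/// exist.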
106 blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlockWithTrieUpdates<N>>>,
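/// Maps a parent block hash to the set of child block hashes that extend it.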
112 parent_to_child: HashMap<B256, HashSet<B256>>,
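/// Trie updates of canonical blocks that were removed from the tree after being persisted,
/// keyed by block hash, so reorged-out blocks can later be reinserted together with their
/// trie updates.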
114 persisted_trie_updates: HashMap<B256, (BlockNumber, Arc<TrieUpdates>)>,
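/// Number and hash of the current canonical head block.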
118 current_canonical_head: BlockNumHash,
120}
121
122impl<N: NodePrimitives> TreeState<N> {
123 fn new(current_canonical_head: BlockNumHash) -> Self {
125 Self {
126 blocks_by_hash: HashMap::default(),
127 blocks_by_number: BTreeMap::new(),
128 current_canonical_head,
129 parent_to_child: HashMap::default(),
130 persisted_trie_updates: HashMap::default(),
131 }
132 }
133
134 fn reset(&mut self, current_canonical_head: BlockNumHash) {
136 *self = Self::new(current_canonical_head);
137 }
138
139 fn block_count(&self) -> usize {
141 self.blocks_by_hash.len()
142 }
143
144 fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlockWithTrieUpdates<N>> {
146 self.blocks_by_hash.get(&hash)
147 }
148
149 fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock<N::Block>>> {
151 self.blocks_by_hash.get(&hash).map(|b| Arc::new(b.recovered_block().sealed_block().clone()))
152 }
153
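/// Returns the executed block for `hash` together with all of its in-memory ancestors,
/// ordered from newest to oldest, plus the parent hash of the oldest ancestor (the first
/// block that is only available on disk).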
154 fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlockWithTrieUpdates<N>>)> {
159 let block = self.blocks_by_hash.get(&hash).cloned()?;
160 let mut parent_hash = block.recovered_block().parent_hash();
161 let mut blocks = vec![block];
162 while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
163 parent_hash = executed.recovered_block().parent_hash();
164 blocks.push(executed.clone());
165 }
166
167 Some((parent_hash, blocks))
168 }
169
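/// Inserts an executed block into the tree, indexing it by hash and number and recording the
/// parent-to-child link. Inserting a block that is already tracked is a no-op.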
170 fn insert_executed(&mut self, executed: ExecutedBlockWithTrieUpdates<N>) {
172 let hash = executed.recovered_block().hash();
173 let parent_hash = executed.recovered_block().parent_hash();
174 let block_number = executed.recovered_block().number();
175
176 if self.blocks_by_hash.contains_key(&hash) {
177 return;
178 }
179
180 self.blocks_by_hash.insert(hash, executed.clone());
181
182 self.blocks_by_number.entry(block_number).or_default().push(executed);
183
184 self.parent_to_child.entry(parent_hash).or_default().insert(hash);
185
186 for children in self.parent_to_child.values_mut() {
187 children.retain(|child| self.blocks_by_hash.contains_key(child));
188 }
189 }
190
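/// Removes the block with the given hash from all indices and returns it together with the
/// hashes of its children, if it was tracked.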
191 fn remove_by_hash(
197 &mut self,
198 hash: B256,
199 ) -> Option<(ExecutedBlockWithTrieUpdates<N>, HashSet<B256>)> {
200 let executed = self.blocks_by_hash.remove(&hash)?;
201
202 let parent_entry = self.parent_to_child.entry(executed.recovered_block().parent_hash());
204 if let hash_map::Entry::Occupied(mut entry) = parent_entry {
205 entry.get_mut().remove(&hash);
206
207 if entry.get().is_empty() {
208 entry.remove();
209 }
210 }
211
212 let children = self.parent_to_child.remove(&hash).unwrap_or_default();
214
215 let block_number_entry = self.blocks_by_number.entry(executed.recovered_block().number());
217 if let btree_map::Entry::Occupied(mut entry) = block_number_entry {
218 if let Some(index) = entry.get().iter().position(|b| b.recovered_block().hash() == hash)
220 {
221 entry.get_mut().swap_remove(index);
222
223 if entry.get().is_empty() {
225 entry.remove();
226 }
227 }
228 }
229
230 Some((executed, children))
231 }
232
233 pub(crate) fn is_canonical(&self, hash: B256) -> bool {
235 let mut current_block = self.current_canonical_head.hash;
236 if current_block == hash {
237 return true
238 }
239
240 while let Some(executed) = self.blocks_by_hash.get(&current_block) {
241 current_block = executed.recovered_block().parent_hash();
242 if current_block == hash {
243 return true
244 }
245 }
246
247 false
248 }
249
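/// Walks back from the canonical head and removes canonical blocks up to and including
/// `upper_bound`, moving their trie updates into `persisted_trie_updates`. Does nothing if
/// `last_persisted_hash` is not on the tracked canonical chain.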
250 pub(crate) fn remove_canonical_until(
253 &mut self,
254 upper_bound: BlockNumber,
255 last_persisted_hash: B256,
256 ) {
257 debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
258
259 if !self.is_canonical(last_persisted_hash) {
262 return
263 }
264
265 let mut current_block = self.current_canonical_head.hash;
268 while let Some(executed) = self.blocks_by_hash.get(&current_block) {
269 current_block = executed.recovered_block().parent_hash();
270 if executed.recovered_block().number() <= upper_bound {
271 debug!(target: "engine::tree", num_hash=?executed.recovered_block().num_hash(), "Attempting to remove block walking back from the head");
272 if let Some((removed, _)) = self.remove_by_hash(executed.recovered_block().hash()) {
273 debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed block walking back from the head");
274 self.persisted_trie_updates.insert(
276 removed.recovered_block().hash(),
277 (removed.recovered_block().number(), removed.trie),
278 );
279 }
280 }
281 }
282 debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removed canonical blocks from the tree");
283 }
284
285 pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
288 let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
289
290 let blocks_to_remove = self
299 .blocks_by_number
300 .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
301 .flat_map(|(_, blocks)| blocks.iter().map(|b| b.recovered_block().hash()))
302 .collect::<Vec<_>>();
303 for hash in blocks_to_remove {
304 if let Some((removed, _)) = self.remove_by_hash(hash) {
305 debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain block");
306 }
307 }
308
309 self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num > finalized_num);
311
312 let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
319
320 if let Some(position) =
322 blocks_to_remove.iter().position(|b| b.recovered_block().hash() == finalized_hash)
323 {
324 let finalized_block = blocks_to_remove.swap_remove(position);
325 self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
326 }
327
328 let mut blocks_to_remove = blocks_to_remove
329 .into_iter()
330 .map(|e| e.recovered_block().hash())
331 .collect::<VecDeque<_>>();
332 while let Some(block) = blocks_to_remove.pop_front() {
333 if let Some((removed, children)) = self.remove_by_hash(block) {
334 debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain child block");
335 blocks_to_remove.extend(children);
336 }
337 }
338 }
339
340 pub(crate) fn remove_until(
351 &mut self,
352 upper_bound: BlockNumHash,
353 last_persisted_hash: B256,
354 finalized_num_hash: Option<BlockNumHash>,
355 ) {
356 debug!(target: "engine::tree", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
357
358 let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
361 if upper_bound.number < finalized.number {
362 finalized = upper_bound;
363 debug!(target: "engine::tree", ?finalized, "Adjusted upper bound");
364 }
365 finalized
366 });
367
368 self.remove_canonical_until(upper_bound.number, last_persisted_hash);
376
377 if let Some(finalized_num_hash) = finalized_num_hash {
380 self.prune_finalized_sidechains(finalized_num_hash);
381 }
382 }
383
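/// Returns `true` if the block described by `second` is a descendant of the block identified
/// by `first`, walking the in-memory chain from `second`'s parent down to `first`'s height.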
384 fn is_descendant(&self, first: BlockNumHash, second: &N::BlockHeader) -> bool {
388 if second.parent_hash() == first.hash {
391 return true
392 }
393
394 if second.number() <= first.number {
397 return false
398 }
399
400 let Some(mut current_block) = self.block_by_hash(second.parent_hash()) else {
402 return false
404 };
405
406 while current_block.number() > first.number + 1 {
407 let Some(block) = self.block_by_hash(current_block.header().parent_hash()) else {
408 return false
410 };
411
412 current_block = block;
413 }
414
415 current_block.parent_hash() == first.hash
417 }
418
419 const fn set_canonical_head(&mut self, new_head: BlockNumHash) {
421 self.current_canonical_head = new_head;
422 }
423
424 const fn canonical_head(&self) -> &BlockNumHash {
426 &self.current_canonical_head
427 }
428
429 const fn canonical_block_hash(&self) -> B256 {
431 self.canonical_head().hash
432 }
433
434 const fn canonical_block_number(&self) -> BlockNumber {
436 self.canonical_head().number
437 }
438}
439
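/// Captures everything needed to lazily build a state provider for a block: a provider
/// factory, the hash of the historical (on-disk) block the state is anchored on, and an
/// optional overlay of in-memory executed blocks applied on top of it.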
440#[derive(Clone, Debug)]
442pub struct StateProviderBuilder<N: NodePrimitives, P> {
443 provider_factory: P,
445 historical: B256,
447 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
449}
450
451impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
452 pub const fn new(
455 provider_factory: P,
456 historical: B256,
457 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
458 ) -> Self {
459 Self { provider_factory, historical, overlay }
460 }
461}
462
463impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
464where
465 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
466{
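/// Builds the state provider: historical state for the configured block hash, with the
/// in-memory overlay (if any) layered on top.
///
/// A minimal usage sketch; `factory`, `historical_hash` and `overlay_blocks` are placeholder
/// names for caller-supplied values, not items defined in this module:
///
/// ```ignore
/// let builder = StateProviderBuilder::new(factory, historical_hash, Some(overlay_blocks));
/// let state = builder.build()?;
/// ```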
467 pub fn build(&self) -> ProviderResult<StateProviderBox> {
469 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
470 if let Some(overlay) = self.overlay.clone() {
471 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
472 }
473 Ok(provider)
474 }
475}
476
477#[derive(Debug)]
481pub struct EngineApiTreeState<N: NodePrimitives> {
482 tree_state: TreeState<N>,
484 forkchoice_state_tracker: ForkchoiceStateTracker,
486 buffer: BlockBuffer<N::Block>,
488 invalid_headers: InvalidHeaderCache,
491}
492
493impl<N: NodePrimitives> EngineApiTreeState<N> {
494 fn new(
495 block_buffer_limit: u32,
496 max_invalid_header_cache_length: u32,
497 canonical_block: BlockNumHash,
498 ) -> Self {
499 Self {
500 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
501 buffer: BlockBuffer::new(block_buffer_limit),
502 tree_state: TreeState::new(canonical_block),
503 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
504 }
505 }
506}
507
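/// The result of a tree operation (`outcome`) plus an optional follow-up [`TreeEvent`] the
/// caller should act on.
///
/// A minimal sketch of how an outcome is built and extended, mirroring how new payloads are
/// handled in this module (`status` and `block_hash` are placeholder values):
///
/// ```ignore
/// let outcome = TreeOutcome::new(status).with_event(TreeEvent::TreeAction(
///     TreeAction::MakeCanonical { sync_target_head: block_hash },
/// ));
/// ```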
508#[derive(Debug)]
510pub struct TreeOutcome<T> {
511 pub outcome: T,
513 pub event: Option<TreeEvent>,
515}
516
517impl<T> TreeOutcome<T> {
518 pub const fn new(outcome: T) -> Self {
520 Self { outcome, event: None }
521 }
522
523 pub fn with_event(mut self, event: TreeEvent) -> Self {
525 self.event = Some(event);
526 self
527 }
528}
529
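/// Events the tree emits as a side effect of handling engine messages: internal tree actions,
/// backfill sync requests, and block download requests.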
530#[derive(Debug)]
532pub enum TreeEvent {
533 TreeAction(TreeAction),
535 BackfillAction(BackfillAction),
537 Download(DownloadRequest),
539}
540
541impl TreeEvent {
542 const fn is_backfill_action(&self) -> bool {
544 matches!(self, Self::BackfillAction(_))
545 }
546}
547
548#[derive(Debug)]
550pub enum TreeAction {
551 MakeCanonical {
553 sync_target_head: B256,
555 },
556}
557
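/// The engine API tree handler.
///
/// Owns the in-memory tree state and processes requests from the consensus engine (new
/// payloads, forkchoice updates) as well as downloaded blocks, emitting events for backfill,
/// downloads, and canonical chain updates.
///
/// A minimal sketch of spawning the handler as its own task (the argument names are
/// placeholders for values constructed by the caller):
///
/// ```ignore
/// let (to_tree, from_tree) = EngineApiTreeHandler::spawn_new(
///     provider, executor_provider, consensus, payload_validator, persistence,
///     payload_builder, canonical_in_memory_state, config, invalid_block_hook, kind,
///     evm_config,
/// );
/// ```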
558pub struct EngineApiTreeHandler<N, P, E, T, V, C>
563where
564 N: NodePrimitives,
565 T: PayloadTypes,
566{
567 provider: P,
568 executor_provider: E,
569 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
570 payload_validator: V,
571 state: EngineApiTreeState<N>,
573 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
582 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
584 outgoing: UnboundedSender<EngineApiEvent<N>>,
586 persistence: PersistenceHandle<N>,
588 persistence_state: PersistenceState,
590 backfill_sync_state: BackfillSyncState,
592 canonical_in_memory_state: CanonicalInMemoryState<N>,
595 payload_builder: PayloadBuilderHandle<T>,
598 config: TreeConfig,
600 metrics: EngineApiMetrics,
602 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
604 engine_kind: EngineApiKind,
606 payload_processor: PayloadProcessor<N, C>,
608}
609
610impl<N, P: Debug, E: Debug, T: PayloadTypes + Debug, V: Debug, C: Debug> std::fmt::Debug
611 for EngineApiTreeHandler<N, P, E, T, V, C>
612where
613 N: NodePrimitives,
614{
615 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616 f.debug_struct("EngineApiTreeHandler")
617 .field("provider", &self.provider)
618 .field("executor_provider", &self.executor_provider)
619 .field("consensus", &self.consensus)
620 .field("payload_validator", &self.payload_validator)
621 .field("state", &self.state)
622 .field("incoming_tx", &self.incoming_tx)
623 .field("persistence", &self.persistence)
624 .field("persistence_state", &self.persistence_state)
625 .field("backfill_sync_state", &self.backfill_sync_state)
626 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
627 .field("payload_builder", &self.payload_builder)
628 .field("config", &self.config)
629 .field("metrics", &self.metrics)
630 .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
631 .field("engine_kind", &self.engine_kind)
632 .finish()
633 }
634}
635
636impl<N, P, E, T, V, C> EngineApiTreeHandler<N, P, E, T, V, C>
637where
638 N: NodePrimitives,
639 P: DatabaseProviderFactory
640 + BlockReader<Block = N::Block, Header = N::BlockHeader>
641 + StateProviderFactory
642 + StateReader<Receipt = N::Receipt>
643 + StateCommitmentProvider
644 + HashedPostStateProvider
645 + Clone
646 + 'static,
647 <P as DatabaseProviderFactory>::Provider:
648 BlockReader<Block = N::Block, Header = N::BlockHeader>,
649 E: BlockExecutorProvider<Primitives = N>,
650 C: ConfigureEvm<Primitives = N> + 'static,
651 T: PayloadTypes,
652 V: EngineValidator<T, Block = N::Block>,
653{
654 #[expect(clippy::too_many_arguments)]
656 pub fn new(
657 provider: P,
658 executor_provider: E,
659 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
660 payload_validator: V,
661 outgoing: UnboundedSender<EngineApiEvent<N>>,
662 state: EngineApiTreeState<N>,
663 canonical_in_memory_state: CanonicalInMemoryState<N>,
664 persistence: PersistenceHandle<N>,
665 persistence_state: PersistenceState,
666 payload_builder: PayloadBuilderHandle<T>,
667 config: TreeConfig,
668 engine_kind: EngineApiKind,
669 evm_config: C,
670 ) -> Self {
671 let (incoming_tx, incoming) = std::sync::mpsc::channel();
672
673 let payload_processor =
674 PayloadProcessor::new(WorkloadExecutor::default(), evm_config, &config);
675
676 Self {
677 provider,
678 executor_provider,
679 consensus,
680 payload_validator,
681 incoming,
682 outgoing,
683 persistence,
684 persistence_state,
685 backfill_sync_state: BackfillSyncState::Idle,
686 state,
687 canonical_in_memory_state,
688 payload_builder,
689 config,
690 metrics: Default::default(),
691 incoming_tx,
692 invalid_block_hook: Box::new(NoopInvalidBlockHook),
693 engine_kind,
694 payload_processor,
695 }
696 }
697
698 fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
700 self.invalid_block_hook = invalid_block_hook;
701 }
702
703 #[expect(clippy::complexity)]
709 pub fn spawn_new(
710 provider: P,
711 executor_provider: E,
712 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
713 payload_validator: V,
714 persistence: PersistenceHandle<N>,
715 payload_builder: PayloadBuilderHandle<T>,
716 canonical_in_memory_state: CanonicalInMemoryState<N>,
717 config: TreeConfig,
718 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
719 kind: EngineApiKind,
720 evm_config: C,
721 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
722 {
723 let best_block_number = provider.best_block_number().unwrap_or(0);
724 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
725
726 let persistence_state = PersistenceState {
727 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
728 rx: None,
729 };
730
731 let (tx, outgoing) = unbounded_channel();
732 let state = EngineApiTreeState::new(
733 config.block_buffer_limit(),
734 config.max_invalid_header_cache_length(),
735 header.num_hash(),
736 );
737
738 let mut task = Self::new(
739 provider,
740 executor_provider,
741 consensus,
742 payload_validator,
743 tx,
744 state,
745 canonical_in_memory_state,
746 persistence,
747 persistence_state,
748 payload_builder,
749 config,
750 kind,
751 evm_config,
752 );
753 task.set_invalid_block_hook(invalid_block_hook);
754 let incoming = task.incoming_tx.clone();
755 std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
756 (incoming, outgoing)
757 }
758
759 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
761 self.incoming_tx.clone()
762 }
763
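/// Runs the tree task loop: receives engine messages (with a timeout while a persistence task
/// is in flight), handles them, and advances persistence after each iteration. Returns on a
/// fatal error or when the engine channel disconnects.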
764 pub fn run(mut self) {
768 loop {
769 match self.try_recv_engine_message() {
770 Ok(Some(msg)) => {
771 debug!(target: "engine::tree", %msg, "received new engine message");
772 if let Err(fatal) = self.on_engine_message(msg) {
773 error!(target: "engine::tree", %fatal, "insert block fatal error");
774 return
775 }
776 }
777 Ok(None) => {
778 debug!(target: "engine::tree", "received no engine message for some time while waiting for the persistence task to complete");
779 }
780 Err(_err) => {
781 error!(target: "engine::tree", "Engine channel disconnected");
782 return
783 }
784 }
785
786 if let Err(err) = self.advance_persistence() {
787 error!(target: "engine::tree", %err, "Advancing persistence failed");
788 return
789 }
790 }
791 }
792
793 fn on_downloaded(
799 &mut self,
800 mut blocks: Vec<RecoveredBlock<N::Block>>,
801 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
802 if blocks.is_empty() {
803 return Ok(None)
805 }
806
807 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
808 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
809 for block in blocks.drain(..batch) {
810 if let Some(event) = self.on_downloaded_block(block)? {
811 let needs_backfill = event.is_backfill_action();
812 self.on_tree_event(event)?;
813 if needs_backfill {
814 return Ok(None)
816 }
817 }
818 }
819
820 if !blocks.is_empty() {
822 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
823 }
824
825 Ok(None)
826 }
827
828 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
842 fn on_new_payload(
843 &mut self,
844 payload: T::ExecutionData,
845 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
846 trace!(target: "engine::tree", "invoked new payload");
847 self.metrics.engine.new_payload_messages.increment(1);
848
849 let parent_hash = payload.parent_hash();
875 let block = match self.payload_validator.ensure_well_formed_payload(payload) {
876 Ok(block) => block,
877 Err(error) => {
878 error!(target: "engine::tree", %error, "Invalid payload");
879 let latest_valid_hash =
882 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
883 None
887 } else {
888 self.latest_valid_hash_for_invalid_payload(parent_hash)?
889 };
890
891 let status = PayloadStatusEnum::from(error);
892 return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
893 }
894 };
895
896 let block_hash = block.hash();
897 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
898 if lowest_buffered_ancestor == block_hash {
899 lowest_buffered_ancestor = block.parent_hash();
900 }
901
902 if let Some(status) =
904 self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?
905 {
906 return Ok(TreeOutcome::new(status))
907 }
908
909 let status = if self.backfill_sync_state.is_idle() {
910 let mut latest_valid_hash = None;
911 let num_hash = block.num_hash();
912 match self.insert_block(block) {
913 Ok(status) => {
914 let status = match status {
915 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
916 latest_valid_hash = Some(block_hash);
917 self.try_connect_buffered_blocks(num_hash)?;
918 PayloadStatusEnum::Valid
919 }
920 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
921 latest_valid_hash = Some(block_hash);
922 PayloadStatusEnum::Valid
923 }
924 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
925 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
926 PayloadStatusEnum::Syncing
928 }
929 };
930
931 PayloadStatus::new(status, latest_valid_hash)
932 }
933 Err(error) => self.on_insert_block_error(error)?,
934 }
935 } else if let Err(error) = self.buffer_block(block) {
936 self.on_insert_block_error(error)?
937 } else {
938 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
939 };
940
941 let mut outcome = TreeOutcome::new(status);
942 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
944 if self.state.tree_state.canonical_block_hash() != block_hash {
946 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
947 sync_target_head: block_hash,
948 }));
949 }
950 }
951
952 Ok(outcome)
953 }
954
955 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
962 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
964 return Ok(None)
965 };
966
967 let new_head_number = new_head_block.recovered_block().number();
968 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
969
970 let mut new_chain = vec![new_head_block.clone()];
971 let mut current_hash = new_head_block.recovered_block().parent_hash();
972 let mut current_number = new_head_number - 1;
973
974 while current_number > current_canonical_number {
979 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
980 {
981 current_hash = block.recovered_block().parent_hash();
982 current_number -= 1;
983 new_chain.push(block);
984 } else {
985 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
986 return Ok(None);
989 }
990 }
991
992 if current_hash == self.state.tree_state.current_canonical_head.hash {
995 new_chain.reverse();
996
997 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
999 }
1000
1001 let mut old_chain = Vec::new();
1003 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
1004
1005 while current_canonical_number > current_number {
1008 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1009 old_chain.push(block.clone());
1010 old_hash = block.recovered_block().parent_hash();
1011 current_canonical_number -= 1;
1012 } else {
1013 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1015 return Ok(None);
1016 }
1017 }
1018
1019 debug_assert_eq!(current_number, current_canonical_number);
1021
1022 while old_hash != current_hash {
1025 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1026 old_hash = block.recovered_block().parent_hash();
1027 old_chain.push(block);
1028 } else {
1029 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1031 return Ok(None);
1032 }
1033
1034 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
1035 {
1036 current_hash = block.recovered_block().parent_hash();
1037 new_chain.push(block);
1038 } else {
1039 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
1041 return Ok(None);
1042 }
1043 }
1044 new_chain.reverse();
1045 old_chain.reverse();
1046
1047 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
1048 }
1049
1050 fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
1057 let canonical_head = self.state.tree_state.canonical_head();
1059 let mut current_hash = target_hash;
1060 while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
1061 if current_block.hash() == canonical_head.hash {
1062 return Ok(false)
1063 }
1064 if current_block.number() <= canonical_head.number {
1066 break
1067 }
1068 current_hash = current_block.parent_hash();
1069 }
1070
1071 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
1073 return Ok(false)
1074 }
1075
1076 if self.provider.block_number(target_hash)?.is_some() {
1078 return Ok(false)
1079 }
1080
1081 Ok(true)
1082 }
1083
1084 fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
1086 let Some(action) = self.persistence_state.current_action() else {
1088 return PersistingKind::NotPersisting
1089 };
1090 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
1092 return PersistingKind::PersistingNotDescendant
1093 };
1094
1095 if block.number() > highest.number && self.state.tree_state.is_descendant(*highest, block) {
1098 return PersistingKind::PersistingDescendant
1099 }
1100
1101 PersistingKind::PersistingNotDescendant
1103 }
1104
1114 #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
1114 fn on_forkchoice_updated(
1115 &mut self,
1116 state: ForkchoiceState,
1117 attrs: Option<T::PayloadAttributes>,
1118 version: EngineApiMessageVersion,
1119 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1120 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1121 self.metrics.engine.forkchoice_updated_messages.increment(1);
1122 self.canonical_in_memory_state.on_forkchoice_update_received();
1123
1124 if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1125 return Ok(TreeOutcome::new(on_updated))
1126 }
1127
1128 let valid_outcome = |head| {
1129 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1130 PayloadStatusEnum::Valid,
1131 Some(head),
1132 )))
1133 };
1134
1135 if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1151 trace!(target: "engine::tree", "fcu head hash is already canonical");
1152
1153 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1155 return Ok(TreeOutcome::new(outcome))
1157 }
1158
1159 if let Some(attr) = attrs {
1161 let tip = self
1162 .block_by_hash(self.state.tree_state.canonical_block_hash())?
1163 .ok_or_else(|| {
1164 ProviderError::HeaderNotFound(state.head_block_hash.into())
1167 })?;
1168 let updated = self.process_payload_attributes(attr, tip.header(), state, version);
1169 return Ok(TreeOutcome::new(updated))
1170 }
1171
1172 return Ok(valid_outcome(state.head_block_hash))
1174 }
1175
1176 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1178 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1179
1180 if self.engine_kind.is_opstack() {
1183 if let Some(attr) = attrs {
1184 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1185 let updated =
1186 self.process_payload_attributes(attr, &canonical_header, state, version);
1187 return Ok(TreeOutcome::new(updated))
1188 }
1189 }
1190
1191 return Ok(valid_outcome(state.head_block_hash))
1201 }
1202
1203 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1205 let tip = chain_update.tip().clone_sealed_header();
1206 self.on_canonical_chain_update(chain_update);
1207
1208 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1210 return Ok(TreeOutcome::new(outcome))
1212 }
1213
1214 if let Some(attr) = attrs {
1215 let updated = self.process_payload_attributes(attr, &tip, state, version);
1216 return Ok(TreeOutcome::new(updated))
1217 }
1218
1219 return Ok(valid_outcome(state.head_block_hash))
1220 }
1221
1222 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1229 !state.safe_block_hash.is_zero() &&
1231 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1232 {
1233 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1234 state.safe_block_hash
1235 } else {
1236 state.head_block_hash
1237 };
1238
1239 let target = self.lowest_buffered_ancestor_or(target);
1240 trace!(target: "engine::tree", %target, "downloading missing block");
1241
1242 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1243 PayloadStatusEnum::Syncing,
1244 )))
1245 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1246 }
1247
1248 #[expect(clippy::type_complexity)]
1257 fn try_recv_engine_message(
1258 &self,
1259 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1260 if self.persistence_state.in_progress() {
1261 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1263 Ok(msg) => Ok(Some(msg)),
1264 Err(err) => match err {
1265 RecvTimeoutError::Timeout => Ok(None),
1266 RecvTimeoutError::Disconnected => Err(RecvError),
1267 },
1268 }
1269 } else {
1270 self.incoming.recv().map(Some)
1271 }
1272 }
1273
1274 fn remove_blocks(&mut self, new_tip_num: u64) {
1277 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1278 if new_tip_num < self.persistence_state.last_persisted_block.number {
1279 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1280 let (tx, rx) = oneshot::channel();
1281 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1282 self.persistence_state.start_remove(new_tip_num, rx);
1283 }
1284 }
1285
1286 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1289 if blocks_to_persist.is_empty() {
1290 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1291 return
1292 }
1293
1294 let highest_num_hash = blocks_to_persist
1296 .iter()
1297 .max_by_key(|block| block.recovered_block().number())
1298 .map(|b| b.recovered_block().num_hash())
1299 .expect("Checked non-empty persisting blocks");
1300
1301 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1302 let (tx, rx) = oneshot::channel();
1303 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1304
1305 self.persistence_state.start_save(highest_num_hash, rx);
1306 }
1307
1308 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1313 if self.persistence_state.in_progress() {
1314 let (mut rx, start_time, current_action) = self
1315 .persistence_state
1316 .rx
1317 .take()
1318 .expect("if a persistence task is in progress, Receiver must be Some");
1319 match rx.try_recv() {
1321 Ok(last_persisted_hash_num) => {
1322 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1323 let Some(BlockNumHash {
1324 hash: last_persisted_block_hash,
1325 number: last_persisted_block_number,
1326 }) = last_persisted_hash_num
1327 else {
1328 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1331 return Ok(())
1332 };
1333
1334 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1335 self.persistence_state
1336 .finish(last_persisted_block_hash, last_persisted_block_number);
1337 self.on_new_persisted_block()?;
1338 }
1339 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1340 Err(TryRecvError::Empty) => {
1341 self.persistence_state.rx = Some((rx, start_time, current_action))
1342 }
1343 }
1344 }
1345
1346 if !self.persistence_state.in_progress() {
1347 if let Some(new_tip_num) = self.find_disk_reorg()? {
1348 self.remove_blocks(new_tip_num)
1349 } else if self.should_persist() {
1350 let blocks_to_persist = self.get_canonical_blocks_to_persist();
1351 self.persist_blocks(blocks_to_persist);
1352 }
1353 }
1354
1355 Ok(())
1356 }
1357
1358 fn on_engine_message(
1360 &mut self,
1361 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1362 ) -> Result<(), InsertBlockFatalError> {
1363 match msg {
1364 FromEngine::Event(event) => match event {
1365 FromOrchestrator::BackfillSyncStarted => {
1366 debug!(target: "engine::tree", "received backfill sync started event");
1367 self.backfill_sync_state = BackfillSyncState::Active;
1368 }
1369 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1370 self.on_backfill_sync_finished(ctrl)?;
1371 }
1372 },
1373 FromEngine::Request(request) => {
1374 match request {
1375 EngineApiRequest::InsertExecutedBlock(block) => {
1376 let block_num_hash = block.recovered_block().num_hash();
1377 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1378 let now = Instant::now();
1379
1380 if self.state.tree_state.canonical_block_hash() ==
1383 block.recovered_block().parent_hash()
1384 {
1385 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1386 self.canonical_in_memory_state.set_pending_block(block.clone());
1387 }
1388
1389 self.state.tree_state.insert_executed(block.clone());
1390 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1391 self.emit_event(EngineApiEvent::BeaconConsensus(
1392 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1393 ));
1394 }
1395 EngineApiRequest::Beacon(request) => {
1396 match request {
1397 BeaconEngineMessage::ForkchoiceUpdated {
1398 state,
1399 payload_attrs,
1400 tx,
1401 version,
1402 } => {
1403 let mut output =
1404 self.on_forkchoice_updated(state, payload_attrs, version);
1405
1406 if let Ok(res) = &mut output {
1407 self.state
1409 .forkchoice_state_tracker
1410 .set_latest(state, res.outcome.forkchoice_status());
1411
1412 self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1414 state,
1415 res.outcome.forkchoice_status(),
1416 ));
1417
1418 self.on_maybe_tree_event(res.event.take())?;
1420 }
1421
1422 if let Err(err) =
1423 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1424 {
1425 self.metrics
1426 .engine
1427 .failed_forkchoice_updated_response_deliveries
1428 .increment(1);
1429 error!(target: "engine::tree", "Failed to send event: {err:?}");
1430 }
1431 }
1432 BeaconEngineMessage::NewPayload { payload, tx } => {
1433 let mut output = self.on_new_payload(payload);
1434
1435 let maybe_event =
1436 output.as_mut().ok().and_then(|out| out.event.take());
1437
1438 if let Err(err) =
1440 tx.send(output.map(|o| o.outcome).map_err(|e| {
1441 BeaconOnNewPayloadError::Internal(Box::new(e))
1442 }))
1443 {
1444 error!(target: "engine::tree", "Failed to send event: {err:?}");
1445 self.metrics
1446 .engine
1447 .failed_new_payload_response_deliveries
1448 .increment(1);
1449 }
1450
1451 self.on_maybe_tree_event(maybe_event)?;
1453 }
1454 BeaconEngineMessage::TransitionConfigurationExchanged => {
1455 self.canonical_in_memory_state
1458 .on_transition_configuration_exchanged();
1459 }
1460 }
1461 }
1462 }
1463 }
1464 FromEngine::DownloadedBlocks(blocks) => {
1465 if let Some(event) = self.on_downloaded(blocks)? {
1466 self.on_tree_event(event)?;
1467 }
1468 }
1469 }
1470 Ok(())
1471 }
1472
1473 fn on_backfill_sync_finished(
1487 &mut self,
1488 ctrl: ControlFlow,
1489 ) -> Result<(), InsertBlockFatalError> {
1490 debug!(target: "engine::tree", "received backfill sync finished event");
1491 self.backfill_sync_state = BackfillSyncState::Idle;
1492
1493 let mut backfill_height = ctrl.block_number();
1495
1496 if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1498 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1499 self.state.invalid_headers.insert(**bad_block);
1501
1502 backfill_height = Some(*target);
1504 }
1505
1506 let Some(backfill_height) = backfill_height else { return Ok(()) };
1508
1509 let Some(backfill_num_hash) = self
1515 .provider
1516 .block_hash(backfill_height)?
1517 .map(|hash| BlockNumHash { hash, number: backfill_height })
1518 else {
1519 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1520 return Ok(())
1521 };
1522
1523 if ctrl.is_unwind() {
1524 self.state.tree_state.reset(backfill_num_hash)
1527 } else {
1528 self.state.tree_state.remove_until(
1529 backfill_num_hash,
1530 self.persistence_state.last_persisted_block.hash,
1531 Some(backfill_num_hash),
1532 );
1533 }
1534
1535 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1536 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1537
1538 self.state.buffer.remove_old_blocks(backfill_height);
1540 self.canonical_in_memory_state.clear_state();
1543
1544 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1545 self.state.tree_state.set_canonical_head(new_head.num_hash());
1548 self.persistence_state.finish(new_head.hash(), new_head.number());
1549
1550 self.canonical_in_memory_state.set_canonical_head(new_head);
1552 }
1553
1554 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1557 else {
1558 return Ok(())
1559 };
1560 if sync_target_state.finalized_block_hash.is_zero() {
1561 return Ok(())
1563 }
1564 let newest_finalized = self
1566 .state
1567 .buffer
1568 .block(&sync_target_state.finalized_block_hash)
1569 .map(|block| block.number());
1570
1571 if let Some(backfill_target) =
1577 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1578 self.backfill_sync_target(progress, finalized_number, None)
1581 })
1582 {
1583 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1585 backfill_target.into(),
1586 )));
1587 return Ok(())
1588 };
1589
1590 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1592 }
1593
1594 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1598 if let Some(chain_update) = self.on_new_head(target)? {
1599 self.on_canonical_chain_update(chain_update);
1600 }
1601
1602 Ok(())
1603 }
1604
1605 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1607 if let Some(event) = event {
1608 self.on_tree_event(event)?;
1609 }
1610
1611 Ok(())
1612 }
1613
1614 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1618 match event {
1619 TreeEvent::TreeAction(action) => match action {
1620 TreeAction::MakeCanonical { sync_target_head } => {
1621 self.make_canonical(sync_target_head)?;
1622 }
1623 },
1624 TreeEvent::BackfillAction(action) => {
1625 self.emit_event(EngineApiEvent::BackfillAction(action));
1626 }
1627 TreeEvent::Download(action) => {
1628 self.emit_event(EngineApiEvent::Download(action));
1629 }
1630 }
1631
1632 Ok(())
1633 }
1634
1635 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1637 let event = event.into();
1638
1639 if event.is_backfill_action() {
1640 debug_assert_eq!(
1641 self.backfill_sync_state,
1642 BackfillSyncState::Idle,
1643 "backfill action should only be emitted when backfill is idle"
1644 );
1645
1646 if self.persistence_state.in_progress() {
1647 debug!(target: "engine::tree", "skipping backfill run while persistence task is active");
1650 return
1651 }
1652
1653 self.backfill_sync_state = BackfillSyncState::Pending;
1654 self.metrics.engine.pipeline_runs.increment(1);
1655 debug!(target: "engine::tree", "emitting backfill action event");
1656 }
1657
1658 let _ = self.outgoing.send(event).inspect_err(
1659 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1660 );
1661 }
1662
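/// Returns `true` if the canonical head is more than the configured persistence threshold
/// ahead of the last persisted block and no backfill sync is in progress.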
1663 pub const fn should_persist(&self) -> bool {
1667 if !self.backfill_sync_state.is_idle() {
1668 return false
1670 }
1671
1672 let min_block = self.persistence_state.last_persisted_block.number;
1673 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1674 self.config.persistence_threshold()
1675 }
1676
1677 fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
1681 let mut blocks_to_persist = Vec::new();
1682 let mut current_hash = self.state.tree_state.canonical_block_hash();
1683 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1684
1685 let canonical_head_number = self.state.tree_state.canonical_block_number();
1686
1687 let target_number =
1688 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1689
1690 debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1691 while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1692 if block.recovered_block().number() <= last_persisted_number {
1693 break;
1694 }
1695
1696 if block.recovered_block().number() <= target_number {
1697 blocks_to_persist.push(block.clone());
1698 }
1699
1700 current_hash = block.recovered_block().parent_hash();
1701 }
1702
1703 blocks_to_persist.reverse();
1705
1706 blocks_to_persist
1707 }
1708
1709 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1717 if let Some(remove_above) = self.find_disk_reorg()? {
1720 self.remove_blocks(remove_above);
1721 return Ok(())
1722 }
1723
1724 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1725 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1726 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1727 number: self.persistence_state.last_persisted_block.number,
1728 hash: self.persistence_state.last_persisted_block.hash,
1729 });
1730 Ok(())
1731 }
1732
1733 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1741 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1742 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
1744 return Ok(Some(block.block))
1745 }
1746
1747 let (block, senders) = self
1748 .provider
1749 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1750 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1751 .split_sealed();
1752 let execution_output = self
1753 .provider
1754 .get_state(block.header().number())?
1755 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1756 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1757
1758 Ok(Some(ExecutedBlock {
1759 recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1760 execution_output: Arc::new(execution_output),
1761 hashed_state: Arc::new(hashed_state),
1762 }))
1763 }
1764
1765 fn sealed_header_by_hash(
1767 &self,
1768 hash: B256,
1769 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1770 let block = self
1772 .state
1773 .tree_state
1774 .block_by_hash(hash)
1775 .map(|block| block.as_ref().clone_sealed_header());
1776
1777 if block.is_some() {
1778 Ok(block)
1779 } else {
1780 self.provider.sealed_header_by_hash(hash)
1781 }
1782 }
1783
1784 fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1786 let mut block = self.provider.block_by_hash(hash)?;
1788 if block.is_none() {
1789 block = self
1792 .state
1793 .tree_state
1794 .block_by_hash(hash)
1795 .map(|block| block.as_ref().clone().into_block());
1797 }
1798 Ok(block)
1799 }
1800
1801 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1808 self.state
1809 .buffer
1810 .lowest_ancestor(&hash)
1811 .map(|block| block.parent_hash())
1812 .unwrap_or(hash)
1813 }
1814
1815 fn latest_valid_hash_for_invalid_payload(
1826 &mut self,
1827 parent_hash: B256,
1828 ) -> ProviderResult<Option<B256>> {
1829 if self.block_by_hash(parent_hash)?.is_some() {
1831 return Ok(Some(parent_hash))
1832 }
1833
1834 let mut current_hash = parent_hash;
1837 let mut current_block = self.state.invalid_headers.get(&current_hash);
1838 while let Some(block_with_parent) = current_block {
1839 current_hash = block_with_parent.parent;
1840 current_block = self.state.invalid_headers.get(&current_hash);
1841
1842 if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1845 return Ok(Some(current_hash))
1846 }
1847 }
1848 Ok(None)
1849 }
1850
1851 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1855 if let Some(parent) = self.block_by_hash(parent_hash)? {
1858 if !parent.header().difficulty().is_zero() {
1859 parent_hash = B256::ZERO;
1860 }
1861 }
1862
1863 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1864 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1865 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1866 })
1867 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1868 }
1869
1870 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1874 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1875 return target.head_block_hash == block_hash
1876 }
1877 false
1878 }
1879
1880 fn check_invalid_ancestor_with_head(
1886 &mut self,
1887 check: B256,
1888 head: &SealedBlock<N::Block>,
1889 ) -> ProviderResult<Option<PayloadStatus>> {
1890 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1892
1893 let status = self.prepare_invalid_response(header.parent)?;
1895
1896 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), header);
1898 self.emit_event(BeaconConsensusEngineEvent::InvalidBlock(Box::new(head.clone())));
1899
1900 Ok(Some(status))
1901 }
1902
1903 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1906 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1908 Ok(Some(self.prepare_invalid_response(header.parent)?))
1910 }
1911
1912 fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1915 if let Err(e) =
1916 self.consensus.validate_header_with_total_difficulty(block.header(), U256::MAX)
1917 {
1918 error!(
1919 target: "engine::tree",
1920 ?block,
1921 "Failed to validate total difficulty for block {}: {e}",
1922 block.hash()
1923 );
1924 return Err(e)
1925 }
1926
1927 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1928 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1929 return Err(e)
1930 }
1931
1932 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1933 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1934 return Err(e)
1935 }
1936
1937 Ok(())
1938 }
1939
1940 #[instrument(level = "trace", skip(self), target = "engine::tree")]
1942 fn try_connect_buffered_blocks(
1943 &mut self,
1944 parent: BlockNumHash,
1945 ) -> Result<(), InsertBlockFatalError> {
1946 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1947
1948 if blocks.is_empty() {
1949 return Ok(())
1951 }
1952
1953 let now = Instant::now();
1954 let block_count = blocks.len();
1955 for child in blocks {
1956 let child_num_hash = child.num_hash();
1957 match self.insert_block(child) {
1958 Ok(res) => {
1959 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1960 if self.is_sync_target_head(child_num_hash.hash) &&
1961 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1962 {
1963 self.make_canonical(child_num_hash.hash)?;
1964 }
1965 }
1966 Err(err) => {
1967 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1968 if let Err(fatal) = self.on_insert_block_error(err) {
1969 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1970 return Err(fatal)
1971 }
1972 }
1973 }
1974 }
1975
1976 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1977 Ok(())
1978 }
1979
1980 fn buffer_block(
1982 &mut self,
1983 block: RecoveredBlock<N::Block>,
1984 ) -> Result<(), InsertBlockError<N::Block>> {
1985 if let Err(err) = self.validate_block(&block) {
1986 return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1987 }
1988 self.state.buffer.insert_block(block);
1989 Ok(())
1990 }
1991
1992 #[inline]
1997 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1998 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1999 }
2000
2001 #[inline]
2004 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2005 if block > local_tip {
2006 Some(block - local_tip)
2007 } else {
2008 None
2009 }
2010 }
2011
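/// Determines whether a backfill run should be started and, if so, which hash to backfill to:
/// the finalized block from the sync target state when the distance exceeds the threshold,
/// falling back to the head hash as an optimistic target if the finalized block is unknown
/// locally.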
2012 fn backfill_sync_target(
2019 &self,
2020 canonical_tip_num: u64,
2021 target_block_number: u64,
2022 downloaded_block: Option<BlockNumHash>,
2023 ) -> Option<B256> {
2024 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2025
2026 let mut exceeds_backfill_threshold =
2028 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
2029
2030 if let Some(buffered_finalized) = sync_target_state
2032 .as_ref()
2033 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2034 {
2035 exceeds_backfill_threshold =
2038 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
2039 }
2040
2041 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
2044 if downloaded_block.hash == state.finalized_block_hash {
2045 exceeds_backfill_threshold =
2047 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
2048 }
2049 }
2050
2051 if exceeds_backfill_threshold {
2053 if let Some(state) = sync_target_state {
2054 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2056 Err(err) => {
2057 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2058 }
2059 Ok(None) => {
2060 if !state.finalized_block_hash.is_zero() {
2062 return Some(state.finalized_block_hash)
2065 }
2066
2067 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2080 return Some(state.head_block_hash)
2081 }
2082 Ok(Some(_)) => {
2083 }
2085 }
2086 }
2087 }
2088
2089 None
2090 }
2091
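/// Compares the in-memory canonical chain with the last persisted block and, if the persisted
/// chain has diverged from it, returns the block number of their common ancestor; on-disk
/// blocks above that number must be removed.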
2092 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2095 let mut canonical = self.state.tree_state.current_canonical_head;
2096 let mut persisted = self.persistence_state.last_persisted_block;
2097
2098 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2099 Ok(self
2100 .sealed_header_by_hash(num_hash.hash)?
2101 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2102 .parent_num_hash())
2103 };
2104
2105 while canonical.number > persisted.number {
2108 canonical = parent_num_hash(canonical)?;
2109 }
2110
2111 if canonical == persisted {
2113 return Ok(None);
2114 }
2115
2116 while persisted.number > canonical.number {
2122 persisted = parent_num_hash(persisted)?;
2123 }
2124
2125 debug_assert_eq!(persisted.number, canonical.number);
2126
2127 while persisted.hash != canonical.hash {
2129 canonical = parent_num_hash(canonical)?;
2130 persisted = parent_num_hash(persisted)?;
2131 }
2132
2133 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2134
2135 Ok(Some(persisted.number))
2136 }
2137
2138 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2142 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2143 let start = Instant::now();
2144
2145 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2147
2148 let tip = chain_update.tip().clone_sealed_header();
2149 let notification = chain_update.to_chain_notification();
2150
2151 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2153 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2154 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2155 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2156
2157 self.update_reorg_metrics(old.len());
2158 self.reinsert_reorged_blocks(new.clone());
2159 let old = old
2162 .iter()
2163 .filter_map(|block| {
2164 let (_, trie) = self
2165 .state
2166 .tree_state
2167 .persisted_trie_updates
2168 .get(&block.recovered_block.hash())
2169 .cloned()?;
2170 Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
2171 })
2172 .collect::<Vec<_>>();
2173 self.reinsert_reorged_blocks(old);
2174 }
2175
2176 self.canonical_in_memory_state.update_chain(chain_update);
2178 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2179
2180 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2182
2183 self.canonical_in_memory_state.notify_canon_state(notification);
2185
2186 self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
2188 Box::new(tip),
2189 start.elapsed(),
2190 ));
2191 }
2192
2193 fn update_reorg_metrics(&self, old_chain_length: usize) {
2195 self.metrics.tree.reorgs.increment(1);
2196 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2197 }
2198
2199 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2201 for block in new_chain {
2202 if self
2203 .state
2204 .tree_state
2205 .executed_block_by_hash(block.recovered_block().hash())
2206 .is_none()
2207 {
2208 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2209 self.state.tree_state.insert_executed(block);
2210 }
2211 }
2212 }
2213
2214 fn on_invalid_block(
2216 &mut self,
2217 parent_header: &SealedHeader<N::BlockHeader>,
2218 block: &RecoveredBlock<N::Block>,
2219 output: &BlockExecutionOutput<N::Receipt>,
2220 trie_updates: Option<(&TrieUpdates, B256)>,
2221 ) {
2222 if self.state.invalid_headers.get(&block.hash()).is_some() {
2223 return;
2225 }
2226 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
2227 }
2228
2229 fn on_disconnected_downloaded_block(
2234 &self,
2235 downloaded_block: BlockNumHash,
2236 missing_parent: BlockNumHash,
2237 head: BlockNumHash,
2238 ) -> Option<TreeEvent> {
2239 if let Some(target) =
2241 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2242 {
2243 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2244 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2245 }
2246
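        // the block is not directly connected: continue downloading towards the missing parent,
        // requesting a block range if the distance from the local tip is known, or a single
        // block otherwise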
2247 let request = if let Some(distance) =
2257 self.distance_from_local_tip(head.number, missing_parent.number)
2258 {
2259 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2260 DownloadRequest::BlockRange(missing_parent.hash, distance)
2261 } else {
2262 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2263 DownloadRequest::single_block(missing_parent.hash)
2266 };
2267
2268 Some(TreeEvent::Download(request))
2269 }
2270
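    /// Handles a block downloaded from the network: attempts to insert it, making it canonical
    /// if it is the current sync target, connecting buffered children, or requesting further
    /// downloads or backfill if it is disconnected from the local chain.
    ///
    /// Skips processing if the block descends from a known invalid ancestor or while backfill
    /// sync is active. Returns a fatal error only if insertion fails irrecoverably.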
2271 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2277 fn on_downloaded_block(
2278 &mut self,
2279 block: RecoveredBlock<N::Block>,
2280 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2281 let block_num_hash = block.num_hash();
2282 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2283 if self
2284 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2285 .is_some()
2286 {
2287 return Ok(None)
2288 }
2289
2290 if !self.backfill_sync_state.is_idle() {
2291 return Ok(None)
2292 }
2293
2294 match self.insert_block(block) {
2296 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2297 if self.is_sync_target_head(block_num_hash.hash) {
2298 trace!(target: "engine::tree", "appended downloaded sync target block");
2299
2300 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2303 sync_target_head: block_num_hash.hash,
2304 })))
2305 }
2306 trace!(target: "engine::tree", "appended downloaded block");
2307 self.try_connect_buffered_blocks(block_num_hash)?;
2308 }
2309 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2310 return Ok(self.on_disconnected_downloaded_block(
2313 block_num_hash,
2314 missing_ancestor,
2315 head,
2316 ))
2317 }
2318 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2319 trace!(target: "engine::tree", "downloaded block already executed");
2320 }
2321 Err(err) => {
2322 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2323 if let Err(fatal) = self.on_insert_block_error(err) {
2324 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2325 return Err(fatal)
2326 }
2327 }
2328 }
2329 Ok(None)
2330 }
2331
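    /// Inserts a block into the tree, converting insertion errors into an [`InsertBlockError`]
    /// that carries the sealed block.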
2332 fn insert_block(
2333 &mut self,
2334 block: RecoveredBlock<N::Block>,
2335 ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
2336 self.insert_block_inner(block.clone())
2337 .map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind))
2338 }
2339
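    /// Validates and executes the given recovered block, computes its state root, and inserts it
    /// into the tree state.
    ///
    /// Returns [`InsertPayloadOk::AlreadySeen`] if the block is already known, or
    /// [`InsertPayloadOk::Inserted`] with the resulting [`BlockStatus`]. Blocks with an unknown
    /// parent are buffered and reported as [`BlockStatus::Disconnected`].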
2340 fn insert_block_inner(
2341 &mut self,
2342 block: RecoveredBlock<N::Block>,
2343 ) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
2344 let block_num_hash = block.num_hash();
2345 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
2346
2347 if self.block_by_hash(block.hash())?.is_some() {
2348 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2349 }
2350
2351 let start = Instant::now();
2352
2353 trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
2354
2355 self.validate_block(&block)?;
2357
2358 trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
2359 let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else {
2360 let missing_ancestor = self
2363 .state
2364 .buffer
2365 .lowest_ancestor(&block.parent_hash())
2366 .map(|block| block.parent_num_hash())
2367 .unwrap_or_else(|| block.parent_num_hash());
2368
2369 self.state.buffer.insert_block(block);
2370
2371 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2372 head: self.state.tree_state.current_canonical_head,
2373 missing_ancestor,
2374 }))
2375 };
2376
2377 let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
2379 InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
2380 block.parent_hash().into(),
2381 ))
2382 })?;
2383 if let Err(e) =
2384 self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
2385 {
2386 warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
2387 return Err(e.into())
2388 }
2389
2390 let state_provider = provider_builder.build()?;
2391
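        // the parallel state root is only attempted if nothing is currently being persisted, or
        // if everything being persisted is an ancestor of this block
        // (see `PersistingKind::can_run_parallel_state_root`)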
2392 let persisting_kind = self.persisting_kind_for(block.header());
2402 let run_parallel_state_root = persisting_kind.can_run_parallel_state_root();
2403
2404 let header = block.clone_sealed_header();
2406 let txs = block.clone_transactions_recovered().collect();
2407 let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
2408 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2410
2411 let trie_input_start = Instant::now();
2413 let trie_input = self
2414 .compute_trie_input(
2415 persisting_kind,
2416 consistent_view.clone(),
2417 block.header().parent_hash(),
2418 )
2419 .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
2420
2421 self.metrics
2422 .block_validation
2423 .trie_input_duration
2424 .record(trie_input_start.elapsed().as_secs_f64());
2425
2426 self.payload_processor.spawn(
2427 header,
2428 txs,
2429 provider_builder,
2430 consistent_view,
2431 trie_input,
2432 &self.config,
2433 )
2434 } else {
2435 self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
2436 };
2437
2438 let state_provider = CachedStateProvider::new_with_caches(
2441 state_provider,
2442 handle.caches(),
2443 handle.cache_metrics(),
2444 );
2445
2446 debug!(target: "engine::tree", block=?block_num_hash, "Executing block");
2447
2448 let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
2449 let execution_start = Instant::now();
2450 let output = self.metrics.executor.execute_metered(
2451 executor,
2452 &block,
2453 Box::new(handle.state_hook()),
2454 )?;
2455 let execution_finish = Instant::now();
2456 let execution_time = execution_finish.duration_since(execution_start);
2457 debug!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");
2458
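        // the prewarming transaction execution is no longer needed once the block itself has
        // been executed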
2459 handle.stop_prewarming_execution();
2461
2462 if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
2463 self.on_invalid_block(&parent_block, &block, &output, None);
2465 return Err(err.into())
2466 }
2467
2468 let hashed_state = self.provider.hashed_post_state(&output.state);
2469
2470 if let Err(err) = self
2471 .payload_validator
2472 .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
2473 {
2474 self.on_invalid_block(&parent_block, &block, &output, None);
2476 return Err(err.into())
2477 }
2478
2479 debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
2480
2481 let root_time = Instant::now();
2482
2483 let mut maybe_state_root = None;
2484
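        // try the background state root task or the parallel computation first; if neither
        // yields a usable root, fall back to the synchronous computation below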
2485 if run_parallel_state_root {
2486 if self.config.use_state_root_task() {
2489 match handle.state_root() {
2490 Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
2491 let elapsed = execution_finish.elapsed();
2492 info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
2493 if state_root == block.header().state_root() {
2495 maybe_state_root = Some((state_root, trie_updates, elapsed))
2496 } else {
2497 warn!(
2498 target: "engine::tree",
2499 ?state_root,
2500 block_state_root = ?block.header().state_root(),
2501 "State root task returned incorrect state root"
2502 );
2503 }
2504 }
2505 Err(error) => {
2506 debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
2507 }
2508 }
2509 } else {
2510 match self.compute_state_root_parallel(
2511 persisting_kind,
2512 block.header().parent_hash(),
2513 &hashed_state,
2514 ) {
2515 Ok(result) => {
2516 info!(
2517 target: "engine::tree",
2518 block = ?block_num_hash,
2519 regular_state_root = ?result.0,
2520 "Regular root task finished"
2521 );
2522 maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
2523 }
2524 Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2525 debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
2526 }
2527 Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))),
2528 }
2529 }
2530 }
2531
2532 let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
2533 maybe_state_root
2534 {
2535 maybe_state_root
2536 } else {
2537 warn!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
2539 self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
2540 let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
2541 (root, updates, root_time.elapsed())
2542 };
2543
2544 self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2545 debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
2546
2547 if state_root != block.header().state_root() {
2549 self.on_invalid_block(&parent_block, &block, &output, Some((&trie_output, state_root)));
2551 return Err(ConsensusError::BodyStateRootDiff(
2552 GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2553 )
2554 .into())
2555 }
2556
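        // shut down the caching task, handing it the final execution output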
2557 handle.terminate_caching(Some(output.state.clone()));
2559
2560 let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
2561 block: ExecutedBlock {
2562 recovered_block: Arc::new(block),
2563 execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
2564 hashed_state: Arc::new(hashed_state),
2565 },
2566 trie: Arc::new(trie_output),
2567 };
2568
2569 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2571 {
2572 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2573 self.canonical_in_memory_state.set_pending_block(executed.clone());
2574 }
2575
2576 self.state.tree_state.insert_executed(executed.clone());
2577 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2578
2579 let elapsed = start.elapsed();
2581 let engine_event = if self.is_fork(block_num_hash.hash)? {
2582 BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2583 } else {
2584 BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2585 };
2586 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2587
2588 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2589 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2590 }
2591
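    /// Computes the state root of the given hashed post state on top of the block with the given
    /// parent hash, using the parallel state root algorithm over a consistent database view
    /// combined with the in-memory trie input.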
2592 fn compute_state_root_parallel(
2601 &self,
2602 persisting_kind: PersistingKind,
2603 parent_hash: B256,
2604 hashed_state: &HashedPostState,
2605 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2606 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2607
2608 let mut input =
2609 self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
2610 input.append_ref(hashed_state);
2612
2613 ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2614 }
2615
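    /// Builds the [`TrieInput`] for the block with the given parent hash.
    ///
    /// Collects the in-memory ancestors of the parent, prepends the reverted state between the
    /// historical (persisted) anchor and the database tip, and appends the cached trie updates
    /// and hashed state of each in-memory block from oldest to newest.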
2616 fn compute_trie_input(
2632 &self,
2633 persisting_kind: PersistingKind,
2634 consistent_view: ConsistentDbView<P>,
2635 parent_hash: B256,
2636 ) -> Result<TrieInput, ParallelStateRootError> {
2637 let mut input = TrieInput::default();
2638
2639 let provider = consistent_view.provider_ro()?;
2640 let best_block_number = provider.best_block_number()?;
2641
2642 let (mut historical, mut blocks) = self
2643 .state
2644 .tree_state
2645 .blocks_by_hash(parent_hash)
2646 .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2647
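        // if all blocks being persisted are ancestors of this one, drop the in-memory ancestors
        // whose number is at or below the database tip and anchor the revert state just below
        // the oldest remaining in-memory block (or at the parent hash if none remain)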
2648 if persisting_kind.is_descendant() {
2651 while let Some(block) = blocks.last() {
2653 let recovered_block = block.recovered_block();
2654 if recovered_block.number() <= best_block_number {
2655 blocks.pop();
2658 } else {
2659 break
2662 }
2663 }
2664
2665 historical = if let Some(block) = blocks.last() {
2666 (block.recovered_block().number() - 1).into()
2669 } else {
2670 parent_hash.into()
2672 };
2673 }
2674
2675 if blocks.is_empty() {
2676 debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2677 } else {
2678 debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2679 }
2680
2681 let block_number = provider
2683 .convert_hash_or_number(historical)?
2684 .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2685
2686 let revert_state = if block_number == best_block_number {
2688 debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2691 HashedPostState::default()
2692 } else {
2693 let revert_state = HashedPostState::from_reverts::<
2694 <P::StateCommitment as StateCommitment>::KeyHasher,
2695 >(provider.tx_ref(), block_number + 1)
2696 .map_err(ProviderError::from)?;
2697 debug!(
2698 target: "engine::tree",
2699 block_number,
2700 best_block_number,
2701 accounts = revert_state.accounts.len(),
2702 storages = revert_state.storages.len(),
2703 "Non-empty revert state"
2704 );
2705 revert_state
2706 };
2707 input.append(revert_state);
2708
2709 for block in blocks.iter().rev() {
2711 input.append_cached_ref(block.trie_updates(), block.hashed_state())
2712 }
2713
2714 Ok(input)
2715 }
2716
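    /// Handles an error that occurred while inserting a block.
    ///
    /// Non-validation errors are propagated as fatal. For validation errors the block is
    /// recorded as an invalid header, an `InvalidBlock` event is emitted, and an `INVALID`
    /// payload status with the latest valid ancestor hash is returned.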
2717 fn on_insert_block_error(
2723 &mut self,
2724 error: InsertBlockError<N::Block>,
2725 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2726 let (block, error) = error.split();
2727
2728 let validation_err = error.ensure_validation_error()?;
2731
2732 warn!(
2736 target: "engine::tree",
2737 invalid_hash=%block.hash(),
2738 invalid_number=block.number(),
2739 %validation_err,
2740 "Invalid block error on new payload",
2741 );
2742 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2743
2744 self.state.invalid_headers.insert(block.block_with_parent());
2746 self.emit_event(EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
2747 Box::new(block),
2748 )));
2749 Ok(PayloadStatus::new(
2750 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2751 latest_valid_hash,
2752 ))
2753 }
2754
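    /// Looks up the sealed header for the given hash in the in-memory canonical state, falling
    /// back to the database provider if it is not found in memory.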
2755 pub fn find_canonical_header(
2757 &self,
2758 hash: B256,
2759 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2760 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2761
2762 if canonical.is_none() {
2763 canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2764 }
2765
2766 Ok(canonical)
2767 }
2768
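    /// Updates the tracked finalized block from the given hash, persisting the new finalized
    /// block number if it changed. A zero hash is ignored; an unknown hash yields
    /// [`OnForkChoiceUpdated::invalid_state`].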
2769 fn update_finalized_block(
2771 &self,
2772 finalized_block_hash: B256,
2773 ) -> Result<(), OnForkChoiceUpdated> {
2774 if finalized_block_hash.is_zero() {
2775 return Ok(())
2776 }
2777
2778 match self.find_canonical_header(finalized_block_hash) {
2779 Ok(None) => {
2780 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2781 return Err(OnForkChoiceUpdated::invalid_state())
2783 }
2784 Ok(Some(finalized)) => {
2785 if Some(finalized.num_hash()) !=
2786 self.canonical_in_memory_state.get_finalized_num_hash()
2787 {
2788 let _ = self.persistence.save_finalized_block_number(finalized.number());
2791 self.canonical_in_memory_state.set_finalized(finalized);
2792 }
2793 }
2794 Err(err) => {
2795 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2796 }
2797 }
2798
2799 Ok(())
2800 }
2801
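    /// Updates the tracked safe block, mirroring [`Self::update_finalized_block`]: a zero hash
    /// is ignored and an unknown hash yields [`OnForkChoiceUpdated::invalid_state`].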
2802 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2804 if safe_block_hash.is_zero() {
2805 return Ok(())
2806 }
2807
2808 match self.find_canonical_header(safe_block_hash) {
2809 Ok(None) => {
2810 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2811 return Err(OnForkChoiceUpdated::invalid_state())
2813 }
2814 Ok(Some(safe)) => {
2815 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2816 let _ = self.persistence.save_safe_block_number(safe.number());
2819 self.canonical_in_memory_state.set_safe(safe);
2820 }
2821 }
2822 Err(err) => {
2823 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2824 }
2825 }
2826
2827 Ok(())
2828 }
2829
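    /// Ensures the tracked finalized and safe blocks are consistent with the (non-zero) hashes
    /// of the given forkchoice state.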
2830 fn ensure_consistent_forkchoice_state(
2839 &self,
2840 state: ForkchoiceState,
2841 ) -> Result<(), OnForkChoiceUpdated> {
2842 self.update_finalized_block(state.finalized_block_hash)?;
2848
2849 self.update_safe_block(state.safe_block_hash)
2855 }
2856
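    /// Performs the preliminary checks of a forkchoice update.
    ///
    /// Returns an early response if the head hash is zero, the head descends from a known
    /// invalid ancestor, or backfill sync is currently active; returns `None` if the update
    /// should be processed further.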
2857 fn pre_validate_forkchoice_update(
2862 &mut self,
2863 state: ForkchoiceState,
2864 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2865 if state.head_block_hash.is_zero() {
2866 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2867 }
2868
2869 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2872 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2873 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2874 }
2875
2876 if !self.backfill_sync_state.is_idle() {
2877 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2880 return Ok(Some(OnForkChoiceUpdated::syncing()))
2881 }
2882
2883 Ok(None)
2884 }
2885
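    /// Validates the given payload attributes against the forkchoice head and, if valid, sends
    /// the built payload builder attributes to the payload builder.
    ///
    /// Returns a forkchoice response carrying the pending payload id, or an
    /// invalid-payload-attributes response on validation or conversion failure.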
2886 fn process_payload_attributes(
2891 &self,
2892 attrs: T::PayloadAttributes,
2893 head: &N::BlockHeader,
2894 state: ForkchoiceState,
2895 version: EngineApiMessageVersion,
2896 ) -> OnForkChoiceUpdated {
2897 if let Err(err) =
2898 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2899 {
2900 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2901 return OnForkChoiceUpdated::invalid_payload_attributes()
2902 }
2903
2904 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2909 state.head_block_hash,
2910 attrs,
2911 version as u8,
2912 ) {
2913 Ok(attributes) => {
2914 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2917
2918 OnForkChoiceUpdated::updated_with_pending_payload_id(
2930 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2931 pending_payload_id,
2932 )
2933 }
2934 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2935 }
2936 }
2937
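    /// Removes blocks from the in-memory tree state up to the given upper bound, taking the last
    /// persisted block hash and, if provided, the finalized block hash (resolved to a number via
    /// the provider) into account.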
2938 pub(crate) fn remove_before(
2945 &mut self,
2946 upper_bound: BlockNumHash,
2947 finalized_hash: Option<B256>,
2948 ) -> ProviderResult<()> {
2949 let num = if let Some(hash) = finalized_hash {
2952 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2953 } else {
2954 None
2955 };
2956
2957 self.state.tree_state.remove_until(
2958 upper_bound,
2959 self.persistence_state.last_persisted_block.hash,
2960 num,
2961 );
2962 Ok(())
2963 }
2964
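    /// Returns a [`StateProviderBuilder`] for the state at the given block hash, if the block is
    /// known.
    ///
    /// Prefers the in-memory canonical state, layering the in-memory blocks on top of their
    /// historical anchor, and falls back to a plain database provider; returns `Ok(None)` if the
    /// hash is unknown.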
2965 pub fn state_provider_builder(
2970 &self,
2971 hash: B256,
2972 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2973 where
2974 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
2975 {
2976 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2977 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2978 return Ok(Some(StateProviderBuilder::new(
2980 self.provider.clone(),
2981 historical,
2982 Some(blocks),
2983 )))
2984 }
2985
2986 if let Some(header) = self.provider.header(&hash)? {
2988 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2989 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2992 }
2993
2994 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2995 Ok(None)
2996 }
2997}
2998
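/// The status of a block after an insertion attempt: either valid and connected to the canonical
/// chain, or disconnected, carrying the current head and the missing ancestor required to connect
/// it.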
2999#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3005pub enum BlockStatus {
3006 Valid,
3008 Disconnected {
3010 head: BlockNumHash,
3012 missing_ancestor: BlockNumHash,
3014 },
3015}
3016
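/// Result of a successful block insertion: the block was either already known or newly inserted,
/// together with its [`BlockStatus`].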
3017#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3022pub enum InsertPayloadOk {
3023 AlreadySeen(BlockStatus),
3025 Inserted(BlockStatus),
3027}
3028
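/// Describes how an in-progress persistence operation (if any) relates to the block currently
/// being validated.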
3029#[derive(Debug, Clone, Copy)]
3031pub enum PersistingKind {
3032 NotPersisting,
3034 PersistingNotDescendant,
3036 PersistingDescendant,
3038}
3039
3040impl PersistingKind {
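    /// Returns `true` if the parallel state root can be run: either nothing is currently being
    /// persisted, or the block being validated is a descendant of the blocks being persisted.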
3041 pub const fn can_run_parallel_state_root(&self) -> bool {
3046 matches!(self, Self::NotPersisting | Self::PersistingDescendant)
3047 }
3048
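    /// Returns `true` if the block being validated is a descendant of the blocks currently being
    /// persisted.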
3049 pub const fn is_descendant(&self) -> bool {
3052 matches!(self, Self::PersistingDescendant)
3053 }
3054}
3055
3056#[cfg(test)]
3057mod tests {
3058 use super::*;
3059 use crate::persistence::PersistenceAction;
3060 use alloy_consensus::Header;
3061 use alloy_primitives::Bytes;
3062 use alloy_rlp::Decodable;
3063 use alloy_rpc_types_engine::{
3064 CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
3065 ExecutionPayloadV3,
3066 };
3067 use assert_matches::assert_matches;
3068 use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
3069 use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
3070 use reth_engine_primitives::ForkchoiceStatus;
3071 use reth_ethereum_consensus::EthBeaconConsensus;
3072 use reth_ethereum_engine_primitives::EthEngineTypes;
3073 use reth_ethereum_primitives::{Block, EthPrimitives};
3074 use reth_evm::test_utils::MockExecutorProvider;
3075 use reth_evm_ethereum::EthEvmConfig;
3076 use reth_node_ethereum::EthereumEngineValidator;
3077 use reth_primitives_traits::Block as _;
3078 use reth_provider::test_utils::MockEthProvider;
3079 use reth_trie::{updates::TrieUpdates, HashedPostState};
3080 use std::{
3081 str::FromStr,
3082 sync::mpsc::{channel, Sender},
3083 };
3084
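    /// Test helper that intercepts an mpsc channel: messages are only forwarded from `rx` to
    /// `tx` after a release signal is received, letting tests control when messages are
    /// delivered.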
3085 struct TestChannel<T> {
3089 release: Receiver<()>,
3091 tx: Sender<T>,
3093 rx: Receiver<T>,
3095 }
3096
3097 impl<T: Send + 'static> TestChannel<T> {
3098 fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
3100 let (original_tx, original_rx) = channel();
3101 let (wrapped_tx, wrapped_rx) = channel();
3102 let (release_tx, release_rx) = channel();
3103 let handle = TestChannelHandle::new(release_tx);
3104 let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
3105 std::thread::spawn(move || test_channel.intercept_loop());
3107 (original_tx, wrapped_rx, handle)
3108 }
3109
3110 fn intercept_loop(&self) {
3112 while self.release.recv() == Ok(()) {
3113 let Ok(value) = self.rx.recv() else { return };
3114
3115 let _ = self.tx.send(value);
3116 }
3117 }
3118 }
3119
3120 struct TestChannelHandle {
3121 release: Sender<()>,
3123 }
3124
3125 impl TestChannelHandle {
3126 const fn new(release: Sender<()>) -> Self {
3128 Self { release }
3129 }
3130
3131 #[expect(dead_code)]
3133 fn release(&self) {
3134 let _ = self.release.send(());
3135 }
3136 }
3137
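    /// Test harness wiring an `EngineApiTreeHandler` to a mock provider, mock executor, payload
    /// builder, and persistence channels, plus a block builder for generating test chains.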
3138 struct TestHarness {
3139 tree: EngineApiTreeHandler<
3140 EthPrimitives,
3141 MockEthProvider,
3142 MockExecutorProvider,
3143 EthEngineTypes,
3144 EthereumEngineValidator,
3145 EthEvmConfig,
3146 >,
3147 to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
3148 from_tree_rx: UnboundedReceiver<EngineApiEvent>,
3149 blocks: Vec<ExecutedBlockWithTrieUpdates>,
3150 action_rx: Receiver<PersistenceAction>,
3151 executor_provider: MockExecutorProvider,
3152 block_builder: TestBlockBuilder,
3153 provider: MockEthProvider,
3154 }
3155
3156 impl TestHarness {
3157 fn new(chain_spec: Arc<ChainSpec>) -> Self {
3158 let (action_tx, action_rx) = channel();
3159 Self::with_persistence_channel(chain_spec, action_tx, action_rx)
3160 }
3161
3162 #[expect(dead_code)]
3163 fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
3164 let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
3165 (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
3166 }
3167
3168 fn with_persistence_channel(
3169 chain_spec: Arc<ChainSpec>,
3170 action_tx: Sender<PersistenceAction>,
3171 action_rx: Receiver<PersistenceAction>,
3172 ) -> Self {
3173 let persistence_handle = PersistenceHandle::new(action_tx);
3174
3175 let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
3176
3177 let provider = MockEthProvider::default();
3178 let executor_provider = MockExecutorProvider::default();
3179
3180 let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
3181
3182 let (from_tree_tx, from_tree_rx) = unbounded_channel();
3183
3184 let header = chain_spec.genesis_header().clone();
3185 let header = SealedHeader::seal_slow(header);
3186 let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
3187 let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
3188
3189 let (to_payload_service, _payload_command_rx) = unbounded_channel();
3190 let payload_builder = PayloadBuilderHandle::new(to_payload_service);
3191
3192 let evm_config = EthEvmConfig::new(chain_spec.clone());
3193
3194 let tree = EngineApiTreeHandler::new(
3195 provider.clone(),
3196 executor_provider.clone(),
3197 consensus,
3198 payload_validator,
3199 from_tree_tx,
3200 engine_api_tree_state,
3201 canonical_in_memory_state,
3202 persistence_handle,
3203 PersistenceState::default(),
3204 payload_builder,
3205 TreeConfig::default()
3208 .with_legacy_state_root(true)
3209 .with_has_enough_parallelism(true),
3210 EngineApiKind::Ethereum,
3211 evm_config,
3212 );
3213
3214 let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3215 Self {
3216 to_tree_tx: tree.incoming_tx.clone(),
3217 tree,
3218 from_tree_rx,
3219 blocks: vec![],
3220 action_rx,
3221 executor_provider,
3222 block_builder,
3223 provider,
3224 }
3225 }
3226
3227 fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
3228 let mut blocks_by_hash = HashMap::default();
3229 let mut blocks_by_number = BTreeMap::new();
3230 let mut state_by_hash = HashMap::default();
3231 let mut hash_by_number = BTreeMap::new();
3232 let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
3233 let mut parent_hash = B256::ZERO;
3234
3235 for block in &blocks {
3236 let sealed_block = block.recovered_block();
3237 let hash = sealed_block.hash();
3238 let number = sealed_block.number;
3239 blocks_by_hash.insert(hash, block.clone());
3240 blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
3241 state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
3242 hash_by_number.insert(number, hash);
3243 parent_to_child.entry(parent_hash).or_default().insert(hash);
3244 parent_hash = hash;
3245 }
3246
3247 self.tree.state.tree_state = TreeState {
3248 blocks_by_hash,
3249 blocks_by_number,
3250 current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
3251 parent_to_child,
3252 persisted_trie_updates: HashMap::default(),
3253 };
3254
3255 let last_executed_block = blocks.last().unwrap().clone();
3256 let pending = Some(BlockState::new(last_executed_block));
3257 self.tree.canonical_in_memory_state =
3258 CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
3259
3260 self.blocks = blocks.clone();
3261
3262 let recovered_blocks =
3263 blocks.iter().map(|b| b.recovered_block().clone()).collect::<Vec<_>>();
3264
3265 self.persist_blocks(recovered_blocks);
3266
3267 self
3268 }
3269
3270 const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
3271 self.tree.backfill_sync_state = state;
3272 self
3273 }
3274
3275 fn extend_execution_outcome(
3276 &self,
3277 execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
3278 ) {
3279 self.executor_provider.extend(execution_outcomes);
3280 }
3281
3282 fn insert_block(
3283 &mut self,
3284 block: RecoveredBlock<reth_ethereum_primitives::Block>,
3285 ) -> Result<InsertPayloadOk, InsertBlockError<Block>> {
3286 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3287 self.extend_execution_outcome([execution_outcome]);
3288 self.tree.provider.add_state_root(block.state_root);
3289 self.tree.insert_block(block)
3290 }
3291
3292 async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3293 let fcu_status = fcu_status.into();
3294
3295 self.send_fcu(block_hash, fcu_status).await;
3296
3297 self.check_fcu(block_hash, fcu_status).await;
3298 }
3299
3300 async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3301 let fcu_state = self.fcu_state(block_hash);
3302
3303 let (tx, rx) = oneshot::channel();
3304 self.tree
3305 .on_engine_message(FromEngine::Request(
3306 BeaconEngineMessage::ForkchoiceUpdated {
3307 state: fcu_state,
3308 payload_attrs: None,
3309 tx,
3310 version: EngineApiMessageVersion::default(),
3311 }
3312 .into(),
3313 ))
3314 .unwrap();
3315
3316 let response = rx.await.unwrap().unwrap().await.unwrap();
3317 match fcu_status.into() {
3318 ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
3319 ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
3320 ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
3321 }
3322 }
3323
3324 async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3325 let fcu_state = self.fcu_state(block_hash);
3326
3327 let event = self.from_tree_rx.recv().await.unwrap();
3329 match event {
3330 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
3331 state,
3332 status,
3333 )) => {
3334 assert_eq!(state, fcu_state);
3335 assert_eq!(status, fcu_status.into());
3336 }
3337 _ => panic!("Unexpected event: {:#?}", event),
3338 }
3339 }
3340
3341 const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
3342 ForkchoiceState {
3343 head_block_hash: block_hash,
3344 safe_block_hash: block_hash,
3345 finalized_block_hash: block_hash,
3346 }
3347 }
3348
3349 async fn send_new_payload(
3350 &mut self,
3351 block: RecoveredBlock<reth_ethereum_primitives::Block>,
3352 ) {
3353 let payload = ExecutionPayloadV3::from_block_unchecked(
3354 block.hash(),
3355 &block.clone_sealed_block().into_block(),
3356 );
3357 self.tree
3358 .on_new_payload(ExecutionData {
3359 payload: payload.into(),
3360 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
3361 parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
3362 versioned_hashes: vec![],
3363 }),
3364 })
3365 .unwrap();
3366 }
3367
3368 async fn insert_chain(
3369 &mut self,
3370 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3371 ) {
3372 for block in chain.clone() {
3373 self.insert_block(block.clone()).unwrap();
3374 }
3375 self.check_canon_chain_insertion(chain).await;
3376 }
3377
3378 async fn check_canon_commit(&mut self, hash: B256) {
3379 let event = self.from_tree_rx.recv().await.unwrap();
3380 match event {
3381 EngineApiEvent::BeaconConsensus(
3382 BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3383 ) => {
3384 assert_eq!(header.hash(), hash);
3385 }
3386 _ => panic!("Unexpected event: {:#?}", event),
3387 }
3388 }
3389
3390 async fn check_fork_chain_insertion(
3391 &mut self,
3392 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3393 ) {
3394 for block in chain {
3395 self.check_fork_block_added(block.hash()).await;
3396 }
3397 }
3398
3399 async fn check_canon_chain_insertion(
3400 &mut self,
3401 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3402 ) {
3403 for block in chain.clone() {
3404 self.check_canon_block_added(block.hash()).await;
3405 }
3406 }
3407
3408 async fn check_canon_block_added(&mut self, expected_hash: B256) {
3409 let event = self.from_tree_rx.recv().await.unwrap();
3410 match event {
3411 EngineApiEvent::BeaconConsensus(
3412 BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
3413 ) => {
3414 assert_eq!(executed.recovered_block.hash(), expected_hash);
3415 }
3416 _ => panic!("Unexpected event: {:#?}", event),
3417 }
3418 }
3419
3420 async fn check_fork_block_added(&mut self, expected_hash: B256) {
3421 let event = self.from_tree_rx.recv().await.unwrap();
3422 match event {
3423 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3424 executed,
3425 _,
3426 )) => {
3427 assert_eq!(executed.recovered_block.hash(), expected_hash);
3428 }
3429 _ => panic!("Unexpected event: {:#?}", event),
3430 }
3431 }
3432
3433 async fn check_invalid_block(&mut self, expected_hash: B256) {
3434 let event = self.from_tree_rx.recv().await.unwrap();
3435 match event {
3436 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
3437 block,
3438 )) => {
3439 assert_eq!(block.hash(), expected_hash);
3440 }
3441 _ => panic!("Unexpected event: {:#?}", event),
3442 }
3443 }
3444
3445 fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
3446 let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3447 let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3448
3449 for block in &blocks {
3450 block_data.push((block.hash(), block.clone_block()));
3451 headers_data.push((block.hash(), block.header().clone()));
3452 }
3453
3454 self.provider.extend_blocks(block_data);
3455 self.provider.extend_headers(headers_data);
3456 }
3457
3458 fn setup_range_insertion_for_valid_chain(
3459 &mut self,
3460 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3461 ) {
3462 self.setup_range_insertion_for_chain(chain, None)
3463 }
3464
3465 fn setup_range_insertion_for_invalid_chain(
3466 &mut self,
3467 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3468 index: usize,
3469 ) {
3470 self.setup_range_insertion_for_chain(chain, Some(index))
3471 }
3472
3473 fn setup_range_insertion_for_chain(
3474 &mut self,
3475 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3476 invalid_index: Option<usize>,
3477 ) {
3478 let mut chain_rev = chain;
3481 chain_rev.reverse();
3482
3483 let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3484 for (index, block) in chain_rev.iter().enumerate() {
3485 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3486 let state_root = if invalid_index == Some(index) {
3487 B256::random()
3488 } else {
3489 block.state_root
3490 };
3491 self.tree.provider.add_state_root(state_root);
3492 execution_outcomes.push(execution_outcome);
3493 }
3494 self.extend_execution_outcome(execution_outcomes);
3495 }
3496
3497 fn check_canon_head(&self, head_hash: B256) {
3498 assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3499 }
3500 }
3501
3502 #[test]
3503 fn test_tree_persist_block_batch() {
3504 let tree_config = TreeConfig::default();
3505 let chain_spec = MAINNET.clone();
3506 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3507
3508 let blocks: Vec<_> = test_block_builder
3511 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3512 .collect();
3513 let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3514
3515 let mut blocks = vec![];
3516 for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3517 blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3518 }
3519
3520 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3521
3522 let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3524 test_harness.tree.on_engine_message(msg).unwrap();
3525
3526 let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3528 match msg {
3529 FromEngine::DownloadedBlocks(blocks) => {
3530 assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3531 }
3532 _ => panic!("unexpected message: {:#?}", msg),
3533 }
3534 }
3535
3536 #[tokio::test]
3537 async fn test_tree_persist_blocks() {
3538 let tree_config = TreeConfig::default();
3539 let chain_spec = MAINNET.clone();
3540 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3541
3542 let blocks: Vec<_> = test_block_builder
3545 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3546 .collect();
3547 let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3548 std::thread::Builder::new()
3549 .name("Tree Task".to_string())
3550 .spawn(|| test_harness.tree.run())
3551 .unwrap();
3552
3553 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3555
3556 let received_action =
3557 test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3558 if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3559 let expected_persist_len =
3562 blocks.len() - tree_config.memory_block_buffer_target() as usize;
3563 assert_eq!(saved_blocks.len(), expected_persist_len);
3564 assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3565 } else {
3566 panic!("unexpected action received {received_action:?}");
3567 }
3568 }
3569
3570 #[tokio::test]
3571 async fn test_in_memory_state_trait_impl() {
3572 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect();
3573 let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3574
3575 for executed_block in blocks {
3576 let sealed_block = executed_block.recovered_block();
3577
3578 let expected_state = BlockState::new(executed_block.clone());
3579
3580 let actual_state_by_hash = test_harness
3581 .tree
3582 .canonical_in_memory_state
3583 .state_by_hash(sealed_block.hash())
3584 .unwrap();
3585 assert_eq!(expected_state, *actual_state_by_hash);
3586
3587 let actual_state_by_number = test_harness
3588 .tree
3589 .canonical_in_memory_state
3590 .state_by_number(sealed_block.number)
3591 .unwrap();
3592 assert_eq!(expected_state, *actual_state_by_number);
3593 }
3594 }
3595
3596 #[tokio::test]
3597 async fn test_engine_request_during_backfill() {
3598 let tree_config = TreeConfig::default();
3599 let blocks: Vec<_> = TestBlockBuilder::eth()
3600 .get_executed_blocks(0..tree_config.persistence_threshold())
3601 .collect();
3602 let mut test_harness = TestHarness::new(MAINNET.clone())
3603 .with_blocks(blocks)
3604 .with_backfill_state(BackfillSyncState::Active);
3605
3606 let (tx, rx) = oneshot::channel();
3607 test_harness
3608 .tree
3609 .on_engine_message(FromEngine::Request(
3610 BeaconEngineMessage::ForkchoiceUpdated {
3611 state: ForkchoiceState {
3612 head_block_hash: B256::random(),
3613 safe_block_hash: B256::random(),
3614 finalized_block_hash: B256::random(),
3615 },
3616 payload_attrs: None,
3617 tx,
3618 version: EngineApiMessageVersion::default(),
3619 }
3620 .into(),
3621 ))
3622 .unwrap();
3623
3624 let resp = rx.await.unwrap().unwrap().await.unwrap();
3625 assert!(resp.payload_status.is_syncing());
3626 }
3627
3628 #[test]
3629 fn test_disconnected_payload() {
3630 let s = include_str!("../../test-data/holesky/2.rlp");
3631 let data = Bytes::from_str(s).unwrap();
3632 let block = Block::decode(&mut data.as_ref()).unwrap();
3633 let sealed = block.seal_slow();
3634 let hash = sealed.hash();
3635 let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
3636
3637 let mut test_harness = TestHarness::new(HOLESKY.clone());
3638
3639 let outcome = test_harness
3640 .tree
3641 .on_new_payload(ExecutionData {
3642 payload: payload.into(),
3643 sidecar: ExecutionPayloadSidecar::none(),
3644 })
3645 .unwrap();
3646 assert!(outcome.outcome.is_syncing());
3647
3648 let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3650 assert_eq!(buffered.clone_sealed_block(), sealed);
3651 }
3652
3653 #[test]
3654 fn test_disconnected_block() {
3655 let s = include_str!("../../test-data/holesky/2.rlp");
3656 let data = Bytes::from_str(s).unwrap();
3657 let block = Block::decode(&mut data.as_ref()).unwrap();
3658 let sealed = block.seal_slow().try_recover().unwrap();
3659
3660 let mut test_harness = TestHarness::new(HOLESKY.clone());
3661
3662 let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap();
3663 assert_eq!(
3664 outcome,
3665 InsertPayloadOk::Inserted(BlockStatus::Disconnected {
3666 head: test_harness.tree.state.tree_state.current_canonical_head,
3667 missing_ancestor: sealed.parent_num_hash()
3668 })
3669 );
3670 }
3671
3672 #[tokio::test]
3673 async fn test_holesky_payload() {
3674 let s = include_str!("../../test-data/holesky/1.rlp");
3675 let data = Bytes::from_str(s).unwrap();
3676 let block: Block = Block::decode(&mut data.as_ref()).unwrap();
3677 let sealed = block.seal_slow();
3678 let payload =
3679 ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
3680
3681 let mut test_harness =
3682 TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3683
3684 let (tx, rx) = oneshot::channel();
3685 test_harness
3686 .tree
3687 .on_engine_message(FromEngine::Request(
3688 BeaconEngineMessage::NewPayload {
3689 payload: ExecutionData {
3690 payload: payload.clone().into(),
3691 sidecar: ExecutionPayloadSidecar::none(),
3692 },
3693 tx,
3694 }
3695 .into(),
3696 ))
3697 .unwrap();
3698
3699 let resp = rx.await.unwrap().unwrap();
3700 assert!(resp.is_syncing());
3701 }
3702
3703 #[test]
3704 fn test_tree_state_normal_descendant() {
3705 let mut tree_state = TreeState::new(BlockNumHash::default());
3706 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3707
3708 tree_state.insert_executed(blocks[0].clone());
3709 assert!(tree_state.is_descendant(
3710 blocks[0].recovered_block().num_hash(),
3711 blocks[1].recovered_block().header()
3712 ));
3713
3714 tree_state.insert_executed(blocks[1].clone());
3715
3716 assert!(tree_state.is_descendant(
3717 blocks[0].recovered_block().num_hash(),
3718 blocks[2].recovered_block().header()
3719 ));
3720 assert!(tree_state.is_descendant(
3721 blocks[1].recovered_block().num_hash(),
3722 blocks[2].recovered_block().header()
3723 ));
3724 }
3725
3726 #[tokio::test]
3727 async fn test_tree_state_insert_executed() {
3728 let mut tree_state = TreeState::new(BlockNumHash::default());
3729 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3730
3731 tree_state.insert_executed(blocks[0].clone());
3732 tree_state.insert_executed(blocks[1].clone());
3733
3734 assert_eq!(
3735 tree_state.parent_to_child.get(&blocks[0].recovered_block().hash()),
3736 Some(&HashSet::from_iter([blocks[1].recovered_block().hash()]))
3737 );
3738
3739 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3740
3741 tree_state.insert_executed(blocks[2].clone());
3742
3743 assert_eq!(
3744 tree_state.parent_to_child.get(&blocks[1].recovered_block().hash()),
3745 Some(&HashSet::from_iter([blocks[2].recovered_block().hash()]))
3746 );
3747 assert!(tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3748
3749 assert!(!tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3750 }
3751
3752 #[tokio::test]
3753 async fn test_tree_state_insert_executed_with_reorg() {
3754 let mut tree_state = TreeState::new(BlockNumHash::default());
3755 let mut test_block_builder = TestBlockBuilder::eth();
3756 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3757
3758 for block in &blocks {
3759 tree_state.insert_executed(block.clone());
3760 }
3761 assert_eq!(tree_state.blocks_by_hash.len(), 5);
3762
3763 let fork_block_3 = test_block_builder
3764 .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3765 let fork_block_4 = test_block_builder
3766 .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3767 let fork_block_5 = test_block_builder
3768 .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3769
3770 tree_state.insert_executed(fork_block_3.clone());
3771 tree_state.insert_executed(fork_block_4.clone());
3772 tree_state.insert_executed(fork_block_5.clone());
3773
3774 assert_eq!(tree_state.blocks_by_hash.len(), 8);
        assert_eq!(tree_state.blocks_by_number[&3].len(), 2);
        assert_eq!(tree_state.parent_to_child[&blocks[1].recovered_block().hash()].len(), 2);

        tree_state.insert_executed(fork_block_4.clone());
3780 assert_eq!(tree_state.blocks_by_hash.len(), 8);
3781
3782 assert!(tree_state.parent_to_child[&fork_block_3.recovered_block().hash()]
3783 .contains(&fork_block_4.recovered_block().hash()));
3784 assert!(tree_state.parent_to_child[&fork_block_4.recovered_block().hash()]
3785 .contains(&fork_block_5.recovered_block().hash()));
3786
3787 assert_eq!(tree_state.blocks_by_number[&4].len(), 2);
3788 assert_eq!(tree_state.blocks_by_number[&5].len(), 2);
3789 }
3790
3791 #[tokio::test]
3792 async fn test_tree_state_remove_before() {
3793 let start_num_hash = BlockNumHash::default();
3794 let mut tree_state = TreeState::new(start_num_hash);
3795 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3796
3797 for block in &blocks {
3798 tree_state.insert_executed(block.clone());
3799 }
3800
3801 let last = blocks.last().unwrap();
3802
3803 tree_state.set_canonical_head(last.recovered_block().num_hash());
3805
3806 tree_state.remove_until(
3808 BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3809 start_num_hash.hash,
3810 Some(blocks[1].recovered_block().num_hash()),
3811 );
3812
3813 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3814 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3815 assert!(!tree_state.blocks_by_number.contains_key(&1));
3816 assert!(!tree_state.blocks_by_number.contains_key(&2));
3817
3818 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3819 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3820 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3821 assert!(tree_state.blocks_by_number.contains_key(&3));
3822 assert!(tree_state.blocks_by_number.contains_key(&4));
3823 assert!(tree_state.blocks_by_number.contains_key(&5));
3824
3825 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3826 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3827 assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3828 assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3829 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3830
3831 assert_eq!(
3832 tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3833 Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3834 );
3835 assert_eq!(
3836 tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3837 Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3838 );
3839 }
3840
3841 #[tokio::test]
3842 async fn test_tree_state_remove_before_finalized() {
3843 let start_num_hash = BlockNumHash::default();
3844 let mut tree_state = TreeState::new(start_num_hash);
3845 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3846
3847 for block in &blocks {
3848 tree_state.insert_executed(block.clone());
3849 }
3850
3851 let last = blocks.last().unwrap();
3852
3853 tree_state.set_canonical_head(last.recovered_block().num_hash());
3855
3856 tree_state.remove_until(
3858 BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3859 start_num_hash.hash,
3860 None,
3861 );
3862
3863 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3864 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3865 assert!(!tree_state.blocks_by_number.contains_key(&1));
3866 assert!(!tree_state.blocks_by_number.contains_key(&2));
3867
3868 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3869 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3870 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3871 assert!(tree_state.blocks_by_number.contains_key(&3));
3872 assert!(tree_state.blocks_by_number.contains_key(&4));
3873 assert!(tree_state.blocks_by_number.contains_key(&5));
3874
3875 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3876 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3877 assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3878 assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3879 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3880
3881 assert_eq!(
3882 tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3883 Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3884 );
3885 assert_eq!(
3886 tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3887 Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3888 );
3889 }
3890
3891 #[tokio::test]
3892 async fn test_tree_state_remove_before_lower_finalized() {
3893 let start_num_hash = BlockNumHash::default();
3894 let mut tree_state = TreeState::new(start_num_hash);
3895 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3896
3897 for block in &blocks {
3898 tree_state.insert_executed(block.clone());
3899 }
3900
3901 let last = blocks.last().unwrap();
3902
3903 tree_state.set_canonical_head(last.recovered_block().num_hash());
3905
3906 tree_state.remove_until(
3908 BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3909 start_num_hash.hash,
3910 Some(blocks[0].recovered_block().num_hash()),
3911 );
3912
3913 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3914 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3915 assert!(!tree_state.blocks_by_number.contains_key(&1));
3916 assert!(!tree_state.blocks_by_number.contains_key(&2));
3917
3918 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3919 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3920 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3921 assert!(tree_state.blocks_by_number.contains_key(&3));
3922 assert!(tree_state.blocks_by_number.contains_key(&4));
3923 assert!(tree_state.blocks_by_number.contains_key(&5));
3924
3925 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3926 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3927 assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3928 assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3929 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3930
3931 assert_eq!(
3932 tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3933 Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3934 );
3935 assert_eq!(
3936 tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3937 Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3938 );
3939 }
3940
3941 #[tokio::test]
3942 async fn test_tree_state_on_new_head_reorg() {
3943 reth_tracing::init_test_tracing();
3944 let chain_spec = MAINNET.clone();
3945
3946 let mut test_harness = TestHarness::new(chain_spec);
3948 test_harness.tree.config = test_harness
3949 .tree
3950 .config
3951 .with_persistence_threshold(1)
3952 .with_memory_block_buffer_target(1);
3953 let mut test_block_builder = TestBlockBuilder::eth();
3954 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3955
3956 for block in &blocks {
3957 test_harness.tree.state.tree_state.insert_executed(block.clone());
3958 }
3959
3960 test_harness
3962 .tree
3963 .state
3964 .tree_state
3965 .set_canonical_head(blocks[2].recovered_block().num_hash());
3966
3967 let fork_block_3 = test_block_builder
3969 .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3970 let fork_block_4 = test_block_builder
3971 .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3972 let fork_block_5 = test_block_builder
3973 .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3974
3975 test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3976 test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3977 test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3978
3979 let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap();
3981 assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3982 if let Some(NewCanonicalChain::Commit { new }) = result {
3983 assert_eq!(new.len(), 2);
3984 assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash());
3985 assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
3986 }
3987
3988 let current_action = test_harness.tree.persistence_state.current_action();
3990 assert_eq!(current_action, None);
3991
3992 test_harness.tree.advance_persistence().unwrap();
3997 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3998 assert_eq!(
3999 current_action,
4000 Some(CurrentPersistenceAction::SavingBlocks {
4001 highest: blocks[1].recovered_block().num_hash()
4002 })
4003 );
4004
4005 let received_action = test_harness.action_rx.recv().unwrap();
4007 let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
4008 panic!("received wrong action");
4009 };
4010 assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
4011
4012 sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
4014
4015 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4017 assert_eq!(
4018 current_action,
4019 Some(CurrentPersistenceAction::SavingBlocks {
4020 highest: blocks[1].recovered_block().num_hash()
4021 })
4022 );
4023
4024 test_harness.tree.advance_persistence().unwrap();
4026 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4027 assert_eq!(current_action, None);
4028
4029 let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
4031 assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
4032
4033 if let Some(NewCanonicalChain::Reorg { new, old }) = result {
4034 assert_eq!(new.len(), 3);
4035 assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
4036 assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash());
4037 assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash());
4038
4039 assert_eq!(old.len(), 1);
4040 assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
4041 }
4042
4043 test_harness.tree.advance_persistence().unwrap();
4045 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4046 assert_eq!(current_action, None);
4047
4048 test_harness
4050 .tree
4051 .state
4052 .tree_state
4053 .set_canonical_head(fork_block_5.recovered_block().num_hash());
4054
4055 test_harness.tree.advance_persistence().unwrap();
4058 let current_action = test_harness.tree.persistence_state.current_action().cloned();
4059 assert_eq!(
4060 current_action,
4061 Some(CurrentPersistenceAction::SavingBlocks {
4062 highest: fork_block_4.recovered_block().num_hash()
4063 })
4064 );
4065 }
4066
    #[test]
    fn test_tree_state_on_new_head_deep_fork() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec);
        let mut test_block_builder = TestBlockBuilder::eth();

        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();

        for block in &blocks {
            test_harness.tree.state.tree_state.insert_executed(block.clone());
        }

        let last_block = blocks.last().unwrap().recovered_block().clone();

        test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());

        let chain_a = test_block_builder.create_fork(&last_block, 10);
        let chain_b = test_block_builder.create_fork(&last_block, 10);

        for block in &chain_a {
            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
                block: ExecutedBlock {
                    recovered_block: Arc::new(block.clone()),
                    execution_output: Arc::new(ExecutionOutcome::default()),
                    hashed_state: Arc::new(HashedPostState::default()),
                },
                trie: Arc::new(TrieUpdates::default()),
            });
        }
        test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());

        for block in &chain_b {
            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
                block: ExecutedBlock {
                    recovered_block: Arc::new(block.clone()),
                    execution_output: Arc::new(ExecutionOutcome::default()),
                    hashed_state: Arc::new(HashedPostState::default()),
                },
                trie: Arc::new(TrieUpdates::default()),
            });
        }

        let mut expected_new = Vec::new();
        for block in &chain_b {
            let result = test_harness.tree.on_new_head(block.hash()).unwrap();
            assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));

            expected_new.push(block);
            if let Some(NewCanonicalChain::Reorg { new, old }) = result {
                assert_eq!(new.len(), expected_new.len());
                for (index, block) in expected_new.iter().enumerate() {
                    assert_eq!(new[index].recovered_block().hash(), block.hash());
                }

                assert_eq!(old.len(), chain_a.len());
                for (index, block) in chain_a.iter().enumerate() {
                    assert_eq!(old[index].recovered_block().hash(), block.hash());
                }
            }

            test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
        }
    }

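    // `get_canonical_blocks_to_persist` should only return canonical blocks between the last
    // persisted block and the in-memory buffer target, and never blocks that belong to a fork.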
    #[tokio::test]
    async fn test_get_canonical_blocks_to_persist() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec);
        let mut test_block_builder = TestBlockBuilder::eth();

        let canonical_head_number = 9;
        let blocks: Vec<_> =
            test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
        test_harness = test_harness.with_blocks(blocks.clone());

        let last_persisted_block_number = 3;
        test_harness.tree.persistence_state.last_persisted_block =
            blocks[last_persisted_block_number as usize].recovered_block.num_hash();

        let persistence_threshold = 4;
        let memory_block_buffer_target = 3;
        test_harness.tree.config = TreeConfig::default()
            .with_persistence_threshold(persistence_threshold)
            .with_memory_block_buffer_target(memory_block_buffer_target);

        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();

        let expected_blocks_to_persist_length: usize =
            (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
                .try_into()
                .unwrap();

        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
        for (i, item) in
            blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
        {
            assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
        }

        // insert a fork block at height 4; it must not be selected for persistence
        let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
        let fork_block_hash = fork_block.recovered_block().hash();
        test_harness.tree.state.tree_state.insert_executed(fork_block);

        assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());

        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);

        assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));

        // the canonical block at height 4 is still included
        assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
            b.recovered_block().hash() == blocks[4].recovered_block().hash()));

        test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
        assert_eq!(
            test_harness.tree.persistence_state.current_action().cloned(),
            Some(CurrentPersistenceAction::SavingBlocks {
                highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
            })
        );
    }

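    // A forkchoice update to an unknown head returns SYNCING and triggers a download request for
    // the missing block.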
    #[tokio::test]
    async fn test_engine_tree_fcu_missing_head() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());

        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
        test_harness = test_harness.with_blocks(blocks);

        let missing_block = test_block_builder
            .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash());

        test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
                let expected_block_set = HashSet::from_iter([missing_block.hash()]);
                assert_eq!(actual_block_set, expected_block_set);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }
    }

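    // Inserting a chain of new blocks on top of the current canonical head should extend the
    // canonical chain.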
    #[tokio::test]
    async fn test_engine_tree_fcu_canon_chain_insertion() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        let main_chain =
            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3);

        test_harness.insert_chain(main_chain).await;
    }

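    // An FCU to the tip of a fork whose blocks are all known should reorg the canonical chain
    // onto that fork.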
    #[tokio::test]
    async fn test_engine_tree_fcu_reorg_with_all_blocks() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
        test_harness = test_harness.with_blocks(main_chain.clone());

        let fork_chain =
            test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3);
        let fork_chain_last_hash = fork_chain.last().unwrap().hash();

        for block in &fork_chain {
            test_harness.insert_block(block.clone()).unwrap();
        }

        test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_fork_chain_insertion(fork_chain.clone()).await;

        test_harness.check_canon_commit(fork_chain_last_hash).await;

        test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_head(fork_chain_last_hash);
    }

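    // When the FCU head is far ahead of the local head, the tree answers SYNCING; after a
    // finished backfill run it should first request the missing head and then the block range
    // connecting it to the backfilled blocks.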
    #[tokio::test]
    async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        // create a main chain that is long enough to require a backfill run
        let main_chain = test_harness
            .block_builder
            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

        let main_chain_last_hash = main_chain.last().unwrap().hash();
        test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        // signal that backfill finished at a block within the main chain
        let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
        let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
            block_number: backfill_finished_block_number,
        });

        let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
        test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
        test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();

        // the tree should ask for the still-missing FCU head first
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
                assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
                .last()
                .unwrap()
                .clone()]))
            .unwrap();

        // once the head is known, the tree should request the range between it and the backfill
        // tip
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
                assert_eq!(
                    total_blocks,
                    (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
                );
                assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }
    }

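    // Full live-sync transition: download the backfill target, run backfill, then download and
    // insert the remaining blocks until the FCU head becomes canonical.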
    #[tokio::test]
    async fn test_engine_tree_live_sync_transition_eventually_canonical() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());
        test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        // create a chain that is long enough to force a backfill run
        let main_chain = test_harness
            .block_builder
            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

        let main_chain_last = main_chain.last().unwrap();
        let main_chain_last_hash = main_chain_last.hash();
        let main_chain_backfill_target =
            main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
        let main_chain_backfill_target_hash = main_chain_backfill_target.hash();

        test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
        test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;

        // the backfill target block should be requested first
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
                assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![
                main_chain_backfill_target.clone()
            ]))
            .unwrap();

        // receiving the target block should kick off a backfill run towards it
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::BackfillAction(BackfillAction::Start(
                reth_stages::PipelineTarget::Sync(target_hash),
            )) => {
                assert_eq!(target_hash, main_chain_backfill_target_hash);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        // simulate the backfill run by persisting the backfilled part of the chain
        let backfilled_chain: Vec<_> =
            main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
        test_harness.persist_blocks(backfilled_chain.clone());

        test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);

        test_harness
            .tree
            .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
                ControlFlow::Continue { block_number: main_chain_backfill_target.number },
            )))
            .unwrap();

        // an FCU to the chain tip should now request the missing head block
        test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
                assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
            .unwrap();

        // followed by the range between the backfilled part and the head
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
                assert_eq!(
                    total_blocks,
                    (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
                );
                assert_eq!(initial_hash, main_chain_last.parent_hash);
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }

        let remaining: Vec<_> = main_chain
            .clone()
            .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
            .collect();

        test_harness.setup_range_insertion_for_valid_chain(remaining.clone());

        // inserting the downloaded blocks should make the chain tip canonical
        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
            .unwrap();

        test_harness.check_canon_chain_insertion(remaining).await;

        test_harness.check_canon_commit(main_chain_last_hash).await;

        test_harness.check_canon_head(main_chain_last_hash);
    }

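    // Consecutive FCUs to increasing heights of an already-inserted chain should extend the
    // canonical chain step by step, without a reorg.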
    #[tokio::test]
    async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        let main_chain =
            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10);
        let target = main_chain.get(5).unwrap();
        let target_hash = target.hash();
        let main_last = main_chain.last().unwrap();
        let main_last_hash = main_last.hash();

        test_harness.insert_chain(main_chain).await;

        test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(target_hash).await;
        test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;

        test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(main_last_hash).await;
        test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
        test_harness.check_canon_head(main_last_hash);
    }

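    // Two valid forks on top of an extension of an older canonical head: both forks can be
    // inserted, the fork selected by the final FCU becomes canonical, and the other remains a
    // fork.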
    #[tokio::test]
    async fn test_engine_tree_valid_forks_with_older_canonical_head() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let old_head = base_chain.first().unwrap().recovered_block();

        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
        let fork_block = extension_chain.last().unwrap().clone_sealed_block();

        test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
        test_harness.insert_chain(extension_chain).await;

        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;

        let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
        let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);

        test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
        for block in &chain_a {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness.check_canon_chain_insertion(chain_a.clone()).await;

        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
        for block in &chain_b {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness.check_canon_chain_insertion(chain_b.clone()).await;

        let chain_b_tip_hash = chain_b.last().unwrap().hash();
        test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(chain_b_tip_hash).await;

        test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_head(chain_b_tip_hash);

        assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
    }

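    // A payload whose parent is unknown is buffered; once the missing parent arrives, both
    // blocks are removed from the buffer and added to the canonical chain.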
    #[tokio::test]
    async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let side_chain =
            test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2);

        let buffered_block = side_chain.last().unwrap();
        let buffered_block_hash = buffered_block.hash();

        test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
        test_harness.send_new_payload(buffered_block.clone()).await;

        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());

        let non_buffered_block = side_chain.first().unwrap();
        let non_buffered_block_hash = non_buffered_block.hash();

        test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
        test_harness.send_new_payload(non_buffered_block.clone()).await;
        assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());

        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());

        test_harness.check_canon_block_added(non_buffered_block_hash).await;
        test_harness.check_canon_block_added(buffered_block_hash).await;
    }

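    // A fully valid fork (chain b) and a fork with an invalid block (chain a): chain b becomes
    // canonical, the invalid suffix of chain a is rejected, and an FCU to the invalid tip does
    // not move the canonical head.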
    #[tokio::test]
    async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let old_head = base_chain.first().unwrap().recovered_block();

        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
        test_harness.insert_chain(extension_chain).await;

        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;

        // create two forks of the same length off the extension tip
        let total_fork_elements = 10;
        let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
        let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);

        // chain b is fully valid and becomes canonical block by block
        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
        for block in &chain_b {
            test_harness.send_new_payload(block.clone()).await;
            test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
            test_harness.check_canon_block_added(block.hash()).await;
            test_harness.check_canon_commit(block.hash()).await;
            test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
        }

        // chain a contains an invalid block near its tip
        let invalid_index = 3;
        test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
        for block in &chain_a {
            test_harness.send_new_payload(block.clone()).await;
        }

        // the valid prefix of chain a is inserted as a fork, the rest is marked invalid
        test_harness
            .check_fork_chain_insertion(
                chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
            )
            .await;
        for block in &chain_a[chain_a.len() - invalid_index - 1..] {
            test_harness.check_invalid_block(block.hash()).await;
        }

        // an FCU to the invalid tip of chain a is rejected
        let chain_a_tip_hash = chain_a.last().unwrap().hash();
        test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;

        // the canonical head remains the tip of chain b
        let chain_b_tip_hash = chain_b.last().unwrap().hash();

        test_harness.check_canon_head(chain_b_tip_hash);

        test_harness.check_canon_head(chain_b_tip_hash);
    }

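    // A long side chain with an invalid block in the middle: payloads before the invalid block
    // can become canonical, but an FCU to the side chain's tip is reported as invalid.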
    #[tokio::test]
    async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
        reth_tracing::init_test_tracing();
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let side_chain = test_harness
            .block_builder
            .create_fork(base_chain.last().unwrap().recovered_block(), 15);
        let invalid_index = 9;

        test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);

        for (index, block) in side_chain.iter().enumerate() {
            test_harness.send_new_payload(block.clone()).await;

            if index < side_chain.len() - invalid_index - 1 {
                test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
            }
        }

        let fork_tip_hash = side_chain.last().unwrap().hash();
        test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
    }
}