reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{
7        cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
8    },
9};
10use alloy_consensus::BlockHeader;
11use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, NumHash};
12use alloy_primitives::{
13    map::{HashMap, HashSet},
14    BlockNumber, B256, U256,
15};
16use alloy_rpc_types_engine::{
17    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
18};
19use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
20use payload_processor::sparse_trie::StateRootComputeOutcome;
21use persistence_state::CurrentPersistenceAction;
22use reth_chain_state::{
23    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
24    MemoryOverlayStateProvider, NewCanonicalChain,
25};
26use reth_consensus::{Consensus, FullConsensus};
27pub use reth_engine_primitives::InvalidBlockHook;
28use reth_engine_primitives::{
29    BeaconConsensusEngineEvent, BeaconEngineMessage, BeaconOnNewPayloadError, EngineValidator,
30    ExecutionPayload, ForkchoiceStateTracker, OnForkChoiceUpdated,
31};
32use reth_errors::{ConsensusError, ProviderResult};
33use reth_ethereum_primitives::EthPrimitives;
34use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
35use reth_payload_builder::PayloadBuilderHandle;
36use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes, PayloadTypes};
37use reth_primitives_traits::{
38    Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
39};
40use reth_provider::{
41    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
42    ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
43    StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
44};
45use reth_revm::database::StateProviderDatabase;
46use reth_stages_api::ControlFlow;
47use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
48use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
49use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
50use std::{
51    collections::{btree_map, hash_map, BTreeMap, VecDeque},
52    fmt::Debug,
53    ops::Bound,
54    sync::{
55        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
56        Arc,
57    },
58    time::Instant,
59};
60use tokio::sync::{
61    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
62    oneshot::{self, error::TryRecvError},
63};
64use tracing::*;
65
66mod block_buffer;
67mod cached_state;
68pub mod error;
69mod invalid_block_hook;
70mod invalid_headers;
71mod metrics;
72mod payload_processor;
73mod persistence_state;
74// TODO(alexey): compare trie updates in `insert_block_inner`
75#[expect(unused)]
76mod trie_updates;
77
78use crate::tree::error::AdvancePersistenceError;
79pub use block_buffer::BlockBuffer;
80pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
81pub use invalid_headers::InvalidHeaderCache;
82pub use payload_processor::*;
83pub use persistence_state::PersistenceState;
84pub use reth_engine_primitives::TreeConfig;
85use reth_evm::execute::BlockExecutionOutput;
86
/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold`
/// for more information.
///
/// This is the default threshold: the maximum distance to the head for which the tree is still
/// used for sync. If the distance exceeds this threshold, the pipeline will be used for sync
/// instead.
///
/// The value is taken from [`EPOCH_SLOTS`].
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
93
/// Keeps track of the state of the tree.
///
/// The same executed blocks are indexed in several ways (by hash, by number and by parent hash)
/// so that lookups, fork handling and pruning do not need to scan all blocks.
///
/// ## Invariants
///
/// - This only stores blocks that are connected to the canonical chain.
/// - All executed blocks are valid and have been executed.
#[derive(Debug, Default)]
pub struct TreeState<N: NodePrimitives = EthPrimitives> {
    /// __All__ unique executed blocks by block hash that are connected to the canonical chain.
    ///
    /// This includes blocks of all forks.
    blocks_by_hash: HashMap<B256, ExecutedBlockWithTrieUpdates<N>>,
    /// Executed blocks grouped by their respective block number.
    ///
    /// This maps unique block number to all known blocks for that height.
    ///
    /// Note: there can be multiple blocks at the same height due to forks.
    blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlockWithTrieUpdates<N>>>,
    /// Map of any parent block hash to the hashes of its children.
    ///
    /// The parent itself does not have to be stored in `blocks_by_hash`; the children do.
    parent_to_child: HashMap<B256, HashSet<B256>>,
    /// Map of hash to trie updates for canonical blocks that are persisted but not finalized.
    ///
    /// Contains the block number for easy removal: entries whose number is not above the
    /// finalized block are dropped when finalized sidechains are pruned.
    persisted_trie_updates: HashMap<B256, (BlockNumber, Arc<TrieUpdates>)>,
    /// Currently tracked canonical head of the chain.
    current_canonical_head: BlockNumHash,
}
121
122impl<N: NodePrimitives> TreeState<N> {
123    /// Returns a new, empty tree state that points to the given canonical head.
124    fn new(current_canonical_head: BlockNumHash) -> Self {
125        Self {
126            blocks_by_hash: HashMap::default(),
127            blocks_by_number: BTreeMap::new(),
128            current_canonical_head,
129            parent_to_child: HashMap::default(),
130            persisted_trie_updates: HashMap::default(),
131        }
132    }
133
134    /// Resets the state and points to the given canonical head.
135    fn reset(&mut self, current_canonical_head: BlockNumHash) {
136        *self = Self::new(current_canonical_head);
137    }
138
139    /// Returns the number of executed blocks stored.
140    fn block_count(&self) -> usize {
141        self.blocks_by_hash.len()
142    }
143
144    /// Returns the [`ExecutedBlockWithTrieUpdates`] by hash.
145    fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlockWithTrieUpdates<N>> {
146        self.blocks_by_hash.get(&hash)
147    }
148
149    /// Returns the block by hash.
150    fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock<N::Block>>> {
151        self.blocks_by_hash.get(&hash).map(|b| Arc::new(b.recovered_block().sealed_block().clone()))
152    }
153
154    /// Returns all available blocks for the given hash that lead back to the canonical chain, from
155    /// newest to oldest. And the parent hash of the oldest block that is missing from the buffer.
156    ///
157    /// Returns `None` if the block for the given hash is not found.
158    fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlockWithTrieUpdates<N>>)> {
159        let block = self.blocks_by_hash.get(&hash).cloned()?;
160        let mut parent_hash = block.recovered_block().parent_hash();
161        let mut blocks = vec![block];
162        while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
163            parent_hash = executed.recovered_block().parent_hash();
164            blocks.push(executed.clone());
165        }
166
167        Some((parent_hash, blocks))
168    }
169
170    /// Insert executed block into the state.
171    fn insert_executed(&mut self, executed: ExecutedBlockWithTrieUpdates<N>) {
172        let hash = executed.recovered_block().hash();
173        let parent_hash = executed.recovered_block().parent_hash();
174        let block_number = executed.recovered_block().number();
175
176        if self.blocks_by_hash.contains_key(&hash) {
177            return;
178        }
179
180        self.blocks_by_hash.insert(hash, executed.clone());
181
182        self.blocks_by_number.entry(block_number).or_default().push(executed);
183
184        self.parent_to_child.entry(parent_hash).or_default().insert(hash);
185
186        for children in self.parent_to_child.values_mut() {
187            children.retain(|child| self.blocks_by_hash.contains_key(child));
188        }
189    }
190
191    /// Remove single executed block by its hash.
192    ///
193    /// ## Returns
194    ///
195    /// The removed block and the block hashes of its children.
196    fn remove_by_hash(
197        &mut self,
198        hash: B256,
199    ) -> Option<(ExecutedBlockWithTrieUpdates<N>, HashSet<B256>)> {
200        let executed = self.blocks_by_hash.remove(&hash)?;
201
202        // Remove this block from collection of children of its parent block.
203        let parent_entry = self.parent_to_child.entry(executed.recovered_block().parent_hash());
204        if let hash_map::Entry::Occupied(mut entry) = parent_entry {
205            entry.get_mut().remove(&hash);
206
207            if entry.get().is_empty() {
208                entry.remove();
209            }
210        }
211
212        // Remove point to children of this block.
213        let children = self.parent_to_child.remove(&hash).unwrap_or_default();
214
215        // Remove this block from `blocks_by_number`.
216        let block_number_entry = self.blocks_by_number.entry(executed.recovered_block().number());
217        if let btree_map::Entry::Occupied(mut entry) = block_number_entry {
218            // We have to find the index of the block since it exists in a vec
219            if let Some(index) = entry.get().iter().position(|b| b.recovered_block().hash() == hash)
220            {
221                entry.get_mut().swap_remove(index);
222
223                // If there are no blocks left then remove the entry for this block
224                if entry.get().is_empty() {
225                    entry.remove();
226                }
227            }
228        }
229
230        Some((executed, children))
231    }
232
233    /// Returns whether or not the hash is part of the canonical chain.
234    pub(crate) fn is_canonical(&self, hash: B256) -> bool {
235        let mut current_block = self.current_canonical_head.hash;
236        if current_block == hash {
237            return true
238        }
239
240        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
241            current_block = executed.recovered_block().parent_hash();
242            if current_block == hash {
243                return true
244            }
245        }
246
247        false
248    }
249
250    /// Removes canonical blocks below the upper bound, only if the last persisted hash is
251    /// part of the canonical chain.
252    pub(crate) fn remove_canonical_until(
253        &mut self,
254        upper_bound: BlockNumber,
255        last_persisted_hash: B256,
256    ) {
257        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
258
259        // If the last persisted hash is not canonical, then we don't want to remove any canonical
260        // blocks yet.
261        if !self.is_canonical(last_persisted_hash) {
262            return
263        }
264
265        // First, let's walk back the canonical chain and remove canonical blocks lower than the
266        // upper bound
267        let mut current_block = self.current_canonical_head.hash;
268        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
269            current_block = executed.recovered_block().parent_hash();
270            if executed.recovered_block().number() <= upper_bound {
271                debug!(target: "engine::tree", num_hash=?executed.recovered_block().num_hash(), "Attempting to remove block walking back from the head");
272                if let Some((removed, _)) = self.remove_by_hash(executed.recovered_block().hash()) {
273                    debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed block walking back from the head");
274                    // finally, move the trie updates
275                    self.persisted_trie_updates.insert(
276                        removed.recovered_block().hash(),
277                        (removed.recovered_block().number(), removed.trie),
278                    );
279                }
280            }
281        }
282        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removed canonical blocks from the tree");
283    }
284
285    /// Removes all blocks that are below the finalized block, as well as removing non-canonical
286    /// sidechains that fork from below the finalized block.
287    pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
288        let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
289
290        // We remove disconnected sidechains in three steps:
291        // * first, remove everything with a block number __below__ the finalized block.
292        // * next, we populate a vec with parents __at__ the finalized block.
293        // * finally, we iterate through the vec, removing children until the vec is empty
294        // (BFS).
295
296        // We _exclude_ the finalized block because we will be dealing with the blocks __at__
297        // the finalized block later.
298        let blocks_to_remove = self
299            .blocks_by_number
300            .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
301            .flat_map(|(_, blocks)| blocks.iter().map(|b| b.recovered_block().hash()))
302            .collect::<Vec<_>>();
303        for hash in blocks_to_remove {
304            if let Some((removed, _)) = self.remove_by_hash(hash) {
305                debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain block");
306            }
307        }
308
309        // remove trie updates that are below the finalized block
310        self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num > finalized_num);
311
312        // The only block that should remain at the `finalized` number now, is the finalized
313        // block, if it exists.
314        //
315        // For all other blocks, we  first put their children into this vec.
316        // Then, we will iterate over them, removing them, adding their children, etc,
317        // until the vec is empty.
318        let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
319
320        // re-insert the finalized hash if we removed it
321        if let Some(position) =
322            blocks_to_remove.iter().position(|b| b.recovered_block().hash() == finalized_hash)
323        {
324            let finalized_block = blocks_to_remove.swap_remove(position);
325            self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
326        }
327
328        let mut blocks_to_remove = blocks_to_remove
329            .into_iter()
330            .map(|e| e.recovered_block().hash())
331            .collect::<VecDeque<_>>();
332        while let Some(block) = blocks_to_remove.pop_front() {
333            if let Some((removed, children)) = self.remove_by_hash(block) {
334                debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain child block");
335                blocks_to_remove.extend(children);
336            }
337        }
338    }
339
340    /// Remove all blocks up to __and including__ the given block number.
341    ///
342    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
343    /// those which have a fork point at or below the finalized hash.
344    ///
345    /// Canonical blocks below the upper bound will still be removed.
346    ///
347    /// NOTE: if the finalized block is greater than the upper bound, the only blocks that will be
348    /// removed are canonical blocks and sidechains that fork below the `upper_bound`. This is the
349    /// same behavior as if the `finalized_num` were `Some(upper_bound)`.
350    pub(crate) fn remove_until(
351        &mut self,
352        upper_bound: BlockNumHash,
353        last_persisted_hash: B256,
354        finalized_num_hash: Option<BlockNumHash>,
355    ) {
356        debug!(target: "engine::tree", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
357
358        // If the finalized num is ahead of the upper bound, and exists, we need to instead ensure
359        // that the only blocks removed, are canonical blocks less than the upper bound
360        let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
361            if upper_bound.number < finalized.number {
362                finalized = upper_bound;
363                debug!(target: "engine::tree", ?finalized, "Adjusted upper bound");
364            }
365            finalized
366        });
367
368        // We want to do two things:
369        // * remove canonical blocks that are persisted
370        // * remove forks whose root are below the finalized block
371        // We can do this in 2 steps:
372        // * remove all canonical blocks below the upper bound
373        // * fetch the number of the finalized hash, removing any sidechains that are __below__ the
374        // finalized block
375        self.remove_canonical_until(upper_bound.number, last_persisted_hash);
376
377        // Now, we have removed canonical blocks (assuming the upper bound is above the finalized
378        // block) and only have sidechains below the finalized block.
379        if let Some(finalized_num_hash) = finalized_num_hash {
380            self.prune_finalized_sidechains(finalized_num_hash);
381        }
382    }
383
384    /// Determines if the second block is a direct descendant of the first block.
385    ///
386    /// If the two blocks are the same, this returns `false`.
387    fn is_descendant(&self, first: BlockNumHash, second: &N::BlockHeader) -> bool {
388        // If the second block's parent is the first block's hash, then it is a direct descendant
389        // and we can return early.
390        if second.parent_hash() == first.hash {
391            return true
392        }
393
394        // If the second block is lower than, or has the same block number, they are not
395        // descendants.
396        if second.number() <= first.number {
397            return false
398        }
399
400        // iterate through parents of the second until we reach the number
401        let Some(mut current_block) = self.block_by_hash(second.parent_hash()) else {
402            // If we can't find its parent in the tree, we can't continue, so return false
403            return false
404        };
405
406        while current_block.number() > first.number + 1 {
407            let Some(block) = self.block_by_hash(current_block.header().parent_hash()) else {
408                // If we can't find its parent in the tree, we can't continue, so return false
409                return false
410            };
411
412            current_block = block;
413        }
414
415        // Now the block numbers should be equal, so we compare hashes.
416        current_block.parent_hash() == first.hash
417    }
418
419    /// Updates the canonical head to the given block.
420    const fn set_canonical_head(&mut self, new_head: BlockNumHash) {
421        self.current_canonical_head = new_head;
422    }
423
424    /// Returns the tracked canonical head.
425    const fn canonical_head(&self) -> &BlockNumHash {
426        &self.current_canonical_head
427    }
428
429    /// Returns the block hash of the canonical head.
430    const fn canonical_block_hash(&self) -> B256 {
431        self.canonical_head().hash
432    }
433
434    /// Returns the block number of the canonical head.
435    const fn canonical_block_number(&self) -> BlockNumber {
436        self.canonical_head().number
437    }
438}
439
/// A builder for creating state providers that can be used across threads.
///
/// The builder only stores what is needed to create a provider later; the provider itself is
/// created on demand by `build`.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    ///
    /// When `None`, the state at `historical` is used without an in-memory overlay.
    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
}
450
451impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
452    /// Creates a new state provider from the provider factory, historical block hash and optional
453    /// overlaid blocks.
454    pub const fn new(
455        provider_factory: P,
456        historical: B256,
457        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
458    ) -> Self {
459        Self { provider_factory, historical, overlay }
460    }
461}
462
463impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
464where
465    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
466{
467    /// Creates a new state provider from this builder.
468    pub fn build(&self) -> ProviderResult<StateProviderBox> {
469        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
470        if let Some(overlay) = self.overlay.clone() {
471            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
472        }
473        Ok(provider)
474    }
475}
476
/// Tracks the state of the engine api internals.
///
/// This bundles the tree of executed blocks, the forkchoice tracker, the buffer of detached
/// blocks and the cache of invalid headers.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}
492
493impl<N: NodePrimitives> EngineApiTreeState<N> {
494    fn new(
495        block_buffer_limit: u32,
496        max_invalid_header_cache_length: u32,
497        canonical_block: BlockNumHash,
498    ) -> Self {
499        Self {
500            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
501            buffer: BlockBuffer::new(block_buffer_limit),
502            tree_state: TreeState::new(canonical_block),
503            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
504        }
505    }
506}
507
/// The outcome of a tree operation.
///
/// Pairs the result of the operation with an optional follow-up event for the caller.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
}
516
517impl<T> TreeOutcome<T> {
518    /// Create new tree outcome.
519    pub const fn new(outcome: T) -> Self {
520        Self { outcome, event: None }
521    }
522
523    /// Set event on the outcome.
524    pub fn with_event(mut self, event: TreeEvent) -> Self {
525        self.event = Some(event);
526        self
527    }
528}
529
/// Events that are triggered by Tree Chain
///
/// An event asks the caller of a tree operation to perform a follow-up action.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}
540
541impl TreeEvent {
542    /// Returns true if the event is a backfill action.
543    const fn is_backfill_action(&self) -> bool {
544        matches!(self, Self::BackfillAction(_))
545    }
546}
547
/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash, i.e. the hash of the block that should become canonical.
        sync_target_head: B256,
    },
}
557
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, E, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// Provider used to access chain data, e.g. the best block number and sealed headers.
    provider: P,
    /// Provider of block executors.
    executor_provider: E,
    /// The consensus implementation, shared behind an [`Arc`].
    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
    /// Validator for engine API payloads.
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// An invalid block hook.
    invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The type responsible for processing new payloads
    payload_processor: PayloadProcessor<N, C>,
}
609
610impl<N, P: Debug, E: Debug, T: PayloadTypes + Debug, V: Debug, C: Debug> std::fmt::Debug
611    for EngineApiTreeHandler<N, P, E, T, V, C>
612where
613    N: NodePrimitives,
614{
615    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616        f.debug_struct("EngineApiTreeHandler")
617            .field("provider", &self.provider)
618            .field("executor_provider", &self.executor_provider)
619            .field("consensus", &self.consensus)
620            .field("payload_validator", &self.payload_validator)
621            .field("state", &self.state)
622            .field("incoming_tx", &self.incoming_tx)
623            .field("persistence", &self.persistence)
624            .field("persistence_state", &self.persistence_state)
625            .field("backfill_sync_state", &self.backfill_sync_state)
626            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
627            .field("payload_builder", &self.payload_builder)
628            .field("config", &self.config)
629            .field("metrics", &self.metrics)
630            .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
631            .field("engine_kind", &self.engine_kind)
632            .finish()
633    }
634}
635
636impl<N, P, E, T, V, C> EngineApiTreeHandler<N, P, E, T, V, C>
637where
638    N: NodePrimitives,
639    P: DatabaseProviderFactory
640        + BlockReader<Block = N::Block, Header = N::BlockHeader>
641        + StateProviderFactory
642        + StateReader<Receipt = N::Receipt>
643        + StateCommitmentProvider
644        + HashedPostStateProvider
645        + Clone
646        + 'static,
647    <P as DatabaseProviderFactory>::Provider:
648        BlockReader<Block = N::Block, Header = N::BlockHeader>,
649    E: BlockExecutorProvider<Primitives = N>,
650    C: ConfigureEvm<Primitives = N> + 'static,
651    T: PayloadTypes,
652    V: EngineValidator<T, Block = N::Block>,
653{
654    /// Creates a new [`EngineApiTreeHandler`].
655    #[expect(clippy::too_many_arguments)]
656    pub fn new(
657        provider: P,
658        executor_provider: E,
659        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
660        payload_validator: V,
661        outgoing: UnboundedSender<EngineApiEvent<N>>,
662        state: EngineApiTreeState<N>,
663        canonical_in_memory_state: CanonicalInMemoryState<N>,
664        persistence: PersistenceHandle<N>,
665        persistence_state: PersistenceState,
666        payload_builder: PayloadBuilderHandle<T>,
667        config: TreeConfig,
668        engine_kind: EngineApiKind,
669        evm_config: C,
670    ) -> Self {
671        let (incoming_tx, incoming) = std::sync::mpsc::channel();
672
673        let payload_processor =
674            PayloadProcessor::new(WorkloadExecutor::default(), evm_config, &config);
675
676        Self {
677            provider,
678            executor_provider,
679            consensus,
680            payload_validator,
681            incoming,
682            outgoing,
683            persistence,
684            persistence_state,
685            backfill_sync_state: BackfillSyncState::Idle,
686            state,
687            canonical_in_memory_state,
688            payload_builder,
689            config,
690            metrics: Default::default(),
691            incoming_tx,
692            invalid_block_hook: Box::new(NoopInvalidBlockHook),
693            engine_kind,
694            payload_processor,
695        }
696    }
697
698    /// Sets the invalid block hook.
699    fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
700        self.invalid_block_hook = invalid_block_hook;
701    }
702
703    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
704    /// own thread.
705    ///
706    /// Returns the sender through which incoming requests can be sent to the task and the receiver
707    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
708    #[expect(clippy::complexity)]
709    pub fn spawn_new(
710        provider: P,
711        executor_provider: E,
712        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
713        payload_validator: V,
714        persistence: PersistenceHandle<N>,
715        payload_builder: PayloadBuilderHandle<T>,
716        canonical_in_memory_state: CanonicalInMemoryState<N>,
717        config: TreeConfig,
718        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
719        kind: EngineApiKind,
720        evm_config: C,
721    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
722    {
723        let best_block_number = provider.best_block_number().unwrap_or(0);
724        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
725
726        let persistence_state = PersistenceState {
727            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
728            rx: None,
729        };
730
731        let (tx, outgoing) = unbounded_channel();
732        let state = EngineApiTreeState::new(
733            config.block_buffer_limit(),
734            config.max_invalid_header_cache_length(),
735            header.num_hash(),
736        );
737
738        let mut task = Self::new(
739            provider,
740            executor_provider,
741            consensus,
742            payload_validator,
743            tx,
744            state,
745            canonical_in_memory_state,
746            persistence,
747            persistence_state,
748            payload_builder,
749            config,
750            kind,
751            evm_config,
752        );
753        task.set_invalid_block_hook(invalid_block_hook);
754        let incoming = task.incoming_tx.clone();
755        std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
756        (incoming, outgoing)
757    }
758
759    /// Returns a new [`Sender`] to send messages to this type.
760    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
761        self.incoming_tx.clone()
762    }
763
764    /// Run the engine API handler.
765    ///
766    /// This will block the current thread and process incoming messages.
767    pub fn run(mut self) {
768        loop {
769            match self.try_recv_engine_message() {
770                Ok(Some(msg)) => {
771                    debug!(target: "engine::tree", %msg, "received new engine message");
772                    if let Err(fatal) = self.on_engine_message(msg) {
773                        error!(target: "engine::tree", %fatal, "insert block fatal error");
774                        return
775                    }
776                }
777                Ok(None) => {
778                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
779                }
780                Err(_err) => {
781                    error!(target: "engine::tree", "Engine channel disconnected");
782                    return
783                }
784            }
785
786            if let Err(err) = self.advance_persistence() {
787                error!(target: "engine::tree", %err, "Advancing persistence failed");
788                return
789            }
790        }
791    }
792
793    /// Invoked when previously requested blocks were downloaded.
794    ///
795    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
796    /// will execute the first batch and send the remaining blocks back through the channel so that
797    /// block request processing isn't blocked for a long time.
798    fn on_downloaded(
799        &mut self,
800        mut blocks: Vec<RecoveredBlock<N::Block>>,
801    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
802        if blocks.is_empty() {
803            // nothing to execute
804            return Ok(None)
805        }
806
807        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
808        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
809        for block in blocks.drain(..batch) {
810            if let Some(event) = self.on_downloaded_block(block)? {
811                let needs_backfill = event.is_backfill_action();
812                self.on_tree_event(event)?;
813                if needs_backfill {
814                    // can exit early if backfill is needed
815                    return Ok(None)
816                }
817            }
818        }
819
820        // if we still have blocks to execute, send them as a followup request
821        if !blocks.is_empty() {
822            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
823        }
824
825        Ok(None)
826    }
827
828    /// When the Consensus layer receives a new block via the consensus gossip protocol,
829    /// the transactions in the block are sent to the execution layer in the form of a
830    /// [`PayloadTypes::ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
831    /// Execution layer executes the transactions and validates the state in the block header,
832    /// then passes validation data back to Consensus layer, that adds the block to the head of
833    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
834    /// network in the form of a "Beacon block".
835    ///
836    /// These responses should adhere to the [Engine API Spec for
837    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
838    ///
839    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
840    /// returns an error if an internal error occurred.
841    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
842    fn on_new_payload(
843        &mut self,
844        payload: T::ExecutionData,
845    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
846        trace!(target: "engine::tree", "invoked new payload");
847        self.metrics.engine.new_payload_messages.increment(1);
848
849        // Ensures that the given payload does not violate any consensus rules that concern the
850        // block's layout, like:
851        //    - missing or invalid base fee
852        //    - invalid extra data
853        //    - invalid transactions
854        //    - incorrect hash
855        //    - the versioned hashes passed with the payload do not exactly match transaction
856        //      versioned hashes
857        //    - the block does not contain blob transactions if it is pre-cancun
858        //
859        // This validates the following engine API rule:
860        //
861        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
862        //    validation by taking the following steps:
863        //
864        //   1. Obtain the actual array by concatenating blob versioned hashes lists
865        //      (`tx.blob_versioned_hashes`) of each [blob
866        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
867        //      in the payload, respecting the order of inclusion. If the payload has no blob
868        //      transactions the expected array **MUST** be `[]`.
869        //
870        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
871        //      null}` if the expected and the actual arrays don't match.
872        //
873        // This validation **MUST** be instantly run in all cases even during active sync process.
874        let parent_hash = payload.parent_hash();
875        let block = match self.payload_validator.ensure_well_formed_payload(payload) {
876            Ok(block) => block,
877            Err(error) => {
878                error!(target: "engine::tree", %error, "Invalid payload");
879                // we need to convert the error to a payload status (response to the CL)
880
881                let latest_valid_hash =
882                    if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
883                        // Engine-API rules:
884                        // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
885                        // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
886                        None
887                    } else {
888                        self.latest_valid_hash_for_invalid_payload(parent_hash)?
889                    };
890
891                let status = PayloadStatusEnum::from(error);
892                return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
893            }
894        };
895
896        let block_hash = block.hash();
897        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
898        if lowest_buffered_ancestor == block_hash {
899            lowest_buffered_ancestor = block.parent_hash();
900        }
901
902        // now check the block itself
903        if let Some(status) =
904            self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?
905        {
906            return Ok(TreeOutcome::new(status))
907        }
908
909        let status = if self.backfill_sync_state.is_idle() {
910            let mut latest_valid_hash = None;
911            let num_hash = block.num_hash();
912            match self.insert_block(block) {
913                Ok(status) => {
914                    let status = match status {
915                        InsertPayloadOk::Inserted(BlockStatus::Valid) => {
916                            latest_valid_hash = Some(block_hash);
917                            self.try_connect_buffered_blocks(num_hash)?;
918                            PayloadStatusEnum::Valid
919                        }
920                        InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
921                            latest_valid_hash = Some(block_hash);
922                            PayloadStatusEnum::Valid
923                        }
924                        InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
925                        InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
926                            // not known to be invalid, but we don't know anything else
927                            PayloadStatusEnum::Syncing
928                        }
929                    };
930
931                    PayloadStatus::new(status, latest_valid_hash)
932                }
933                Err(error) => self.on_insert_block_error(error)?,
934            }
935        } else if let Err(error) = self.buffer_block(block) {
936            self.on_insert_block_error(error)?
937        } else {
938            PayloadStatus::from_status(PayloadStatusEnum::Syncing)
939        };
940
941        let mut outcome = TreeOutcome::new(status);
942        // if the block is valid and it is the current sync target head, make it canonical
943        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
944            // but only if it isn't already the canonical head
945            if self.state.tree_state.canonical_block_hash() != block_hash {
946                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
947                    sync_target_head: block_hash,
948                }));
949            }
950        }
951
952        Ok(outcome)
953    }
954
955    /// Returns the new chain for the given head.
956    ///
957    /// This also handles reorgs.
958    ///
959    /// Note: This does not update the tracked state and instead returns the new chain based on the
960    /// given head.
961    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
962        // get the executed new head block
963        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
964            return Ok(None)
965        };
966
967        let new_head_number = new_head_block.recovered_block().number();
968        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
969
970        let mut new_chain = vec![new_head_block.clone()];
971        let mut current_hash = new_head_block.recovered_block().parent_hash();
972        let mut current_number = new_head_number - 1;
973
974        // Walk back the new chain until we reach a block we know about
975        //
976        // This is only done for in-memory blocks, because we should not have persisted any blocks
977        // that are _above_ the current canonical head.
978        while current_number > current_canonical_number {
979            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
980            {
981                current_hash = block.recovered_block().parent_hash();
982                current_number -= 1;
983                new_chain.push(block);
984            } else {
985                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
986                // This should never happen as we're walking back a chain that should connect to
987                // the canonical chain
988                return Ok(None);
989            }
990        }
991
992        // If we have reached the current canonical head by walking back from the target, then we
993        // know this represents an extension of the canonical chain.
994        if current_hash == self.state.tree_state.current_canonical_head.hash {
995            new_chain.reverse();
996
997            // Simple extension of the current chain
998            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
999        }
1000
1001        // We have a reorg. Walk back both chains to find the fork point.
1002        let mut old_chain = Vec::new();
1003        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
1004
1005        // If the canonical chain is ahead of the new chain,
1006        // gather all blocks until new head number.
1007        while current_canonical_number > current_number {
1008            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1009                old_chain.push(block.clone());
1010                old_hash = block.recovered_block().parent_hash();
1011                current_canonical_number -= 1;
1012            } else {
1013                // This shouldn't happen as we're walking back the canonical chain
1014                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1015                return Ok(None);
1016            }
1017        }
1018
1019        // Both new and old chain pointers are now at the same height.
1020        debug_assert_eq!(current_number, current_canonical_number);
1021
1022        // Walk both chains from specified hashes at same height until
1023        // a common ancestor (fork block) is reached.
1024        while old_hash != current_hash {
1025            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1026                old_hash = block.recovered_block().parent_hash();
1027                old_chain.push(block);
1028            } else {
1029                // This shouldn't happen as we're walking back the canonical chain
1030                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1031                return Ok(None);
1032            }
1033
1034            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
1035            {
1036                current_hash = block.recovered_block().parent_hash();
1037                new_chain.push(block);
1038            } else {
1039                // This shouldn't happen as we've already walked this path
1040                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
1041                return Ok(None);
1042            }
1043        }
1044        new_chain.reverse();
1045        old_chain.reverse();
1046
1047        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
1048    }
1049
1050    /// Determines if the given block is part of a fork by checking that these
1051    /// conditions are true:
1052    /// * walking back from the target hash to verify that the target hash is not part of an
1053    ///   extension of the canonical chain.
1054    /// * walking back from the current head to verify that the target hash is not already part of
1055    ///   the canonical chain.
1056    fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
1057        // verify that the given hash is not part of an extension of the canon chain.
1058        let canonical_head = self.state.tree_state.canonical_head();
1059        let mut current_hash = target_hash;
1060        while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
1061            if current_block.hash() == canonical_head.hash {
1062                return Ok(false)
1063            }
1064            // We already passed the canonical head
1065            if current_block.number() <= canonical_head.number {
1066                break
1067            }
1068            current_hash = current_block.parent_hash();
1069        }
1070
1071        // verify that the given hash is not already part of canonical chain stored in memory
1072        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
1073            return Ok(false)
1074        }
1075
1076        // verify that the given hash is not already part of persisted canonical chain
1077        if self.provider.block_number(target_hash)?.is_some() {
1078            return Ok(false)
1079        }
1080
1081        Ok(true)
1082    }
1083
1084    /// Returns the persisting kind for the input block.
1085    fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
1086        // Check that we're currently persisting.
1087        let Some(action) = self.persistence_state.current_action() else {
1088            return PersistingKind::NotPersisting
1089        };
1090        // Check that the persistince action is saving blocks, not removing them.
1091        let CurrentPersistenceAction::SavingBlocks { highest } = action else {
1092            return PersistingKind::PersistingNotDescendant
1093        };
1094
1095        // The block being validated can only be a descendant if its number is higher than
1096        // the highest block persisting. Otherwise, it's likely a fork of a lower block.
1097        if block.number() > highest.number && self.state.tree_state.is_descendant(*highest, block) {
1098            return PersistingKind::PersistingDescendant
1099        }
1100
1101        // In all other cases, the block is not a descendant.
1102        PersistingKind::PersistingNotDescendant
1103    }
1104
1105    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
1106    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
1107    /// chain.
1108    ///
1109    /// These responses should adhere to the [Engine API Spec for
1110    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
1111    ///
1112    /// Returns an error if an internal error occurred like a database error.
1113    #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
1114    fn on_forkchoice_updated(
1115        &mut self,
1116        state: ForkchoiceState,
1117        attrs: Option<T::PayloadAttributes>,
1118        version: EngineApiMessageVersion,
1119    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1120        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1121        self.metrics.engine.forkchoice_updated_messages.increment(1);
1122        self.canonical_in_memory_state.on_forkchoice_update_received();
1123
1124        if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1125            return Ok(TreeOutcome::new(on_updated))
1126        }
1127
1128        let valid_outcome = |head| {
1129            TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1130                PayloadStatusEnum::Valid,
1131                Some(head),
1132            )))
1133        };
1134
1135        // Process the forkchoice update by trying to make the head block canonical
1136        //
1137        // We can only process this forkchoice update if:
1138        // - we have the `head` block
1139        // - the head block is part of a chain that is connected to the canonical chain. This
1140        //   includes reorgs.
1141        //
1142        // Performing a FCU involves:
1143        // - marking the FCU's head block as canonical
1144        // - updating in memory state to reflect the new canonical chain
1145        // - updating canonical state trackers
1146        // - emitting a canonicalization event for the new chain (including reorg)
1147        // - if we have payload attributes, delegate them to the payload service
1148
1149        // 1. ensure we have a new head block
1150        if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1151            trace!(target: "engine::tree", "fcu head hash is already canonical");
1152
1153            // update the safe and finalized blocks and ensure their values are valid
1154            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1155                // safe or finalized hashes are invalid
1156                return Ok(TreeOutcome::new(outcome))
1157            }
1158
1159            // we still need to process payload attributes if the head is already canonical
1160            if let Some(attr) = attrs {
1161                let tip = self
1162                    .block_by_hash(self.state.tree_state.canonical_block_hash())?
1163                    .ok_or_else(|| {
1164                        // If we can't find the canonical block, then something is wrong and we need
1165                        // to return an error
1166                        ProviderError::HeaderNotFound(state.head_block_hash.into())
1167                    })?;
1168                let updated = self.process_payload_attributes(attr, tip.header(), state, version);
1169                return Ok(TreeOutcome::new(updated))
1170            }
1171
1172            // the head block is already canonical
1173            return Ok(valid_outcome(state.head_block_hash))
1174        }
1175
1176        // 2. check if the head is already part of the canonical chain
1177        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1178            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1179
1180            // For OpStack the proposers are allowed to reorg their own chain at will, so we need to
1181            // always trigger a new payload job if requested.
1182            if self.engine_kind.is_opstack() {
1183                if let Some(attr) = attrs {
1184                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1185                    let updated =
1186                        self.process_payload_attributes(attr, &canonical_header, state, version);
1187                    return Ok(TreeOutcome::new(updated))
1188                }
1189            }
1190
1191            // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
1192            //    payload build process if `forkchoiceState.headBlockHash` references a `VALID`
1193            //    ancestor of the head of canonical chain, i.e. the ancestor passed payload
1194            //    validation process and deemed `VALID`. In the case of such an event, client
1195            //    software MUST return `{payloadStatus: {status: VALID, latestValidHash:
1196            //    forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1197
1198            // the head block is already canonical, so we're not triggering a payload job and can
1199            // return right away
1200            return Ok(valid_outcome(state.head_block_hash))
1201        }
1202
1203        // 3. ensure we can apply a new chain update for the head block
1204        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1205            let tip = chain_update.tip().clone_sealed_header();
1206            self.on_canonical_chain_update(chain_update);
1207
1208            // update the safe and finalized blocks and ensure their values are valid
1209            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1210                // safe or finalized hashes are invalid
1211                return Ok(TreeOutcome::new(outcome))
1212            }
1213
1214            if let Some(attr) = attrs {
1215                let updated = self.process_payload_attributes(attr, &tip, state, version);
1216                return Ok(TreeOutcome::new(updated))
1217            }
1218
1219            return Ok(valid_outcome(state.head_block_hash))
1220        }
1221
1222        // 4. we don't have the block to perform the update
1223        // we assume the FCU is valid and at least the head is missing,
1224        // so we need to start syncing to it
1225        //
1226        // find the appropriate target to sync to, if we don't have the safe block hash then we
1227        // start syncing to the safe block via backfill first
1228        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1229            // check that safe block is valid and missing
1230            !state.safe_block_hash.is_zero() &&
1231            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1232        {
1233            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1234            state.safe_block_hash
1235        } else {
1236            state.head_block_hash
1237        };
1238
1239        let target = self.lowest_buffered_ancestor_or(target);
1240        trace!(target: "engine::tree", %target, "downloading missing block");
1241
1242        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1243            PayloadStatusEnum::Syncing,
1244        )))
1245        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1246    }
1247
1248    /// Attempts to receive the next engine request.
1249    ///
1250    /// If there's currently no persistence action in progress, this will block until a new request
1251    /// is received. If there's a persistence action in progress, this will try to receive the
1252    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1253    /// received in time.
1254    ///
1255    /// Returns an error if the engine channel is disconnected.
1256    #[expect(clippy::type_complexity)]
1257    fn try_recv_engine_message(
1258        &self,
1259    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1260        if self.persistence_state.in_progress() {
1261            // try to receive the next request with a timeout to not block indefinitely
1262            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1263                Ok(msg) => Ok(Some(msg)),
1264                Err(err) => match err {
1265                    RecvTimeoutError::Timeout => Ok(None),
1266                    RecvTimeoutError::Disconnected => Err(RecvError),
1267                },
1268            }
1269        } else {
1270            self.incoming.recv().map(Some)
1271        }
1272    }
1273
1274    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1275    /// the current persistence action while we're removing blocks.
1276    fn remove_blocks(&mut self, new_tip_num: u64) {
1277        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1278        if new_tip_num < self.persistence_state.last_persisted_block.number {
1279            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1280            let (tx, rx) = oneshot::channel();
1281            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1282            self.persistence_state.start_remove(new_tip_num, rx);
1283        }
1284    }
1285
1286    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1287    /// the current persistence action while we're saving blocks.
1288    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1289        if blocks_to_persist.is_empty() {
1290            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1291            return
1292        }
1293
1294        // NOTE: checked non-empty above
1295        let highest_num_hash = blocks_to_persist
1296            .iter()
1297            .max_by_key(|block| block.recovered_block().number())
1298            .map(|b| b.recovered_block().num_hash())
1299            .expect("Checked non-empty persisting blocks");
1300
1301        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1302        let (tx, rx) = oneshot::channel();
1303        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1304
1305        self.persistence_state.start_save(highest_num_hash, rx);
1306    }
1307
1308    /// Attempts to advance the persistence state.
1309    ///
1310    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1311    /// or send a new persistence action if necessary.
1312    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1313        if self.persistence_state.in_progress() {
1314            let (mut rx, start_time, current_action) = self
1315                .persistence_state
1316                .rx
1317                .take()
1318                .expect("if a persistence task is in progress Receiver must be Some");
1319            // Check if persistence has complete
1320            match rx.try_recv() {
1321                Ok(last_persisted_hash_num) => {
1322                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1323                    let Some(BlockNumHash {
1324                        hash: last_persisted_block_hash,
1325                        number: last_persisted_block_number,
1326                    }) = last_persisted_hash_num
1327                    else {
1328                        // if this happened, then we persisted no blocks because we sent an
1329                        // empty vec of blocks
1330                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1331                        return Ok(())
1332                    };
1333
1334                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1335                    self.persistence_state
1336                        .finish(last_persisted_block_hash, last_persisted_block_number);
1337                    self.on_new_persisted_block()?;
1338                }
1339                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1340                Err(TryRecvError::Empty) => {
1341                    self.persistence_state.rx = Some((rx, start_time, current_action))
1342                }
1343            }
1344        }
1345
1346        if !self.persistence_state.in_progress() {
1347            if let Some(new_tip_num) = self.find_disk_reorg()? {
1348                self.remove_blocks(new_tip_num)
1349            } else if self.should_persist() {
1350                let blocks_to_persist = self.get_canonical_blocks_to_persist();
1351                self.persist_blocks(blocks_to_persist);
1352            }
1353        }
1354
1355        Ok(())
1356    }
1357
1358    /// Handles a message from the engine.
1359    fn on_engine_message(
1360        &mut self,
1361        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1362    ) -> Result<(), InsertBlockFatalError> {
1363        match msg {
1364            FromEngine::Event(event) => match event {
1365                FromOrchestrator::BackfillSyncStarted => {
1366                    debug!(target: "engine::tree", "received backfill sync started event");
1367                    self.backfill_sync_state = BackfillSyncState::Active;
1368                }
1369                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1370                    self.on_backfill_sync_finished(ctrl)?;
1371                }
1372            },
1373            FromEngine::Request(request) => {
1374                match request {
1375                    EngineApiRequest::InsertExecutedBlock(block) => {
1376                        let block_num_hash = block.recovered_block().num_hash();
1377                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1378                        let now = Instant::now();
1379
1380                        // if the parent is the canonical head, we can insert the block as the
1381                        // pending block
1382                        if self.state.tree_state.canonical_block_hash() ==
1383                            block.recovered_block().parent_hash()
1384                        {
1385                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1386                            self.canonical_in_memory_state.set_pending_block(block.clone());
1387                        }
1388
1389                        self.state.tree_state.insert_executed(block.clone());
1390                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1391                        self.emit_event(EngineApiEvent::BeaconConsensus(
1392                            BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1393                        ));
1394                    }
1395                    EngineApiRequest::Beacon(request) => {
1396                        match request {
1397                            BeaconEngineMessage::ForkchoiceUpdated {
1398                                state,
1399                                payload_attrs,
1400                                tx,
1401                                version,
1402                            } => {
1403                                let mut output =
1404                                    self.on_forkchoice_updated(state, payload_attrs, version);
1405
1406                                if let Ok(res) = &mut output {
1407                                    // track last received forkchoice state
1408                                    self.state
1409                                        .forkchoice_state_tracker
1410                                        .set_latest(state, res.outcome.forkchoice_status());
1411
1412                                    // emit an event about the handled FCU
1413                                    self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1414                                        state,
1415                                        res.outcome.forkchoice_status(),
1416                                    ));
1417
1418                                    // handle the event if any
1419                                    self.on_maybe_tree_event(res.event.take())?;
1420                                }
1421
1422                                if let Err(err) =
1423                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1424                                {
1425                                    self.metrics
1426                                        .engine
1427                                        .failed_forkchoice_updated_response_deliveries
1428                                        .increment(1);
1429                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1430                                }
1431                            }
1432                            BeaconEngineMessage::NewPayload { payload, tx } => {
1433                                let mut output = self.on_new_payload(payload);
1434
1435                                let maybe_event =
1436                                    output.as_mut().ok().and_then(|out| out.event.take());
1437
1438                                // emit response
1439                                if let Err(err) =
1440                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1441                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1442                                    }))
1443                                {
1444                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1445                                    self.metrics
1446                                        .engine
1447                                        .failed_new_payload_response_deliveries
1448                                        .increment(1);
1449                                }
1450
1451                                // handle the event if any
1452                                self.on_maybe_tree_event(maybe_event)?;
1453                            }
1454                            BeaconEngineMessage::TransitionConfigurationExchanged => {
1455                                // triggering this hook will record that we received a request from
1456                                // the CL
1457                                self.canonical_in_memory_state
1458                                    .on_transition_configuration_exchanged();
1459                            }
1460                        }
1461                    }
1462                }
1463            }
1464            FromEngine::DownloadedBlocks(blocks) => {
1465                if let Some(event) = self.on_downloaded(blocks)? {
1466                    self.on_tree_event(event)?;
1467                }
1468            }
1469        }
1470        Ok(())
1471    }
1472
1473    /// Invoked if the backfill sync has finished to target.
1474    ///
1475    /// At this point we consider the block synced to the backfill target.
1476    ///
1477    /// Checks the tracked finalized block against the block on disk and requests another backfill
1478    /// run if the distance to the tip exceeds the threshold for another backfill run.
1479    ///
1480    /// This will also do the necessary housekeeping of the tree state, this includes:
1481    ///  - removing all blocks below the backfill height
1482    ///  - resetting the canonical in-memory state
1483    ///
1484    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1485    /// target block.
1486    fn on_backfill_sync_finished(
1487        &mut self,
1488        ctrl: ControlFlow,
1489    ) -> Result<(), InsertBlockFatalError> {
1490        debug!(target: "engine::tree", "received backfill sync finished event");
1491        self.backfill_sync_state = BackfillSyncState::Idle;
1492
1493        // backfill height is the block number that the backfill finished at
1494        let mut backfill_height = ctrl.block_number();
1495
1496        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1497        if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1498            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1499            // update the `invalid_headers` cache with the new invalid header
1500            self.state.invalid_headers.insert(**bad_block);
1501
1502            // if this was an unwind then the target is the new height
1503            backfill_height = Some(*target);
1504        }
1505
1506        // backfill height is the block number that the backfill finished at
1507        let Some(backfill_height) = backfill_height else { return Ok(()) };
1508
1509        // state house keeping after backfill sync
1510        // remove all executed blocks below the backfill height
1511        //
1512        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1513        // before that
1514        let Some(backfill_num_hash) = self
1515            .provider
1516            .block_hash(backfill_height)?
1517            .map(|hash| BlockNumHash { hash, number: backfill_height })
1518        else {
1519            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1520            return Ok(())
1521        };
1522
1523        if ctrl.is_unwind() {
1524            // the node reset so we need to clear everything above that height so that backfill
1525            // height is the new canonical block.
1526            self.state.tree_state.reset(backfill_num_hash)
1527        } else {
1528            self.state.tree_state.remove_until(
1529                backfill_num_hash,
1530                self.persistence_state.last_persisted_block.hash,
1531                Some(backfill_num_hash),
1532            );
1533        }
1534
1535        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1536        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1537
1538        // remove all buffered blocks below the backfill height
1539        self.state.buffer.remove_old_blocks(backfill_height);
1540        // we remove all entries because now we're synced to the backfill target and consider this
1541        // the canonical chain
1542        self.canonical_in_memory_state.clear_state();
1543
1544        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1545            // update the tracked chain height, after backfill sync both the canonical height and
1546            // persisted height are the same
1547            self.state.tree_state.set_canonical_head(new_head.num_hash());
1548            self.persistence_state.finish(new_head.hash(), new_head.number());
1549
1550            // update the tracked canonical head
1551            self.canonical_in_memory_state.set_canonical_head(new_head);
1552        }
1553
1554        // check if we need to run backfill again by comparing the most recent finalized height to
1555        // the backfill height
1556        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1557        else {
1558            return Ok(())
1559        };
1560        if sync_target_state.finalized_block_hash.is_zero() {
1561            // no finalized block, can't check distance
1562            return Ok(())
1563        }
1564        // get the block number of the finalized block, if we have it
1565        let newest_finalized = self
1566            .state
1567            .buffer
1568            .block(&sync_target_state.finalized_block_hash)
1569            .map(|block| block.number());
1570
1571        // The block number that the backfill finished at - if the progress or newest
1572        // finalized is None then we can't check the distance anyways.
1573        //
1574        // If both are Some, we perform another distance check and return the desired
1575        // backfill target
1576        if let Some(backfill_target) =
1577            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1578                // Determines whether or not we should run backfill again, in case
1579                // the new gap is still large enough and requires running backfill again
1580                self.backfill_sync_target(progress, finalized_number, None)
1581            })
1582        {
1583            // request another backfill run
1584            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1585                backfill_target.into(),
1586            )));
1587            return Ok(())
1588        };
1589
1590        // try to close the gap by executing buffered blocks that are child blocks of the new head
1591        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1592    }
1593
1594    /// Attempts to make the given target canonical.
1595    ///
1596    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1597    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1598        if let Some(chain_update) = self.on_new_head(target)? {
1599            self.on_canonical_chain_update(chain_update);
1600        }
1601
1602        Ok(())
1603    }
1604
1605    /// Convenience function to handle an optional tree event.
1606    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1607        if let Some(event) = event {
1608            self.on_tree_event(event)?;
1609        }
1610
1611        Ok(())
1612    }
1613
1614    /// Handles a tree event.
1615    ///
1616    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1617    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1618        match event {
1619            TreeEvent::TreeAction(action) => match action {
1620                TreeAction::MakeCanonical { sync_target_head } => {
1621                    self.make_canonical(sync_target_head)?;
1622                }
1623            },
1624            TreeEvent::BackfillAction(action) => {
1625                self.emit_event(EngineApiEvent::BackfillAction(action));
1626            }
1627            TreeEvent::Download(action) => {
1628                self.emit_event(EngineApiEvent::Download(action));
1629            }
1630        }
1631
1632        Ok(())
1633    }
1634
1635    /// Emits an outgoing event to the engine.
1636    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1637        let event = event.into();
1638
1639        if event.is_backfill_action() {
1640            debug_assert_eq!(
1641                self.backfill_sync_state,
1642                BackfillSyncState::Idle,
1643                "backfill action should only be emitted when backfill is idle"
1644            );
1645
1646            if self.persistence_state.in_progress() {
1647                // backfill sync and persisting data are mutually exclusive, so we can't start
1648                // backfill while we're still persisting
1649                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1650                return
1651            }
1652
1653            self.backfill_sync_state = BackfillSyncState::Pending;
1654            self.metrics.engine.pipeline_runs.increment(1);
1655            debug!(target: "engine::tree", "emitting backfill action event");
1656        }
1657
1658        let _ = self.outgoing.send(event).inspect_err(
1659            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1660        );
1661    }
1662
1663    /// Returns true if the canonical chain length minus the last persisted
1664    /// block is greater than or equal to the persistence threshold and
1665    /// backfill is not running.
1666    pub const fn should_persist(&self) -> bool {
1667        if !self.backfill_sync_state.is_idle() {
1668            // can't persist if backfill is running
1669            return false
1670        }
1671
1672        let min_block = self.persistence_state.last_persisted_block.number;
1673        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1674            self.config.persistence_threshold()
1675    }
1676
1677    /// Returns a batch of consecutive canonical blocks to persist in the range
1678    /// `(last_persisted_number .. canonical_head - threshold]` . The expected
1679    /// order is oldest -> newest.
1680    fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
1681        let mut blocks_to_persist = Vec::new();
1682        let mut current_hash = self.state.tree_state.canonical_block_hash();
1683        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1684
1685        let canonical_head_number = self.state.tree_state.canonical_block_number();
1686
1687        let target_number =
1688            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1689
1690        debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1691        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1692            if block.recovered_block().number() <= last_persisted_number {
1693                break;
1694            }
1695
1696            if block.recovered_block().number() <= target_number {
1697                blocks_to_persist.push(block.clone());
1698            }
1699
1700            current_hash = block.recovered_block().parent_hash();
1701        }
1702
1703        // reverse the order so that the oldest block comes first
1704        blocks_to_persist.reverse();
1705
1706        blocks_to_persist
1707    }
1708
1709    /// This clears the blocks from the in-memory tree state that have been persisted to the
1710    /// database.
1711    ///
1712    /// This also updates the canonical in-memory state to reflect the newest persisted block
1713    /// height.
1714    ///
1715    /// Assumes that `finish` has been called on the `persistence_state` at least once
1716    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1717        // If we have an on-disk reorg, we need to handle it firsrt before touching the in-memory
1718        // state.
1719        if let Some(remove_above) = self.find_disk_reorg()? {
1720            self.remove_blocks(remove_above);
1721            return Ok(())
1722        }
1723
1724        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1725        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1726        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1727            number: self.persistence_state.last_persisted_block.number,
1728            hash: self.persistence_state.last_persisted_block.hash,
1729        });
1730        Ok(())
1731    }
1732
1733    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1734    ///
1735    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
1736    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1737    /// has in memory.
1738    ///
1739    /// For finalized blocks, this will return `None`.
1740    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1741        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1742        // check memory first
1743        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
1744            return Ok(Some(block.block))
1745        }
1746
1747        let (block, senders) = self
1748            .provider
1749            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1750            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1751            .split_sealed();
1752        let execution_output = self
1753            .provider
1754            .get_state(block.header().number())?
1755            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1756        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1757
1758        Ok(Some(ExecutedBlock {
1759            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1760            execution_output: Arc::new(execution_output),
1761            hashed_state: Arc::new(hashed_state),
1762        }))
1763    }
1764
1765    /// Return sealed block from database or in-memory state by hash.
1766    fn sealed_header_by_hash(
1767        &self,
1768        hash: B256,
1769    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1770        // check memory first
1771        let block = self
1772            .state
1773            .tree_state
1774            .block_by_hash(hash)
1775            .map(|block| block.as_ref().clone_sealed_header());
1776
1777        if block.is_some() {
1778            Ok(block)
1779        } else {
1780            self.provider.sealed_header_by_hash(hash)
1781        }
1782    }
1783
1784    /// Return block from database or in-memory state by hash.
1785    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1786        // check database first
1787        let mut block = self.provider.block_by_hash(hash)?;
1788        if block.is_none() {
1789            // Note: it's fine to return the unsealed block because the caller already has
1790            // the hash
1791            block = self
1792                .state
1793                .tree_state
1794                .block_by_hash(hash)
1795                // TODO: clone for compatibility. should we return an Arc here?
1796                .map(|block| block.as_ref().clone().into_block());
1797        }
1798        Ok(block)
1799    }
1800
1801    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1802    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1803    /// not exist in the buffer, this returns the hash that is passed in.
1804    ///
1805    /// Returns the parent hash of the block itself if the block is buffered and has no other
1806    /// buffered ancestors.
1807    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1808        self.state
1809            .buffer
1810            .lowest_ancestor(&hash)
1811            .map(|block| block.parent_hash())
1812            .unwrap_or_else(|| hash)
1813    }
1814
1815    /// If validation fails, the response MUST contain the latest valid hash:
1816    ///
1817    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1818    ///     conditions:
1819    ///     - It is fully validated and deemed VALID
1820    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1821    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1822    ///     conditions are satisfied by a `PoW` block.
1823    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1824    ///     the above conditions.
1825    fn latest_valid_hash_for_invalid_payload(
1826        &mut self,
1827        parent_hash: B256,
1828    ) -> ProviderResult<Option<B256>> {
1829        // Check if parent exists in side chain or in canonical chain.
1830        if self.block_by_hash(parent_hash)?.is_some() {
1831            return Ok(Some(parent_hash))
1832        }
1833
1834        // iterate over ancestors in the invalid cache
1835        // until we encounter the first valid ancestor
1836        let mut current_hash = parent_hash;
1837        let mut current_block = self.state.invalid_headers.get(&current_hash);
1838        while let Some(block_with_parent) = current_block {
1839            current_hash = block_with_parent.parent;
1840            current_block = self.state.invalid_headers.get(&current_hash);
1841
1842            // If current_header is None, then the current_hash does not have an invalid
1843            // ancestor in the cache, check its presence in blockchain tree
1844            if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1845                return Ok(Some(current_hash))
1846            }
1847        }
1848        Ok(None)
1849    }
1850
1851    /// Prepares the invalid payload response for the given hash, checking the
1852    /// database for the parent hash and populating the payload status with the latest valid hash
1853    /// according to the engine api spec.
1854    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1855        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1856        // PoW block, which we need to identify by looking at the parent's block difficulty
1857        if let Some(parent) = self.block_by_hash(parent_hash)? {
1858            if !parent.header().difficulty().is_zero() {
1859                parent_hash = B256::ZERO;
1860            }
1861        }
1862
1863        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1864        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1865            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1866        })
1867        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1868    }
1869
1870    /// Returns true if the given hash is the last received sync target block.
1871    ///
1872    /// See [`ForkchoiceStateTracker::sync_target_state`]
1873    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1874        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1875            return target.head_block_hash == block_hash
1876        }
1877        false
1878    }
1879
1880    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1881    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1882    ///
1883    /// Returns a payload status response according to the engine API spec if the block is known to
1884    /// be invalid.
1885    fn check_invalid_ancestor_with_head(
1886        &mut self,
1887        check: B256,
1888        head: &SealedBlock<N::Block>,
1889    ) -> ProviderResult<Option<PayloadStatus>> {
1890        // check if the check hash was previously marked as invalid
1891        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1892
1893        // populate the latest valid hash field
1894        let status = self.prepare_invalid_response(header.parent)?;
1895
1896        // insert the head block into the invalid header cache
1897        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), header);
1898        self.emit_event(BeaconConsensusEngineEvent::InvalidBlock(Box::new(head.clone())));
1899
1900        Ok(Some(status))
1901    }
1902
1903    /// Checks if the given `head` points to an invalid header, which requires a specific response
1904    /// to a forkchoice update.
1905    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1906        // check if the head was previously marked as invalid
1907        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1908        // populate the latest valid hash field
1909        Ok(Some(self.prepare_invalid_response(header.parent)?))
1910    }
1911
1912    /// Validate if block is correct and satisfies all the consensus rules that concern the header
1913    /// and block body itself.
1914    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1915        if let Err(e) =
1916            self.consensus.validate_header_with_total_difficulty(block.header(), U256::MAX)
1917        {
1918            error!(
1919                target: "engine::tree",
1920                ?block,
1921                "Failed to validate total difficulty for block {}: {e}",
1922                block.hash()
1923            );
1924            return Err(e)
1925        }
1926
1927        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1928            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1929            return Err(e)
1930        }
1931
1932        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1933            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1934            return Err(e)
1935        }
1936
1937        Ok(())
1938    }
1939
1940    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
1941    #[instrument(level = "trace", skip(self), target = "engine::tree")]
1942    fn try_connect_buffered_blocks(
1943        &mut self,
1944        parent: BlockNumHash,
1945    ) -> Result<(), InsertBlockFatalError> {
1946        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1947
1948        if blocks.is_empty() {
1949            // nothing to append
1950            return Ok(())
1951        }
1952
1953        let now = Instant::now();
1954        let block_count = blocks.len();
1955        for child in blocks {
1956            let child_num_hash = child.num_hash();
1957            match self.insert_block(child) {
1958                Ok(res) => {
1959                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1960                    if self.is_sync_target_head(child_num_hash.hash) &&
1961                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1962                    {
1963                        self.make_canonical(child_num_hash.hash)?;
1964                    }
1965                }
1966                Err(err) => {
1967                    debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1968                    if let Err(fatal) = self.on_insert_block_error(err) {
1969                        warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1970                        return Err(fatal)
1971                    }
1972                }
1973            }
1974        }
1975
1976        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1977        Ok(())
1978    }
1979
1980    /// Pre-validates the block and inserts it into the buffer.
1981    fn buffer_block(
1982        &mut self,
1983        block: RecoveredBlock<N::Block>,
1984    ) -> Result<(), InsertBlockError<N::Block>> {
1985        if let Err(err) = self.validate_block(&block) {
1986            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1987        }
1988        self.state.buffer.insert_block(block);
1989        Ok(())
1990    }
1991
1992    /// Returns true if the distance from the local tip to the block is greater than the configured
1993    /// threshold.
1994    ///
1995    /// If the `local_tip` is greater than the `block`, then this will return false.
1996    #[inline]
1997    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1998        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1999    }
2000
2001    /// Returns how far the local tip is from the given block. If the local tip is at the same
2002    /// height or its block number is greater than the given block, this returns None.
2003    #[inline]
2004    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2005        if block > local_tip {
2006            Some(block - local_tip)
2007        } else {
2008            None
2009        }
2010    }
2011
2012    /// Returns the target hash to sync to if the distance from the local tip to the block is
2013    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2014    /// that block already).
2015    ///
2016    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2017    /// (missing) finalized block.
2018    fn backfill_sync_target(
2019        &self,
2020        canonical_tip_num: u64,
2021        target_block_number: u64,
2022        downloaded_block: Option<BlockNumHash>,
2023    ) -> Option<B256> {
2024        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2025
2026        // check if the distance exceeds the threshold for backfill sync
2027        let mut exceeds_backfill_threshold =
2028            self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
2029
2030        // check if the downloaded block is the tracked finalized block
2031        if let Some(buffered_finalized) = sync_target_state
2032            .as_ref()
2033            .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2034        {
2035            // if we have buffered the finalized block, we should check how far
2036            // we're off
2037            exceeds_backfill_threshold =
2038                self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
2039        }
2040
2041        // If this is invoked after we downloaded a block we can check if this block is the
2042        // finalized block
2043        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
2044            if downloaded_block.hash == state.finalized_block_hash {
2045                // we downloaded the finalized block and can now check how far we're off
2046                exceeds_backfill_threshold =
2047                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
2048            }
2049        }
2050
2051        // if the number of missing blocks is greater than the max, trigger backfill
2052        if exceeds_backfill_threshold {
2053            if let Some(state) = sync_target_state {
2054                // if we have already canonicalized the finalized block, we should skip backfill
2055                match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2056                    Err(err) => {
2057                        warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2058                    }
2059                    Ok(None) => {
2060                        // ensure the finalized block is known (not the zero hash)
2061                        if !state.finalized_block_hash.is_zero() {
2062                            // we don't have the block yet and the distance exceeds the allowed
2063                            // threshold
2064                            return Some(state.finalized_block_hash)
2065                        }
2066
2067                        // OPTIMISTIC SYNCING
2068                        //
2069                        // It can happen when the node is doing an
2070                        // optimistic sync, where the CL has no knowledge of the finalized hash,
2071                        // but is expecting the EL to sync as high
2072                        // as possible before finalizing.
2073                        //
2074                        // This usually doesn't happen on ETH mainnet since CLs use the more
2075                        // secure checkpoint syncing.
2076                        //
2077                        // However, optimism chains will do this. The risk of a reorg is however
2078                        // low.
2079                        debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2080                        return Some(state.head_block_hash)
2081                    }
2082                    Ok(Some(_)) => {
2083                        // we're fully synced to the finalized block
2084                    }
2085                }
2086            }
2087        }
2088
2089        None
2090    }
2091
2092    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
2093    /// happen if a reorg is happening while we are persisting a block.
2094    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2095        let mut canonical = self.state.tree_state.current_canonical_head;
2096        let mut persisted = self.persistence_state.last_persisted_block;
2097
2098        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2099            Ok(self
2100                .sealed_header_by_hash(num_hash.hash)?
2101                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2102                .parent_num_hash())
2103        };
2104
2105        // Happy path, canonical chain is ahead or equal to persisted chain.
2106        // Walk canonical chain back to make sure that it connects to persisted chain.
2107        while canonical.number > persisted.number {
2108            canonical = parent_num_hash(canonical)?;
2109        }
2110
2111        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2112        if canonical == persisted {
2113            return Ok(None);
2114        }
2115
2116        // At this point, we know that `persisted` block can't be reached by walking the canonical
2117        // chain back. In this case we need to truncate it to the first canonical block it connects
2118        // to.
2119
2120        // Firstly, walk back until we reach the same height as `canonical`.
2121        while persisted.number > canonical.number {
2122            persisted = parent_num_hash(persisted)?;
2123        }
2124
2125        debug_assert_eq!(persisted.number, canonical.number);
2126
2127        // Now walk both chains back until we find a common ancestor.
2128        while persisted.hash != canonical.hash {
2129            canonical = parent_num_hash(canonical)?;
2130            persisted = parent_num_hash(persisted)?;
2131        }
2132
2133        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2134
2135        Ok(Some(persisted.number))
2136    }
2137
2138    /// Invoked when we the canonical chain has been updated.
2139    ///
2140    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2141    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2142        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2143        let start = Instant::now();
2144
2145        // update the tracked canonical head
2146        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2147
2148        let tip = chain_update.tip().clone_sealed_header();
2149        let notification = chain_update.to_chain_notification();
2150
2151        // reinsert any missing reorged blocks
2152        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2153            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2154            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2155            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2156
2157            self.update_reorg_metrics(old.len());
2158            self.reinsert_reorged_blocks(new.clone());
2159            // Try reinserting the reorged canonical chain. This is only possible if we have
2160            // `persisted_trie_updates` for those blocks.
2161            let old = old
2162                .iter()
2163                .filter_map(|block| {
2164                    let (_, trie) = self
2165                        .state
2166                        .tree_state
2167                        .persisted_trie_updates
2168                        .get(&block.recovered_block.hash())
2169                        .cloned()?;
2170                    Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
2171                })
2172                .collect::<Vec<_>>();
2173            self.reinsert_reorged_blocks(old);
2174        }
2175
2176        // update the tracked in-memory state with the new chain
2177        self.canonical_in_memory_state.update_chain(chain_update);
2178        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2179
2180        // Update metrics based on new tip
2181        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2182
2183        // sends an event to all active listeners about the new canonical chain
2184        self.canonical_in_memory_state.notify_canon_state(notification);
2185
2186        // emit event
2187        self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
2188            Box::new(tip),
2189            start.elapsed(),
2190        ));
2191    }
2192
2193    /// This updates metrics based on the given reorg length.
2194    fn update_reorg_metrics(&self, old_chain_length: usize) {
2195        self.metrics.tree.reorgs.increment(1);
2196        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2197    }
2198
2199    /// This reinserts any blocks in the new chain that do not already exist in the tree
2200    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2201        for block in new_chain {
2202            if self
2203                .state
2204                .tree_state
2205                .executed_block_by_hash(block.recovered_block().hash())
2206                .is_none()
2207            {
2208                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2209                self.state.tree_state.insert_executed(block);
2210            }
2211        }
2212    }
2213
2214    /// Invoke the invalid block hook if this is a new invalid block.
2215    fn on_invalid_block(
2216        &mut self,
2217        parent_header: &SealedHeader<N::BlockHeader>,
2218        block: &RecoveredBlock<N::Block>,
2219        output: &BlockExecutionOutput<N::Receipt>,
2220        trie_updates: Option<(&TrieUpdates, B256)>,
2221    ) {
2222        if self.state.invalid_headers.get(&block.hash()).is_some() {
2223            // we already marked this block as invalid
2224            return;
2225        }
2226        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
2227    }
2228
2229    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2230    ///
2231    /// This mainly compares the missing parent of the downloaded block with the current canonical
2232    /// tip, and decides whether or not backfill sync should be triggered.
2233    fn on_disconnected_downloaded_block(
2234        &self,
2235        downloaded_block: BlockNumHash,
2236        missing_parent: BlockNumHash,
2237        head: BlockNumHash,
2238    ) -> Option<TreeEvent> {
2239        // compare the missing parent with the canonical tip
2240        if let Some(target) =
2241            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2242        {
2243            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2244            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2245        }
2246
2247        // continue downloading the missing parent
2248        //
2249        // this happens if either:
2250        //  * the missing parent block num < canonical tip num
2251        //    * this case represents a missing block on a fork that is shorter than the canonical
2252        //      chain
2253        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2254        //    less than the backfill threshold
2255        //    * this case represents a potentially long range of blocks to download and execute
2256        let request = if let Some(distance) =
2257            self.distance_from_local_tip(head.number, missing_parent.number)
2258        {
2259            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2260            DownloadRequest::BlockRange(missing_parent.hash, distance)
2261        } else {
2262            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2263            // This happens when the missing parent is on an outdated
2264            // sidechain and we can only download the missing block itself
2265            DownloadRequest::single_block(missing_parent.hash)
2266        };
2267
2268        Some(TreeEvent::Download(request))
2269    }
2270
2271    /// Invoked with a block downloaded from the network
2272    ///
2273    /// Returns an event with the appropriate action to take, such as:
2274    ///  - download more missing blocks
2275    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2276    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2277    fn on_downloaded_block(
2278        &mut self,
2279        block: RecoveredBlock<N::Block>,
2280    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2281        let block_num_hash = block.num_hash();
2282        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2283        if self
2284            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2285            .is_some()
2286        {
2287            return Ok(None)
2288        }
2289
2290        if !self.backfill_sync_state.is_idle() {
2291            return Ok(None)
2292        }
2293
2294        // try to append the block
2295        match self.insert_block(block) {
2296            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2297                if self.is_sync_target_head(block_num_hash.hash) {
2298                    trace!(target: "engine::tree", "appended downloaded sync target block");
2299
2300                    // we just inserted the current sync target block, we can try to make it
2301                    // canonical
2302                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2303                        sync_target_head: block_num_hash.hash,
2304                    })))
2305                }
2306                trace!(target: "engine::tree", "appended downloaded block");
2307                self.try_connect_buffered_blocks(block_num_hash)?;
2308            }
2309            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2310                // block is not connected to the canonical head, we need to download
2311                // its missing branch first
2312                return Ok(self.on_disconnected_downloaded_block(
2313                    block_num_hash,
2314                    missing_ancestor,
2315                    head,
2316                ))
2317            }
2318            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2319                trace!(target: "engine::tree", "downloaded block already executed");
2320            }
2321            Err(err) => {
2322                debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2323                if let Err(fatal) = self.on_insert_block_error(err) {
2324                    warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2325                    return Err(fatal)
2326                }
2327            }
2328        }
2329        Ok(None)
2330    }
2331
2332    fn insert_block(
2333        &mut self,
2334        block: RecoveredBlock<N::Block>,
2335    ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
2336        self.insert_block_inner(block.clone())
2337            .map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind))
2338    }
2339
    /// Validates and executes the given block, computes its state root and inserts it into the
    /// tree state.
    ///
    /// The steps are, in order:
    /// 1. Return [`InsertPayloadOk::AlreadySeen`] if the block is already known.
    /// 2. Validate the block via `validate_block`.
    /// 3. If no state provider can be built for the parent, buffer the block and return
    ///    [`BlockStatus::Disconnected`] together with the missing ancestor.
    /// 4. Validate the header against the parent header.
    /// 5. Execute the block, and run the post-execution validations.
    /// 6. Obtain the state root (background task, parallel computation, or synchronous fallback)
    ///    and compare it with the state root in the block header.
    /// 7. Insert the executed block into the tree state and emit the matching engine event.
    ///
    /// Whenever a validation after execution fails, the invalid block hook is invoked via
    /// `on_invalid_block` before the error is returned.
    fn insert_block_inner(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
        let block_num_hash = block.num_hash();
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");

        // nothing to do if we already know this block
        if self.block_by_hash(block.hash())?.is_some() {
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
        }

        // start of the whole insertion, reported with the emitted engine event
        let start = Instant::now();

        trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");

        // validate block consensus rules
        self.validate_block(&block)?;

        trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
        let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else {
            // we don't have the state required to execute this block, buffering it and find the
            // missing parent block
            //
            // if the parent chain is (partially) buffered, the missing block is the parent of the
            // lowest buffered ancestor, otherwise it is the parent of this block
            let missing_ancestor = self
                .state
                .buffer
                .lowest_ancestor(&block.parent_hash())
                .map(|block| block.parent_num_hash())
                .unwrap_or_else(|| block.parent_num_hash());

            self.state.buffer.insert_block(block);

            return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                head: self.state.tree_state.current_canonical_head,
                missing_ancestor,
            }))
        };

        // now validate against the parent
        let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
            InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
                block.parent_hash().into(),
            ))
        })?;
        if let Err(e) =
            self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
        {
            warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
            return Err(e.into())
        }

        let state_provider = provider_builder.build()?;

        // We only run the parallel state root if we are not currently persisting any blocks or
        // persisting blocks that are all ancestors of the one we are executing.
        //
        // If we're committing ancestor blocks, then: any trie updates being committed are a subset
        // of the in-memory trie updates collected before fetching reverts. So any diff in
        // reverts (pre vs post commit) is already covered by the in-memory trie updates we
        // collect in `compute_state_root_parallel`.
        //
        // See https://github.com/paradigmxyz/reth/issues/12688 for more details
        let persisting_kind = self.persisting_kind_for(block.header());
        let run_parallel_state_root = persisting_kind.can_run_parallel_state_root();

        // use prewarming background task
        let header = block.clone_sealed_header();
        let txs = block.clone_transactions_recovered().collect();
        let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
            // use background tasks for state root calc
            let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;

            // Compute trie input
            let trie_input_start = Instant::now();
            let trie_input = self
                .compute_trie_input(
                    persisting_kind,
                    consistent_view.clone(),
                    block.header().parent_hash(),
                )
                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;

            self.metrics
                .block_validation
                .trie_input_duration
                .record(trie_input_start.elapsed().as_secs_f64());

            self.payload_processor.spawn(
                header,
                txs,
                provider_builder,
                consistent_view,
                trie_input,
                &self.config,
            )
        } else {
            // the state root task is not used for this block, so the processor is spawned
            // without the consistent view and trie input
            self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
        };

        // Use cached state provider before executing, used in execution after prewarming threads
        // complete
        let state_provider = CachedStateProvider::new_with_caches(
            state_provider,
            handle.caches(),
            handle.cache_metrics(),
        );

        debug!(target: "engine::tree", block=?block_num_hash, "Executing block");

        let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
        let execution_start = Instant::now();
        let output = self.metrics.executor.execute_metered(
            executor,
            &block,
            Box::new(handle.state_hook()),
        )?;
        let execution_finish = Instant::now();
        let execution_time = execution_finish.duration_since(execution_start);
        debug!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");

        // after executing the block we can stop executing transactions
        handle.stop_prewarming_execution();

        if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
            // call post-block hook
            self.on_invalid_block(&parent_block, &block, &output, None);
            return Err(err.into())
        }

        let hashed_state = self.provider.hashed_post_state(&output.state);

        if let Err(err) = self
            .payload_validator
            .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
        {
            // call post-block hook
            self.on_invalid_block(&parent_block, &block, &output, None);
            return Err(err.into())
        }

        debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");

        let root_time = Instant::now();

        // stays `None` if the parallel path is skipped or did not yield a usable result, in
        // which case the synchronous fallback below is used
        let mut maybe_state_root = None;

        if run_parallel_state_root {
            // depending on the config, we either take the result of the background state root
            // task, or compute the state root in parallel right here
            if self.config.use_state_root_task() {
                match handle.state_root() {
                    Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
                        // for the task, the elapsed time is measured from the end of execution
                        let elapsed = execution_finish.elapsed();
                        info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
                        // we double check the state root here for good measure
                        if state_root == block.header().state_root() {
                            maybe_state_root = Some((state_root, trie_updates, elapsed))
                        } else {
                            // a mismatching task result is discarded, the fallback below
                            // recomputes the root
                            warn!(
                                target: "engine::tree",
                                ?state_root,
                                block_state_root = ?block.header().state_root(),
                                "State root task returned incorrect state root"
                            );
                        }
                    }
                    Err(error) => {
                        // a failed task is not fatal, the fallback below computes the root
                        debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
                    }
                }
            } else {
                match self.compute_state_root_parallel(
                    persisting_kind,
                    block.header().parent_hash(),
                    &hashed_state,
                ) {
                    Ok(result) => {
                        info!(
                            target: "engine::tree",
                            block = ?block_num_hash,
                            regular_state_root = ?result.0,
                            "Regular root task finished"
                        );
                        maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
                    }
                    // a failed consistency check is the only error that falls through to the
                    // synchronous fallback
                    Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
                        debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
                    }
                    Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))),
                }
            }
        }

        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
            maybe_state_root
        {
            maybe_state_root
        } else {
            // fallback is to compute the state root regularly in sync
            warn!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
            self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
            let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
            (root, updates, root_time.elapsed())
        };

        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
        debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");

        // ensure state root matches
        if state_root != block.header().state_root() {
            // call post-block hook
            self.on_invalid_block(&parent_block, &block, &output, Some((&trie_output, state_root)));
            return Err(ConsensusError::BodyStateRootDiff(
                GotExpected { got: state_root, expected: block.header().state_root() }.into(),
            )
            .into())
        }

        // terminate prewarming task with good state output
        handle.terminate_caching(Some(output.state.clone()));

        let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock {
                recovered_block: Arc::new(block),
                execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
                hashed_state: Arc::new(hashed_state),
            },
            trie: Arc::new(trie_output),
        };

        // if the parent is the canonical head, we can insert the block as the pending block
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // emit insert event
        let elapsed = start.elapsed();
        let engine_event = if self.is_fork(block_num_hash.hash)? {
            BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2591
2592    /// Compute state root for the given hashed post state in parallel.
2593    ///
2594    /// # Returns
2595    ///
2596    /// Returns `Ok(_)` if computed successfully.
2597    /// Returns `Err(_)` if error was encountered during computation.
2598    /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
2599    /// should be used instead.
2600    fn compute_state_root_parallel(
2601        &self,
2602        persisting_kind: PersistingKind,
2603        parent_hash: B256,
2604        hashed_state: &HashedPostState,
2605    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2606        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2607
2608        let mut input =
2609            self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
2610        // Extend with block we are validating root for.
2611        input.append_ref(hashed_state);
2612
2613        ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2614    }
2615
2616    /// Computes the trie input at the provided parent hash.
2617    ///
2618    /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that
2619    /// serves as an overlay to the database blocks.
2620    ///
2621    /// It works as follows:
2622    /// 1. Collect in-memory blocks that are descendants of the provided parent hash using
2623    ///    [`TreeState::blocks_by_hash`].
2624    /// 2. If the persistence is in progress, and the block that we're computing the trie input for
2625    ///    is a descendant of the currently persisting blocks, we need to be sure that in-memory
2626    ///    blocks are not overlapping with the database blocks that may have been already persisted.
2627    ///    To do that, we're filtering out in-memory blocks that are lower than the highest database
2628    ///    block.
2629    /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
2630    ///    [`HashedPostState`] from them.
2631    fn compute_trie_input(
2632        &self,
2633        persisting_kind: PersistingKind,
2634        consistent_view: ConsistentDbView<P>,
2635        parent_hash: B256,
2636    ) -> Result<TrieInput, ParallelStateRootError> {
2637        let mut input = TrieInput::default();
2638
2639        let provider = consistent_view.provider_ro()?;
2640        let best_block_number = provider.best_block_number()?;
2641
2642        let (mut historical, mut blocks) = self
2643            .state
2644            .tree_state
2645            .blocks_by_hash(parent_hash)
2646            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2647
2648        // If the current block is a descendant of the currently persisting blocks, then we need to
2649        // filter in-memory blocks, so that none of them are already persisted in the database.
2650        if persisting_kind.is_descendant() {
2651            // Iterate over the blocks from oldest to newest.
2652            while let Some(block) = blocks.last() {
2653                let recovered_block = block.recovered_block();
2654                if recovered_block.number() <= best_block_number {
2655                    // Remove those blocks that lower than or equal to the highest database
2656                    // block.
2657                    blocks.pop();
2658                } else {
2659                    // If the block is higher than the best block number, stop filtering, as it's
2660                    // the first block that's not in the database.
2661                    break
2662                }
2663            }
2664
2665            historical = if let Some(block) = blocks.last() {
2666                // If there are any in-memory blocks left after filtering, set the anchor to the
2667                // parent of the oldest block.
2668                (block.recovered_block().number() - 1).into()
2669            } else {
2670                // Otherwise, set the anchor to the original provided parent hash.
2671                parent_hash.into()
2672            };
2673        }
2674
2675        if blocks.is_empty() {
2676            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2677        } else {
2678            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2679        }
2680
2681        // Convert the historical block to the block number.
2682        let block_number = provider
2683            .convert_hash_or_number(historical)?
2684            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2685
2686        // Retrieve revert state for historical block.
2687        let revert_state = if block_number == best_block_number {
2688            // We do not check against the `last_block_number` here because
2689            // `HashedPostState::from_reverts` only uses the database tables, and not static files.
2690            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2691            HashedPostState::default()
2692        } else {
2693            let revert_state = HashedPostState::from_reverts::<
2694                <P::StateCommitment as StateCommitment>::KeyHasher,
2695            >(provider.tx_ref(), block_number + 1)
2696            .map_err(ProviderError::from)?;
2697            debug!(
2698                target: "engine::tree",
2699                block_number,
2700                best_block_number,
2701                accounts = revert_state.accounts.len(),
2702                storages = revert_state.storages.len(),
2703                "Non-empty revert state"
2704            );
2705            revert_state
2706        };
2707        input.append(revert_state);
2708
2709        // Extend with contents of parent in-memory blocks.
2710        for block in blocks.iter().rev() {
2711            input.append_cached_ref(block.trie_updates(), block.hashed_state())
2712        }
2713
2714        Ok(input)
2715    }
2716
2717    /// Handles an error that occurred while inserting a block.
2718    ///
2719    /// If this is a validation error this will mark the block as invalid.
2720    ///
2721    /// Returns the proper payload status response if the block is invalid.
2722    fn on_insert_block_error(
2723        &mut self,
2724        error: InsertBlockError<N::Block>,
2725    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2726        let (block, error) = error.split();
2727
2728        // if invalid block, we check the validation error. Otherwise return the fatal
2729        // error.
2730        let validation_err = error.ensure_validation_error()?;
2731
2732        // If the error was due to an invalid payload, the payload is added to the
2733        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2734        // returned.
2735        warn!(
2736            target: "engine::tree",
2737            invalid_hash=%block.hash(),
2738            invalid_number=block.number(),
2739            %validation_err,
2740            "Invalid block error on new payload",
2741        );
2742        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2743
2744        // keep track of the invalid header
2745        self.state.invalid_headers.insert(block.block_with_parent());
2746        self.emit_event(EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
2747            Box::new(block),
2748        )));
2749        Ok(PayloadStatus::new(
2750            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2751            latest_valid_hash,
2752        ))
2753    }
2754
2755    /// Attempts to find the header for the given block hash if it is canonical.
2756    pub fn find_canonical_header(
2757        &self,
2758        hash: B256,
2759    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2760        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2761
2762        if canonical.is_none() {
2763            canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2764        }
2765
2766        Ok(canonical)
2767    }
2768
2769    /// Updates the tracked finalized block if we have it.
2770    fn update_finalized_block(
2771        &self,
2772        finalized_block_hash: B256,
2773    ) -> Result<(), OnForkChoiceUpdated> {
2774        if finalized_block_hash.is_zero() {
2775            return Ok(())
2776        }
2777
2778        match self.find_canonical_header(finalized_block_hash) {
2779            Ok(None) => {
2780                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2781                // if the finalized block is not known, we can't update the finalized block
2782                return Err(OnForkChoiceUpdated::invalid_state())
2783            }
2784            Ok(Some(finalized)) => {
2785                if Some(finalized.num_hash()) !=
2786                    self.canonical_in_memory_state.get_finalized_num_hash()
2787                {
2788                    // we're also persisting the finalized block on disk so we can reload it on
2789                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2790                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2791                    self.canonical_in_memory_state.set_finalized(finalized);
2792                }
2793            }
2794            Err(err) => {
2795                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2796            }
2797        }
2798
2799        Ok(())
2800    }
2801
2802    /// Updates the tracked safe block if we have it
2803    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2804        if safe_block_hash.is_zero() {
2805            return Ok(())
2806        }
2807
2808        match self.find_canonical_header(safe_block_hash) {
2809            Ok(None) => {
2810                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2811                // if the safe block is not known, we can't update the safe block
2812                return Err(OnForkChoiceUpdated::invalid_state())
2813            }
2814            Ok(Some(safe)) => {
2815                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2816                    // we're also persisting the safe block on disk so we can reload it on
2817                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2818                    let _ = self.persistence.save_safe_block_number(safe.number());
2819                    self.canonical_in_memory_state.set_safe(safe);
2820                }
2821            }
2822            Err(err) => {
2823                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2824            }
2825        }
2826
2827        Ok(())
2828    }
2829
2830    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2831    /// made canonical.
2832    ///
2833    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2834    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2835    ///
2836    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2837    /// are consistent with the head block.
2838    fn ensure_consistent_forkchoice_state(
2839        &self,
2840        state: ForkchoiceState,
2841    ) -> Result<(), OnForkChoiceUpdated> {
2842        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2843        // after the head block is canonicalized.
2844        //
2845        // This ensures that the finalized block is consistent with the head block, i.e. the
2846        // finalized block is an ancestor of the head block.
2847        self.update_finalized_block(state.finalized_block_hash)?;
2848
2849        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2850        // after the head block is canonicalized.
2851        //
2852        // This ensures that the safe block is consistent with the head block, i.e. the safe
2853        // block is an ancestor of the head block.
2854        self.update_safe_block(state.safe_block_hash)
2855    }
2856
2857    /// Pre-validate forkchoice update and check whether it can be processed.
2858    ///
2859    /// This method returns the update outcome if validation fails or
2860    /// the node is syncing and the update cannot be processed at the moment.
2861    fn pre_validate_forkchoice_update(
2862        &mut self,
2863        state: ForkchoiceState,
2864    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2865        if state.head_block_hash.is_zero() {
2866            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2867        }
2868
2869        // check if the new head hash is connected to any ancestor that we previously marked as
2870        // invalid
2871        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2872        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2873            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2874        }
2875
2876        if !self.backfill_sync_state.is_idle() {
2877            // We can only process new forkchoice updates if the pipeline is idle, since it requires
2878            // exclusive access to the database
2879            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2880            return Ok(Some(OnForkChoiceUpdated::syncing()))
2881        }
2882
2883        Ok(None)
2884    }
2885
2886    /// Validates the payload attributes with respect to the header and fork choice state.
2887    ///
2888    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
2889    /// return an error if the payload attributes are invalid.
2890    fn process_payload_attributes(
2891        &self,
2892        attrs: T::PayloadAttributes,
2893        head: &N::BlockHeader,
2894        state: ForkchoiceState,
2895        version: EngineApiMessageVersion,
2896    ) -> OnForkChoiceUpdated {
2897        if let Err(err) =
2898            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2899        {
2900            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2901            return OnForkChoiceUpdated::invalid_payload_attributes()
2902        }
2903
2904        // 8. Client software MUST begin a payload build process building on top of
2905        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2906        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2907        //    The build process is specified in the Payload building section.
2908        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2909            state.head_block_hash,
2910            attrs,
2911            version as u8,
2912        ) {
2913            Ok(attributes) => {
2914                // send the payload to the builder and return the receiver for the pending payload
2915                // id, initiating payload job is handled asynchronously
2916                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2917
2918                // Client software MUST respond to this method call in the following way:
2919                // {
2920                //      payloadStatus: {
2921                //          status: VALID,
2922                //          latestValidHash: forkchoiceState.headBlockHash,
2923                //          validationError: null
2924                //      },
2925                //      payloadId: buildProcessId
2926                // }
2927                //
2928                // if the payload is deemed VALID and the build process has begun.
2929                OnForkChoiceUpdated::updated_with_pending_payload_id(
2930                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2931                    pending_payload_id,
2932                )
2933            }
2934            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2935        }
2936    }
2937
2938    /// Remove all blocks up to __and including__ the given block number.
2939    ///
2940    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2941    /// those which have a fork point at or below the finalized hash.
2942    ///
2943    /// Canonical blocks below the upper bound will still be removed.
2944    pub(crate) fn remove_before(
2945        &mut self,
2946        upper_bound: BlockNumHash,
2947        finalized_hash: Option<B256>,
2948    ) -> ProviderResult<()> {
2949        // first fetch the finalized block number and then call the remove_before method on
2950        // tree_state
2951        let num = if let Some(hash) = finalized_hash {
2952            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2953        } else {
2954            None
2955        };
2956
2957        self.state.tree_state.remove_until(
2958            upper_bound,
2959            self.persistence_state.last_persisted_block.hash,
2960            num,
2961        );
2962        Ok(())
2963    }
2964
2965    /// Returns a builder for creating state providers for the given hash.
2966    ///
2967    /// This is an optimization for parallel execution contexts where we want to avoid
2968    /// creating state providers in the critical path.
2969    pub fn state_provider_builder(
2970        &self,
2971        hash: B256,
2972    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2973    where
2974        P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
2975    {
2976        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2977            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2978            // the block leads back to the canonical chain
2979            return Ok(Some(StateProviderBuilder::new(
2980                self.provider.clone(),
2981                historical,
2982                Some(blocks),
2983            )))
2984        }
2985
2986        // Check if the block is persisted
2987        if let Some(header) = self.provider.header(&hash)? {
2988            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2989            // For persisted blocks, we create a builder that will fetch state directly from the
2990            // database
2991            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2992        }
2993
2994        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2995        Ok(None)
2996    }
2997}
2998
2999/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
3000/// variant.
3001///
3002/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
3003/// is valid or not.
3004#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3005pub enum BlockStatus {
3006    /// The block is valid and block extends canonical chain.
3007    Valid,
3008    /// The block may be valid and has an unknown missing ancestor.
3009    Disconnected {
3010        /// Current canonical head.
3011        head: BlockNumHash,
3012        /// The lowest ancestor block that is not connected to the canonical chain.
3013        missing_ancestor: BlockNumHash,
3014    },
3015}
3016
3017/// How a payload was inserted if it was valid.
3018///
3019/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen(_)`] is
3020/// returned, otherwise [`InsertPayloadOk::Inserted(_)`] is returned.
3021#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3022pub enum InsertPayloadOk {
3023    /// The payload was valid, but we have already seen it.
3024    AlreadySeen(BlockStatus),
3025    /// The payload was valid and inserted into the tree.
3026    Inserted(BlockStatus),
3027}
3028
/// Relationship between the blocks currently being persisted (if any) and the input block.
#[derive(Clone, Copy, Debug)]
pub enum PersistingKind {
    /// No blocks are being persisted at the moment.
    NotPersisting,
    /// Blocks are being persisted, and the input block does not descend from them.
    PersistingNotDescendant,
    /// Blocks are being persisted, and the input block descends from them.
    PersistingDescendant,
}

impl PersistingKind {
    /// Returns true if the parallel state root can be run.
    ///
    /// That is the case when nothing is being persisted, or when the blocks being persisted are
    /// all ancestors of the block the state root is calculated for.
    pub const fn can_run_parallel_state_root(&self) -> bool {
        match self {
            Self::NotPersisting | Self::PersistingDescendant => true,
            Self::PersistingNotDescendant => false,
        }
    }

    /// Returns true only for [`PersistingKind::PersistingDescendant`], i.e. when blocks are
    /// being persisted and the input block is a descendant of them.
    pub const fn is_descendant(&self) -> bool {
        match self {
            Self::PersistingDescendant => true,
            Self::NotPersisting | Self::PersistingNotDescendant => false,
        }
    }
}
3055
3056#[cfg(test)]
3057mod tests {
3058    use super::*;
3059    use crate::persistence::PersistenceAction;
3060    use alloy_consensus::Header;
3061    use alloy_primitives::Bytes;
3062    use alloy_rlp::Decodable;
3063    use alloy_rpc_types_engine::{
3064        CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
3065        ExecutionPayloadV3,
3066    };
3067    use assert_matches::assert_matches;
3068    use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
3069    use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
3070    use reth_engine_primitives::ForkchoiceStatus;
3071    use reth_ethereum_consensus::EthBeaconConsensus;
3072    use reth_ethereum_engine_primitives::EthEngineTypes;
3073    use reth_ethereum_primitives::{Block, EthPrimitives};
3074    use reth_evm::test_utils::MockExecutorProvider;
3075    use reth_evm_ethereum::EthEvmConfig;
3076    use reth_node_ethereum::EthereumEngineValidator;
3077    use reth_primitives_traits::Block as _;
3078    use reth_provider::test_utils::MockEthProvider;
3079    use reth_trie::{updates::TrieUpdates, HashedPostState};
3080    use std::{
3081        str::FromStr,
3082        sync::mpsc::{channel, Sender},
3083    };
3084
3085    /// This is a test channel that allows you to `release` any value that is in the channel.
3086    ///
3087    /// If nothing has been sent, then the next value will be immediately sent.
3088    struct TestChannel<T> {
3089        /// If an item is sent to this channel, an item will be released in the wrapped channel
3090        release: Receiver<()>,
3091        /// The sender channel
3092        tx: Sender<T>,
3093        /// The receiver channel
3094        rx: Receiver<T>,
3095    }
3096
3097    impl<T: Send + 'static> TestChannel<T> {
3098        /// Creates a new test channel
3099        fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
3100            let (original_tx, original_rx) = channel();
3101            let (wrapped_tx, wrapped_rx) = channel();
3102            let (release_tx, release_rx) = channel();
3103            let handle = TestChannelHandle::new(release_tx);
3104            let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
3105            // spawn the task that listens and releases stuff
3106            std::thread::spawn(move || test_channel.intercept_loop());
3107            (original_tx, wrapped_rx, handle)
3108        }
3109
3110        /// Runs the intercept loop, waiting for the handle to release a value
3111        fn intercept_loop(&self) {
3112            while self.release.recv() == Ok(()) {
3113                let Ok(value) = self.rx.recv() else { return };
3114
3115                let _ = self.tx.send(value);
3116            }
3117        }
3118    }
3119
3120    struct TestChannelHandle {
3121        /// The sender to use for releasing values
3122        release: Sender<()>,
3123    }
3124
3125    impl TestChannelHandle {
3126        /// Returns a [`TestChannelHandle`]
3127        const fn new(release: Sender<()>) -> Self {
3128            Self { release }
3129        }
3130
3131        /// Signals to the channel task that a value should be released
3132        #[expect(dead_code)]
3133        fn release(&self) {
3134            let _ = self.release.send(());
3135        }
3136    }
3137
    /// Test fixture bundling an [`EngineApiTreeHandler`] wired to mock components together with
    /// the channel ends and helpers the tests use to drive and observe it.
    struct TestHarness {
        /// The tree handler under test, instantiated with mock provider and executor types.
        tree: EngineApiTreeHandler<
            EthPrimitives,
            MockEthProvider,
            MockExecutorProvider,
            EthEngineTypes,
            EthereumEngineValidator,
            EthEvmConfig,
        >,
        /// Clone of the tree's `incoming_tx` sender.
        to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
        /// Receiving end of the channel whose sender is handed to the tree on construction;
        /// the `check_*` helpers read the tree's events from here.
        from_tree_rx: UnboundedReceiver<EngineApiEvent>,
        /// Executed blocks the harness was seeded with; empty until `with_blocks` is called.
        blocks: Vec<ExecutedBlockWithTrieUpdates>,
        /// Receiving end paired with the sender wrapped by the tree's persistence handle.
        action_rx: Receiver<PersistenceAction>,
        /// Mock executor provider; a clone of it is handed to the tree.
        executor_provider: MockExecutorProvider,
        /// Block builder configured with the harness' chain spec.
        block_builder: TestBlockBuilder,
        /// Mock provider; a clone of it is handed to the tree.
        provider: MockEthProvider,
    }
3155
3156    impl TestHarness {
3157        fn new(chain_spec: Arc<ChainSpec>) -> Self {
3158            let (action_tx, action_rx) = channel();
3159            Self::with_persistence_channel(chain_spec, action_tx, action_rx)
3160        }
3161
3162        #[expect(dead_code)]
3163        fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
3164            let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
3165            (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
3166        }
3167
3168        fn with_persistence_channel(
3169            chain_spec: Arc<ChainSpec>,
3170            action_tx: Sender<PersistenceAction>,
3171            action_rx: Receiver<PersistenceAction>,
3172        ) -> Self {
3173            let persistence_handle = PersistenceHandle::new(action_tx);
3174
3175            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
3176
3177            let provider = MockEthProvider::default();
3178            let executor_provider = MockExecutorProvider::default();
3179
3180            let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
3181
3182            let (from_tree_tx, from_tree_rx) = unbounded_channel();
3183
3184            let header = chain_spec.genesis_header().clone();
3185            let header = SealedHeader::seal_slow(header);
3186            let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
3187            let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
3188
3189            let (to_payload_service, _payload_command_rx) = unbounded_channel();
3190            let payload_builder = PayloadBuilderHandle::new(to_payload_service);
3191
3192            let evm_config = EthEvmConfig::new(chain_spec.clone());
3193
3194            let tree = EngineApiTreeHandler::new(
3195                provider.clone(),
3196                executor_provider.clone(),
3197                consensus,
3198                payload_validator,
3199                from_tree_tx,
3200                engine_api_tree_state,
3201                canonical_in_memory_state,
3202                persistence_handle,
3203                PersistenceState::default(),
3204                payload_builder,
3205                // TODO: fix tests for state root task https://github.com/paradigmxyz/reth/issues/14376
3206                // always assume enough parallelism for tests
3207                TreeConfig::default()
3208                    .with_legacy_state_root(true)
3209                    .with_has_enough_parallelism(true),
3210                EngineApiKind::Ethereum,
3211                evm_config,
3212            );
3213
3214            let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3215            Self {
3216                to_tree_tx: tree.incoming_tx.clone(),
3217                tree,
3218                from_tree_rx,
3219                blocks: vec![],
3220                action_rx,
3221                executor_provider,
3222                block_builder,
3223                provider,
3224            }
3225        }
3226
3227        fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
3228            let mut blocks_by_hash = HashMap::default();
3229            let mut blocks_by_number = BTreeMap::new();
3230            let mut state_by_hash = HashMap::default();
3231            let mut hash_by_number = BTreeMap::new();
3232            let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
3233            let mut parent_hash = B256::ZERO;
3234
3235            for block in &blocks {
3236                let sealed_block = block.recovered_block();
3237                let hash = sealed_block.hash();
3238                let number = sealed_block.number;
3239                blocks_by_hash.insert(hash, block.clone());
3240                blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
3241                state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
3242                hash_by_number.insert(number, hash);
3243                parent_to_child.entry(parent_hash).or_default().insert(hash);
3244                parent_hash = hash;
3245            }
3246
3247            self.tree.state.tree_state = TreeState {
3248                blocks_by_hash,
3249                blocks_by_number,
3250                current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
3251                parent_to_child,
3252                persisted_trie_updates: HashMap::default(),
3253            };
3254
3255            let last_executed_block = blocks.last().unwrap().clone();
3256            let pending = Some(BlockState::new(last_executed_block));
3257            self.tree.canonical_in_memory_state =
3258                CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
3259
3260            self.blocks = blocks.clone();
3261
3262            let recovered_blocks =
3263                blocks.iter().map(|b| b.recovered_block().clone()).collect::<Vec<_>>();
3264
3265            self.persist_blocks(recovered_blocks);
3266
3267            self
3268        }
3269
        /// Builder-style setter that overwrites the tree's backfill sync state with `state`
        /// and returns the harness.
        const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
            self.tree.backfill_sync_state = state;
            self
        }
3274
        /// Forwards the given execution outcomes to the harness' mock executor provider via
        /// its `extend` method.
        fn extend_execution_outcome(
            &self,
            execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
        ) {
            self.executor_provider.extend(execution_outcomes);
        }
3281
3282        fn insert_block(
3283            &mut self,
3284            block: RecoveredBlock<reth_ethereum_primitives::Block>,
3285        ) -> Result<InsertPayloadOk, InsertBlockError<Block>> {
3286            let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3287            self.extend_execution_outcome([execution_outcome]);
3288            self.tree.provider.add_state_root(block.state_root);
3289            self.tree.insert_block(block)
3290        }
3291
3292        async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3293            let fcu_status = fcu_status.into();
3294
3295            self.send_fcu(block_hash, fcu_status).await;
3296
3297            self.check_fcu(block_hash, fcu_status).await;
3298        }
3299
3300        async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3301            let fcu_state = self.fcu_state(block_hash);
3302
3303            let (tx, rx) = oneshot::channel();
3304            self.tree
3305                .on_engine_message(FromEngine::Request(
3306                    BeaconEngineMessage::ForkchoiceUpdated {
3307                        state: fcu_state,
3308                        payload_attrs: None,
3309                        tx,
3310                        version: EngineApiMessageVersion::default(),
3311                    }
3312                    .into(),
3313                ))
3314                .unwrap();
3315
3316            let response = rx.await.unwrap().unwrap().await.unwrap();
3317            match fcu_status.into() {
3318                ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
3319                ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
3320                ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
3321            }
3322        }
3323
3324        async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3325            let fcu_state = self.fcu_state(block_hash);
3326
3327            // check for ForkchoiceUpdated event
3328            let event = self.from_tree_rx.recv().await.unwrap();
3329            match event {
3330                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
3331                    state,
3332                    status,
3333                )) => {
3334                    assert_eq!(state, fcu_state);
3335                    assert_eq!(status, fcu_status.into());
3336                }
3337                _ => panic!("Unexpected event: {:#?}", event),
3338            }
3339        }
3340
        /// Builds a [`ForkchoiceState`] in which head, safe and finalized hashes are all
        /// `block_hash`.
        const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
            ForkchoiceState {
                head_block_hash: block_hash,
                safe_block_hash: block_hash,
                finalized_block_hash: block_hash,
            }
        }
3348
3349        async fn send_new_payload(
3350            &mut self,
3351            block: RecoveredBlock<reth_ethereum_primitives::Block>,
3352        ) {
3353            let payload = ExecutionPayloadV3::from_block_unchecked(
3354                block.hash(),
3355                &block.clone_sealed_block().into_block(),
3356            );
3357            self.tree
3358                .on_new_payload(ExecutionData {
3359                    payload: payload.into(),
3360                    sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
3361                        parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
3362                        versioned_hashes: vec![],
3363                    }),
3364                })
3365                .unwrap();
3366        }
3367
3368        async fn insert_chain(
3369            &mut self,
3370            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3371        ) {
3372            for block in chain.clone() {
3373                self.insert_block(block.clone()).unwrap();
3374            }
3375            self.check_canon_chain_insertion(chain).await;
3376        }
3377
3378        async fn check_canon_commit(&mut self, hash: B256) {
3379            let event = self.from_tree_rx.recv().await.unwrap();
3380            match event {
3381                EngineApiEvent::BeaconConsensus(
3382                    BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3383                ) => {
3384                    assert_eq!(header.hash(), hash);
3385                }
3386                _ => panic!("Unexpected event: {:#?}", event),
3387            }
3388        }
3389
3390        async fn check_fork_chain_insertion(
3391            &mut self,
3392            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3393        ) {
3394            for block in chain {
3395                self.check_fork_block_added(block.hash()).await;
3396            }
3397        }
3398
3399        async fn check_canon_chain_insertion(
3400            &mut self,
3401            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3402        ) {
3403            for block in chain.clone() {
3404                self.check_canon_block_added(block.hash()).await;
3405            }
3406        }
3407
3408        async fn check_canon_block_added(&mut self, expected_hash: B256) {
3409            let event = self.from_tree_rx.recv().await.unwrap();
3410            match event {
3411                EngineApiEvent::BeaconConsensus(
3412                    BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
3413                ) => {
3414                    assert_eq!(executed.recovered_block.hash(), expected_hash);
3415                }
3416                _ => panic!("Unexpected event: {:#?}", event),
3417            }
3418        }
3419
3420        async fn check_fork_block_added(&mut self, expected_hash: B256) {
3421            let event = self.from_tree_rx.recv().await.unwrap();
3422            match event {
3423                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3424                    executed,
3425                    _,
3426                )) => {
3427                    assert_eq!(executed.recovered_block.hash(), expected_hash);
3428                }
3429                _ => panic!("Unexpected event: {:#?}", event),
3430            }
3431        }
3432
3433        async fn check_invalid_block(&mut self, expected_hash: B256) {
3434            let event = self.from_tree_rx.recv().await.unwrap();
3435            match event {
3436                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
3437                    block,
3438                )) => {
3439                    assert_eq!(block.hash(), expected_hash);
3440                }
3441                _ => panic!("Unexpected event: {:#?}", event),
3442            }
3443        }
3444
3445        fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
3446            let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3447            let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3448
3449            for block in &blocks {
3450                block_data.push((block.hash(), block.clone_block()));
3451                headers_data.push((block.hash(), block.header().clone()));
3452            }
3453
3454            self.provider.extend_blocks(block_data);
3455            self.provider.extend_headers(headers_data);
3456        }
3457
3458        fn setup_range_insertion_for_valid_chain(
3459            &mut self,
3460            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3461        ) {
3462            self.setup_range_insertion_for_chain(chain, None)
3463        }
3464
3465        fn setup_range_insertion_for_invalid_chain(
3466            &mut self,
3467            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3468            index: usize,
3469        ) {
3470            self.setup_range_insertion_for_chain(chain, Some(index))
3471        }
3472
3473        fn setup_range_insertion_for_chain(
3474            &mut self,
3475            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3476            invalid_index: Option<usize>,
3477        ) {
3478            // setting up execution outcomes for the chain, the blocks will be
3479            // executed starting from the oldest, so we need to reverse.
3480            let mut chain_rev = chain;
3481            chain_rev.reverse();
3482
3483            let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3484            for (index, block) in chain_rev.iter().enumerate() {
3485                let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3486                let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index {
3487                    B256::random()
3488                } else {
3489                    block.state_root
3490                };
3491                self.tree.provider.add_state_root(state_root);
3492                execution_outcomes.push(execution_outcome);
3493            }
3494            self.extend_execution_outcome(execution_outcomes);
3495        }
3496
3497        fn check_canon_head(&self, head_hash: B256) {
3498            assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3499        }
3500    }
3501
3502    #[test]
3503    fn test_tree_persist_block_batch() {
3504        let tree_config = TreeConfig::default();
3505        let chain_spec = MAINNET.clone();
3506        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3507
3508        // we need more than tree_config.persistence_threshold() +1 blocks to
3509        // trigger the persistence task.
3510        let blocks: Vec<_> = test_block_builder
3511            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3512            .collect();
3513        let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3514
3515        let mut blocks = vec![];
3516        for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3517            blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3518        }
3519
3520        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3521
3522        // process the message
3523        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3524        test_harness.tree.on_engine_message(msg).unwrap();
3525
3526        // we now should receive the other batch
3527        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3528        match msg {
3529            FromEngine::DownloadedBlocks(blocks) => {
3530                assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3531            }
3532            _ => panic!("unexpected message: {:#?}", msg),
3533        }
3534    }
3535
3536    #[tokio::test]
3537    async fn test_tree_persist_blocks() {
3538        let tree_config = TreeConfig::default();
3539        let chain_spec = MAINNET.clone();
3540        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3541
3542        // we need more than tree_config.persistence_threshold() +1 blocks to
3543        // trigger the persistence task.
3544        let blocks: Vec<_> = test_block_builder
3545            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3546            .collect();
3547        let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3548        std::thread::Builder::new()
3549            .name("Tree Task".to_string())
3550            .spawn(|| test_harness.tree.run())
3551            .unwrap();
3552
3553        // send a message to the tree to enter the main loop.
3554        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3555
3556        let received_action =
3557            test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3558        if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3559            // only blocks.len() - tree_config.memory_block_buffer_target() will be
3560            // persisted
3561            let expected_persist_len =
3562                blocks.len() - tree_config.memory_block_buffer_target() as usize;
3563            assert_eq!(saved_blocks.len(), expected_persist_len);
3564            assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3565        } else {
3566            panic!("unexpected action received {received_action:?}");
3567        }
3568    }
3569
3570    #[tokio::test]
3571    async fn test_in_memory_state_trait_impl() {
3572        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect();
3573        let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3574
3575        for executed_block in blocks {
3576            let sealed_block = executed_block.recovered_block();
3577
3578            let expected_state = BlockState::new(executed_block.clone());
3579
3580            let actual_state_by_hash = test_harness
3581                .tree
3582                .canonical_in_memory_state
3583                .state_by_hash(sealed_block.hash())
3584                .unwrap();
3585            assert_eq!(expected_state, *actual_state_by_hash);
3586
3587            let actual_state_by_number = test_harness
3588                .tree
3589                .canonical_in_memory_state
3590                .state_by_number(sealed_block.number)
3591                .unwrap();
3592            assert_eq!(expected_state, *actual_state_by_number);
3593        }
3594    }
3595
3596    #[tokio::test]
3597    async fn test_engine_request_during_backfill() {
3598        let tree_config = TreeConfig::default();
3599        let blocks: Vec<_> = TestBlockBuilder::eth()
3600            .get_executed_blocks(0..tree_config.persistence_threshold())
3601            .collect();
3602        let mut test_harness = TestHarness::new(MAINNET.clone())
3603            .with_blocks(blocks)
3604            .with_backfill_state(BackfillSyncState::Active);
3605
3606        let (tx, rx) = oneshot::channel();
3607        test_harness
3608            .tree
3609            .on_engine_message(FromEngine::Request(
3610                BeaconEngineMessage::ForkchoiceUpdated {
3611                    state: ForkchoiceState {
3612                        head_block_hash: B256::random(),
3613                        safe_block_hash: B256::random(),
3614                        finalized_block_hash: B256::random(),
3615                    },
3616                    payload_attrs: None,
3617                    tx,
3618                    version: EngineApiMessageVersion::default(),
3619                }
3620                .into(),
3621            ))
3622            .unwrap();
3623
3624        let resp = rx.await.unwrap().unwrap().await.unwrap();
3625        assert!(resp.payload_status.is_syncing());
3626    }
3627
3628    #[test]
3629    fn test_disconnected_payload() {
3630        let s = include_str!("../../test-data/holesky/2.rlp");
3631        let data = Bytes::from_str(s).unwrap();
3632        let block = Block::decode(&mut data.as_ref()).unwrap();
3633        let sealed = block.seal_slow();
3634        let hash = sealed.hash();
3635        let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
3636
3637        let mut test_harness = TestHarness::new(HOLESKY.clone());
3638
3639        let outcome = test_harness
3640            .tree
3641            .on_new_payload(ExecutionData {
3642                payload: payload.into(),
3643                sidecar: ExecutionPayloadSidecar::none(),
3644            })
3645            .unwrap();
3646        assert!(outcome.outcome.is_syncing());
3647
3648        // ensure block is buffered
3649        let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3650        assert_eq!(buffered.clone_sealed_block(), sealed);
3651    }
3652
3653    #[test]
3654    fn test_disconnected_block() {
3655        let s = include_str!("../../test-data/holesky/2.rlp");
3656        let data = Bytes::from_str(s).unwrap();
3657        let block = Block::decode(&mut data.as_ref()).unwrap();
3658        let sealed = block.seal_slow().try_recover().unwrap();
3659
3660        let mut test_harness = TestHarness::new(HOLESKY.clone());
3661
3662        let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap();
3663        assert_eq!(
3664            outcome,
3665            InsertPayloadOk::Inserted(BlockStatus::Disconnected {
3666                head: test_harness.tree.state.tree_state.current_canonical_head,
3667                missing_ancestor: sealed.parent_num_hash()
3668            })
3669        );
3670    }
3671
3672    #[tokio::test]
3673    async fn test_holesky_payload() {
3674        let s = include_str!("../../test-data/holesky/1.rlp");
3675        let data = Bytes::from_str(s).unwrap();
3676        let block: Block = Block::decode(&mut data.as_ref()).unwrap();
3677        let sealed = block.seal_slow();
3678        let payload =
3679            ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
3680
3681        let mut test_harness =
3682            TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3683
3684        let (tx, rx) = oneshot::channel();
3685        test_harness
3686            .tree
3687            .on_engine_message(FromEngine::Request(
3688                BeaconEngineMessage::NewPayload {
3689                    payload: ExecutionData {
3690                        payload: payload.clone().into(),
3691                        sidecar: ExecutionPayloadSidecar::none(),
3692                    },
3693                    tx,
3694                }
3695                .into(),
3696            ))
3697            .unwrap();
3698
3699        let resp = rx.await.unwrap().unwrap();
3700        assert!(resp.is_syncing());
3701    }
3702
3703    #[test]
3704    fn test_tree_state_normal_descendant() {
3705        let mut tree_state = TreeState::new(BlockNumHash::default());
3706        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3707
3708        tree_state.insert_executed(blocks[0].clone());
3709        assert!(tree_state.is_descendant(
3710            blocks[0].recovered_block().num_hash(),
3711            blocks[1].recovered_block().header()
3712        ));
3713
3714        tree_state.insert_executed(blocks[1].clone());
3715
3716        assert!(tree_state.is_descendant(
3717            blocks[0].recovered_block().num_hash(),
3718            blocks[2].recovered_block().header()
3719        ));
3720        assert!(tree_state.is_descendant(
3721            blocks[1].recovered_block().num_hash(),
3722            blocks[2].recovered_block().header()
3723        ));
3724    }
3725
3726    #[tokio::test]
3727    async fn test_tree_state_insert_executed() {
3728        let mut tree_state = TreeState::new(BlockNumHash::default());
3729        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3730
3731        tree_state.insert_executed(blocks[0].clone());
3732        tree_state.insert_executed(blocks[1].clone());
3733
3734        assert_eq!(
3735            tree_state.parent_to_child.get(&blocks[0].recovered_block().hash()),
3736            Some(&HashSet::from_iter([blocks[1].recovered_block().hash()]))
3737        );
3738
3739        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3740
3741        tree_state.insert_executed(blocks[2].clone());
3742
3743        assert_eq!(
3744            tree_state.parent_to_child.get(&blocks[1].recovered_block().hash()),
3745            Some(&HashSet::from_iter([blocks[2].recovered_block().hash()]))
3746        );
3747        assert!(tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3748
3749        assert!(!tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3750    }
3751
3752    #[tokio::test]
3753    async fn test_tree_state_insert_executed_with_reorg() {
3754        let mut tree_state = TreeState::new(BlockNumHash::default());
3755        let mut test_block_builder = TestBlockBuilder::eth();
3756        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3757
3758        for block in &blocks {
3759            tree_state.insert_executed(block.clone());
3760        }
3761        assert_eq!(tree_state.blocks_by_hash.len(), 5);
3762
3763        let fork_block_3 = test_block_builder
3764            .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3765        let fork_block_4 = test_block_builder
3766            .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3767        let fork_block_5 = test_block_builder
3768            .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3769
3770        tree_state.insert_executed(fork_block_3.clone());
3771        tree_state.insert_executed(fork_block_4.clone());
3772        tree_state.insert_executed(fork_block_5.clone());
3773
3774        assert_eq!(tree_state.blocks_by_hash.len(), 8);
3775        assert_eq!(tree_state.blocks_by_number[&3].len(), 2); // two blocks at height 3 (original and fork)
3776        assert_eq!(tree_state.parent_to_child[&blocks[1].recovered_block().hash()].len(), 2); // block 2 should have two children
3777
3778        // verify that we can insert the same block again without issues
3779        tree_state.insert_executed(fork_block_4.clone());
3780        assert_eq!(tree_state.blocks_by_hash.len(), 8);
3781
3782        assert!(tree_state.parent_to_child[&fork_block_3.recovered_block().hash()]
3783            .contains(&fork_block_4.recovered_block().hash()));
3784        assert!(tree_state.parent_to_child[&fork_block_4.recovered_block().hash()]
3785            .contains(&fork_block_5.recovered_block().hash()));
3786
3787        assert_eq!(tree_state.blocks_by_number[&4].len(), 2);
3788        assert_eq!(tree_state.blocks_by_number[&5].len(), 2);
3789    }
3790
3791    #[tokio::test]
3792    async fn test_tree_state_remove_before() {
3793        let start_num_hash = BlockNumHash::default();
3794        let mut tree_state = TreeState::new(start_num_hash);
3795        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3796
3797        for block in &blocks {
3798            tree_state.insert_executed(block.clone());
3799        }
3800
3801        let last = blocks.last().unwrap();
3802
3803        // set the canonical head
3804        tree_state.set_canonical_head(last.recovered_block().num_hash());
3805
3806        // inclusive bound, so we should remove anything up to and including 2
3807        tree_state.remove_until(
3808            BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3809            start_num_hash.hash,
3810            Some(blocks[1].recovered_block().num_hash()),
3811        );
3812
3813        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3814        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3815        assert!(!tree_state.blocks_by_number.contains_key(&1));
3816        assert!(!tree_state.blocks_by_number.contains_key(&2));
3817
3818        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3819        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3820        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3821        assert!(tree_state.blocks_by_number.contains_key(&3));
3822        assert!(tree_state.blocks_by_number.contains_key(&4));
3823        assert!(tree_state.blocks_by_number.contains_key(&5));
3824
3825        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3826        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3827        assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3828        assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3829        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3830
3831        assert_eq!(
3832            tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3833            Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3834        );
3835        assert_eq!(
3836            tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3837            Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3838        );
3839    }
3840
3841    #[tokio::test]
3842    async fn test_tree_state_remove_before_finalized() {
3843        let start_num_hash = BlockNumHash::default();
3844        let mut tree_state = TreeState::new(start_num_hash);
3845        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3846
3847        for block in &blocks {
3848            tree_state.insert_executed(block.clone());
3849        }
3850
3851        let last = blocks.last().unwrap();
3852
3853        // set the canonical head
3854        tree_state.set_canonical_head(last.recovered_block().num_hash());
3855
3856        // we should still remove everything up to and including 2
3857        tree_state.remove_until(
3858            BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3859            start_num_hash.hash,
3860            None,
3861        );
3862
3863        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3864        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3865        assert!(!tree_state.blocks_by_number.contains_key(&1));
3866        assert!(!tree_state.blocks_by_number.contains_key(&2));
3867
3868        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3869        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3870        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3871        assert!(tree_state.blocks_by_number.contains_key(&3));
3872        assert!(tree_state.blocks_by_number.contains_key(&4));
3873        assert!(tree_state.blocks_by_number.contains_key(&5));
3874
3875        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3876        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3877        assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3878        assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3879        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3880
3881        assert_eq!(
3882            tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3883            Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3884        );
3885        assert_eq!(
3886            tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3887            Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3888        );
3889    }
3890
3891    #[tokio::test]
3892    async fn test_tree_state_remove_before_lower_finalized() {
3893        let start_num_hash = BlockNumHash::default();
3894        let mut tree_state = TreeState::new(start_num_hash);
3895        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3896
3897        for block in &blocks {
3898            tree_state.insert_executed(block.clone());
3899        }
3900
3901        let last = blocks.last().unwrap();
3902
3903        // set the canonical head
3904        tree_state.set_canonical_head(last.recovered_block().num_hash());
3905
3906        // we have no forks so we should still remove anything up to and including 2
3907        tree_state.remove_until(
3908            BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3909            start_num_hash.hash,
3910            Some(blocks[0].recovered_block().num_hash()),
3911        );
3912
3913        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3914        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3915        assert!(!tree_state.blocks_by_number.contains_key(&1));
3916        assert!(!tree_state.blocks_by_number.contains_key(&2));
3917
3918        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3919        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3920        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3921        assert!(tree_state.blocks_by_number.contains_key(&3));
3922        assert!(tree_state.blocks_by_number.contains_key(&4));
3923        assert!(tree_state.blocks_by_number.contains_key(&5));
3924
3925        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3926        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3927        assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3928        assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3929        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3930
3931        assert_eq!(
3932            tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3933            Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3934        );
3935        assert_eq!(
3936            tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3937            Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3938        );
3939    }
3940
3941    #[tokio::test]
3942    async fn test_tree_state_on_new_head_reorg() {
3943        reth_tracing::init_test_tracing();
3944        let chain_spec = MAINNET.clone();
3945
3946        // Set persistence_threshold to 1
3947        let mut test_harness = TestHarness::new(chain_spec);
3948        test_harness.tree.config = test_harness
3949            .tree
3950            .config
3951            .with_persistence_threshold(1)
3952            .with_memory_block_buffer_target(1);
3953        let mut test_block_builder = TestBlockBuilder::eth();
3954        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3955
3956        for block in &blocks {
3957            test_harness.tree.state.tree_state.insert_executed(block.clone());
3958        }
3959
3960        // set block 3 as the current canonical head
3961        test_harness
3962            .tree
3963            .state
3964            .tree_state
3965            .set_canonical_head(blocks[2].recovered_block().num_hash());
3966
3967        // create a fork from block 2
3968        let fork_block_3 = test_block_builder
3969            .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3970        let fork_block_4 = test_block_builder
3971            .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3972        let fork_block_5 = test_block_builder
3973            .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3974
3975        test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3976        test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3977        test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3978
3979        // normal (non-reorg) case
3980        let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap();
3981        assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3982        if let Some(NewCanonicalChain::Commit { new }) = result {
3983            assert_eq!(new.len(), 2);
3984            assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash());
3985            assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
3986        }
3987
3988        // should be a None persistence action before we advance persistence
3989        let current_action = test_harness.tree.persistence_state.current_action();
3990        assert_eq!(current_action, None);
3991
3992        // let's attempt to persist and check that it attempts to save blocks
3993        //
3994        // since in-memory block buffer target and persistence_threshold are both 1, this should
3995        // save all but the current tip of the canonical chain (up to blocks[1])
3996        test_harness.tree.advance_persistence().unwrap();
3997        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3998        assert_eq!(
3999            current_action,
4000            Some(CurrentPersistenceAction::SavingBlocks {
4001                highest: blocks[1].recovered_block().num_hash()
4002            })
4003        );
4004
4005        // get rid of the prev action
4006        let received_action = test_harness.action_rx.recv().unwrap();
4007        let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
4008            panic!("received wrong action");
4009        };
4010        assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
4011
4012        // send the response so we can advance again
4013        sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
4014
4015        // we should be persisting blocks[1] because we threw out the prev action
4016        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4017        assert_eq!(
4018            current_action,
4019            Some(CurrentPersistenceAction::SavingBlocks {
4020                highest: blocks[1].recovered_block().num_hash()
4021            })
4022        );
4023
4024        // after advancing persistence, we should be at `None` for the next action
4025        test_harness.tree.advance_persistence().unwrap();
4026        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4027        assert_eq!(current_action, None);
4028
4029        // reorg case
4030        let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
4031        assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
4032
4033        if let Some(NewCanonicalChain::Reorg { new, old }) = result {
4034            assert_eq!(new.len(), 3);
4035            assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
4036            assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash());
4037            assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash());
4038
4039            assert_eq!(old.len(), 1);
4040            assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
4041        }
4042
4043        // The canonical block has not changed, so we will not get any active persistence action
4044        test_harness.tree.advance_persistence().unwrap();
4045        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4046        assert_eq!(current_action, None);
4047
4048        // Let's change the canonical head and advance persistence
4049        test_harness
4050            .tree
4051            .state
4052            .tree_state
4053            .set_canonical_head(fork_block_5.recovered_block().num_hash());
4054
4055        // The canonical block has changed now, we should get fork_block_4 due to the persistence
4056        // threshold and in memory block buffer target
4057        test_harness.tree.advance_persistence().unwrap();
4058        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4059        assert_eq!(
4060            current_action,
4061            Some(CurrentPersistenceAction::SavingBlocks {
4062                highest: fork_block_4.recovered_block().num_hash()
4063            })
4064        );
4065    }
4066
4067    #[test]
4068    fn test_tree_state_on_new_head_deep_fork() {
4069        reth_tracing::init_test_tracing();
4070
4071        let chain_spec = MAINNET.clone();
4072        let mut test_harness = TestHarness::new(chain_spec);
4073        let mut test_block_builder = TestBlockBuilder::eth();
4074
4075        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
4076
4077        for block in &blocks {
4078            test_harness.tree.state.tree_state.insert_executed(block.clone());
4079        }
4080
4081        // set last block as the current canonical head
4082        let last_block = blocks.last().unwrap().recovered_block().clone();
4083
4084        test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());
4085
4086        // create a fork chain from last_block
4087        let chain_a = test_block_builder.create_fork(&last_block, 10);
4088        let chain_b = test_block_builder.create_fork(&last_block, 10);
4089
4090        for block in &chain_a {
4091            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
4092                block: ExecutedBlock {
4093                    recovered_block: Arc::new(block.clone()),
4094                    execution_output: Arc::new(ExecutionOutcome::default()),
4095                    hashed_state: Arc::new(HashedPostState::default()),
4096                },
4097                trie: Arc::new(TrieUpdates::default()),
4098            });
4099        }
4100        test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
4101
4102        for block in &chain_b {
4103            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
4104                block: ExecutedBlock {
4105                    recovered_block: Arc::new(block.clone()),
4106                    execution_output: Arc::new(ExecutionOutcome::default()),
4107                    hashed_state: Arc::new(HashedPostState::default()),
4108                },
4109                trie: Arc::new(TrieUpdates::default()),
4110            });
4111        }
4112
4113        // for each block in chain_b, reorg to it and then back to canonical
4114        let mut expected_new = Vec::new();
4115        for block in &chain_b {
4116            // reorg to chain from block b
4117            let result = test_harness.tree.on_new_head(block.hash()).unwrap();
4118            assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));
4119
4120            expected_new.push(block);
4121            if let Some(NewCanonicalChain::Reorg { new, old }) = result {
4122                assert_eq!(new.len(), expected_new.len());
4123                for (index, block) in expected_new.iter().enumerate() {
4124                    assert_eq!(new[index].recovered_block().hash(), block.hash());
4125                }
4126
4127                assert_eq!(old.len(), chain_a.len());
4128                for (index, block) in chain_a.iter().enumerate() {
4129                    assert_eq!(old[index].recovered_block().hash(), block.hash());
4130                }
4131            }
4132
4133            // set last block of chain a as canonical head
4134            test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
4135        }
4136    }
4137
4138    #[tokio::test]
4139    async fn test_get_canonical_blocks_to_persist() {
4140        let chain_spec = MAINNET.clone();
4141        let mut test_harness = TestHarness::new(chain_spec);
4142        let mut test_block_builder = TestBlockBuilder::eth();
4143
4144        let canonical_head_number = 9;
4145        let blocks: Vec<_> =
4146            test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
4147        test_harness = test_harness.with_blocks(blocks.clone());
4148
4149        let last_persisted_block_number = 3;
4150        test_harness.tree.persistence_state.last_persisted_block =
4151            blocks[last_persisted_block_number as usize].recovered_block.num_hash();
4152
4153        let persistence_threshold = 4;
4154        let memory_block_buffer_target = 3;
4155        test_harness.tree.config = TreeConfig::default()
4156            .with_persistence_threshold(persistence_threshold)
4157            .with_memory_block_buffer_target(memory_block_buffer_target);
4158
4159        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
4160
4161        let expected_blocks_to_persist_length: usize =
4162            (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
4163                .try_into()
4164                .unwrap();
4165
4166        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
4167        for (i, item) in
4168            blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
4169        {
4170            assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
4171        }
4172
4173        // make sure only canonical blocks are included
4174        let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
4175        let fork_block_hash = fork_block.recovered_block().hash();
4176        test_harness.tree.state.tree_state.insert_executed(fork_block);
4177
4178        assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
4179
4180        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
4181        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
4182
4183        // check that the fork block is not included in the blocks to persist
4184        assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
4185
4186        // check that the original block 4 is still included
4187        assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
4188            b.recovered_block().hash() == blocks[4].recovered_block().hash()));
4189
4190        // check that if we advance persistence, the persistence action is the correct value
4191        test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
4192        assert_eq!(
4193            test_harness.tree.persistence_state.current_action().cloned(),
4194            Some(CurrentPersistenceAction::SavingBlocks {
4195                highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
4196            })
4197        );
4198    }
4199
4200    #[tokio::test]
4201    async fn test_engine_tree_fcu_missing_head() {
4202        let chain_spec = MAINNET.clone();
4203        let mut test_harness = TestHarness::new(chain_spec.clone());
4204
4205        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
4206
4207        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
4208        test_harness = test_harness.with_blocks(blocks);
4209
4210        let missing_block = test_block_builder
4211            .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash());
4212
4213        test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;
4214
4215        // after FCU we receive an EngineApiEvent::Download event to get the missing block.
4216        let event = test_harness.from_tree_rx.recv().await.unwrap();
4217        match event {
4218            EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
4219                let expected_block_set = HashSet::from_iter([missing_block.hash()]);
4220                assert_eq!(actual_block_set, expected_block_set);
4221            }
4222            _ => panic!("Unexpected event: {:#?}", event),
4223        }
4224    }
4225
4226    #[tokio::test]
4227    async fn test_engine_tree_fcu_canon_chain_insertion() {
4228        let chain_spec = MAINNET.clone();
4229        let mut test_harness = TestHarness::new(chain_spec.clone());
4230
4231        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4232        test_harness = test_harness.with_blocks(base_chain.clone());
4233
4234        test_harness
4235            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4236            .await;
4237
4238        // extend main chain
4239        let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3);
4240
4241        test_harness.insert_chain(main_chain).await;
4242    }
4243
4244    #[tokio::test]
4245    async fn test_engine_tree_fcu_reorg_with_all_blocks() {
4246        let chain_spec = MAINNET.clone();
4247        let mut test_harness = TestHarness::new(chain_spec.clone());
4248
4249        let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
4250        test_harness = test_harness.with_blocks(main_chain.clone());
4251
4252        let fork_chain = test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3);
4253        let fork_chain_last_hash = fork_chain.last().unwrap().hash();
4254
4255        // add fork blocks to the tree
4256        for block in &fork_chain {
4257            test_harness.insert_block(block.clone()).unwrap();
4258        }
4259
4260        test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
4261
4262        // check for ForkBlockAdded events, we expect fork_chain.len() blocks added
4263        test_harness.check_fork_chain_insertion(fork_chain.clone()).await;
4264
4265        // check for CanonicalChainCommitted event
4266        test_harness.check_canon_commit(fork_chain_last_hash).await;
4267
4268        test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
4269
4270        // new head is the tip of the fork chain
4271        test_harness.check_canon_head(fork_chain_last_hash);
4272    }
4273
4274    #[tokio::test]
4275    async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
4276        reth_tracing::init_test_tracing();
4277
4278        let chain_spec = MAINNET.clone();
4279        let mut test_harness = TestHarness::new(chain_spec.clone());
4280
4281        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4282        test_harness = test_harness.with_blocks(base_chain.clone());
4283
4284        test_harness
4285            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4286            .await;
4287
4288        // extend main chain with enough blocks to trigger pipeline run but don't insert them
4289        let main_chain = test_harness
4290            .block_builder
4291            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
4292
4293        let main_chain_last_hash = main_chain.last().unwrap().hash();
4294        test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
4295
4296        test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
4297
4298        // create event for backfill finished
4299        let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
4300        let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
4301            block_number: backfill_finished_block_number,
4302        });
4303
4304        let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
4305        // add block to mock provider to enable persistence clean up.
4306        test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
4307        test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
4308
4309        let event = test_harness.from_tree_rx.recv().await.unwrap();
4310        match event {
4311            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
4312                assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
4313            }
4314            _ => panic!("Unexpected event: {:#?}", event),
4315        }
4316
4317        test_harness
4318            .tree
4319            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
4320                .last()
4321                .unwrap()
4322                .clone()]))
4323            .unwrap();
4324
4325        let event = test_harness.from_tree_rx.recv().await.unwrap();
4326        match event {
4327            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
4328                assert_eq!(
4329                    total_blocks,
4330                    (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
4331                );
4332                assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
4333            }
4334            _ => panic!("Unexpected event: {:#?}", event),
4335        }
4336    }
4337
4338    #[tokio::test]
4339    async fn test_engine_tree_live_sync_transition_eventually_canonical() {
4340        reth_tracing::init_test_tracing();
4341
4342        let chain_spec = MAINNET.clone();
4343        let mut test_harness = TestHarness::new(chain_spec.clone());
4344        test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);
4345
4346        // create base chain and setup test harness with it
4347        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4348        test_harness = test_harness.with_blocks(base_chain.clone());
4349
4350        // fcu to the tip of base chain
4351        test_harness
4352            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4353            .await;
4354
4355        // create main chain, extension of base chain, with enough blocks to
4356        // trigger backfill sync
4357        let main_chain = test_harness
4358            .block_builder
4359            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
4360
4361        let main_chain_last = main_chain.last().unwrap();
4362        let main_chain_last_hash = main_chain_last.hash();
4363        let main_chain_backfill_target =
4364            main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
4365        let main_chain_backfill_target_hash = main_chain_backfill_target.hash();
4366
4367        // fcu to the element of main chain that should trigger backfill sync
4368        test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
4369        test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
4370
4371        // check download request for target
4372        let event = test_harness.from_tree_rx.recv().await.unwrap();
4373        match event {
4374            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
4375                assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
4376            }
4377            _ => panic!("Unexpected event: {:#?}", event),
4378        }
4379
4380        // send message to tell the engine the requested block was downloaded
4381        test_harness
4382            .tree
4383            .on_engine_message(FromEngine::DownloadedBlocks(vec![
4384                main_chain_backfill_target.clone()
4385            ]))
4386            .unwrap();
4387
4388        // check that backfill is triggered
4389        let event = test_harness.from_tree_rx.recv().await.unwrap();
4390        match event {
4391            EngineApiEvent::BackfillAction(BackfillAction::Start(
4392                reth_stages::PipelineTarget::Sync(target_hash),
4393            )) => {
4394                assert_eq!(target_hash, main_chain_backfill_target_hash);
4395            }
4396            _ => panic!("Unexpected event: {:#?}", event),
4397        }
4398
4399        // persist blocks of main chain, same as the backfill operation would do
4400        let backfilled_chain: Vec<_> =
4401            main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
4402        test_harness.persist_blocks(backfilled_chain.clone());
4403
4404        test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);
4405
4406        // send message to mark backfill finished
4407        test_harness
4408            .tree
4409            .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
4410                ControlFlow::Continue { block_number: main_chain_backfill_target.number },
4411            )))
4412            .unwrap();
4413
4414        // send fcu to the tip of main
4415        test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
4416
4417        let event = test_harness.from_tree_rx.recv().await.unwrap();
4418        match event {
4419            EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
4420                assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
4421            }
4422            _ => panic!("Unexpected event: {:#?}", event),
4423        }
4424
4425        // tell engine main chain tip downloaded
4426        test_harness
4427            .tree
4428            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
4429            .unwrap();
4430
4431        // check download range request
4432        let event = test_harness.from_tree_rx.recv().await.unwrap();
4433        match event {
4434            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
4435                assert_eq!(
4436                    total_blocks,
4437                    (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
4438                );
4439                assert_eq!(initial_hash, main_chain_last.parent_hash);
4440            }
4441            _ => panic!("Unexpected event: {:#?}", event),
4442        }
4443
4444        let remaining: Vec<_> = main_chain
4445            .clone()
4446            .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
4447            .collect();
4448
4449        test_harness.setup_range_insertion_for_valid_chain(remaining.clone());
4450
4451        // tell engine block range downloaded
4452        test_harness
4453            .tree
4454            .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
4455            .unwrap();
4456
4457        test_harness.check_canon_chain_insertion(remaining).await;
4458
4459        // check canonical chain committed event with the hash of the latest block
4460        test_harness.check_canon_commit(main_chain_last_hash).await;
4461
4462        // new head is the tip of the main chain
4463        test_harness.check_canon_head(main_chain_last_hash);
4464    }
4465
4466    #[tokio::test]
4467    async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
4468        reth_tracing::init_test_tracing();
4469
4470        let chain_spec = MAINNET.clone();
4471        let mut test_harness = TestHarness::new(chain_spec.clone());
4472
4473        // create base chain and setup test harness with it
4474        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4475        test_harness = test_harness.with_blocks(base_chain.clone());
4476
4477        // fcu to the tip of base chain
4478        test_harness
4479            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4480            .await;
4481
4482        // create main chain, extension of base chain
4483        let main_chain =
4484            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10);
4485        // determine target in the middle of main hain
4486        let target = main_chain.get(5).unwrap();
4487        let target_hash = target.hash();
4488        let main_last = main_chain.last().unwrap();
4489        let main_last_hash = main_last.hash();
4490
4491        // insert main chain
4492        test_harness.insert_chain(main_chain).await;
4493
4494        // send fcu to target
4495        test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;
4496
4497        test_harness.check_canon_commit(target_hash).await;
4498        test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;
4499
4500        // send fcu to main tip
4501        test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4502
4503        test_harness.check_canon_commit(main_last_hash).await;
4504        test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4505        test_harness.check_canon_head(main_last_hash);
4506    }
4507
4508    #[tokio::test]
4509    async fn test_engine_tree_valid_forks_with_older_canonical_head() {
4510        reth_tracing::init_test_tracing();
4511
4512        let chain_spec = MAINNET.clone();
4513        let mut test_harness = TestHarness::new(chain_spec.clone());
4514
4515        // create base chain and setup test harness with it
4516        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4517        test_harness = test_harness.with_blocks(base_chain.clone());
4518
4519        let old_head = base_chain.first().unwrap().recovered_block();
4520
4521        // extend base chain
4522        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4523        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
4524
4525        test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
4526        test_harness.insert_chain(extension_chain).await;
4527
4528        // fcu to old_head
4529        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4530
4531        // create two competing chains starting from fork_block
4532        let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
4533        let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);
4534
4535        // insert chain A blocks using newPayload
4536        test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
4537        for block in &chain_a {
4538            test_harness.send_new_payload(block.clone()).await;
4539        }
4540
4541        test_harness.check_canon_chain_insertion(chain_a.clone()).await;
4542
4543        // insert chain B blocks using newPayload
4544        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4545        for block in &chain_b {
4546            test_harness.send_new_payload(block.clone()).await;
4547        }
4548
4549        test_harness.check_canon_chain_insertion(chain_b.clone()).await;
4550
4551        // send FCU to make the tip of chain B the new head
4552        let chain_b_tip_hash = chain_b.last().unwrap().hash();
4553        test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4554
4555        // check for CanonicalChainCommitted event
4556        test_harness.check_canon_commit(chain_b_tip_hash).await;
4557
4558        // verify FCU was processed
4559        test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4560
4561        // verify the new canonical head
4562        test_harness.check_canon_head(chain_b_tip_hash);
4563
4564        // verify that chain A is now considered a fork
4565        assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
4566    }
4567
4568    #[tokio::test]
4569    async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
4570        let chain_spec = MAINNET.clone();
4571        let mut test_harness = TestHarness::new(chain_spec.clone());
4572
4573        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4574        test_harness = test_harness.with_blocks(base_chain.clone());
4575
4576        // side chain consisting of two blocks, the last will be inserted first
4577        // so that we force it to be buffered
4578        let side_chain =
4579            test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2);
4580
4581        // buffer last block of side chain
4582        let buffered_block = side_chain.last().unwrap();
4583        let buffered_block_hash = buffered_block.hash();
4584
4585        test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
4586        test_harness.send_new_payload(buffered_block.clone()).await;
4587
4588        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());
4589
4590        let non_buffered_block = side_chain.first().unwrap();
4591        let non_buffered_block_hash = non_buffered_block.hash();
4592
4593        // insert block that continues the canon chain, should not be buffered
4594        test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
4595        test_harness.send_new_payload(non_buffered_block.clone()).await;
4596        assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());
4597
4598        // the previously buffered block should be connected now
4599        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());
4600
4601        // both blocks are added to the canon chain in order
4602        test_harness.check_canon_block_added(non_buffered_block_hash).await;
4603        test_harness.check_canon_block_added(buffered_block_hash).await;
4604    }
4605
4606    #[tokio::test]
4607    async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
4608        reth_tracing::init_test_tracing();
4609
4610        let chain_spec = MAINNET.clone();
4611        let mut test_harness = TestHarness::new(chain_spec.clone());
4612
4613        // create base chain and setup test harness with it
4614        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4615        test_harness = test_harness.with_blocks(base_chain.clone());
4616
4617        let old_head = base_chain.first().unwrap().recovered_block();
4618
4619        // extend base chain
4620        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4621        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
4622        test_harness.insert_chain(extension_chain).await;
4623
4624        // fcu to old_head
4625        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4626
4627        // create two competing chains starting from fork_block, one of them invalid
4628        let total_fork_elements = 10;
4629        let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4630        let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4631
4632        // insert chain B blocks using newPayload
4633        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4634        for block in &chain_b {
4635            test_harness.send_new_payload(block.clone()).await;
4636            test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4637            test_harness.check_canon_block_added(block.hash()).await;
4638            test_harness.check_canon_commit(block.hash()).await;
4639            test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4640        }
4641
4642        // insert chain A blocks using newPayload, one of the blocks will be invalid
4643        let invalid_index = 3;
4644        test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
4645        for block in &chain_a {
4646            test_harness.send_new_payload(block.clone()).await;
4647        }
4648
4649        // check canon chain insertion up to the invalid index and taking into
4650        // account reversed ordering
4651        test_harness
4652            .check_fork_chain_insertion(
4653                chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
4654            )
4655            .await;
4656        for block in &chain_a[chain_a.len() - invalid_index - 1..] {
4657            test_harness.check_invalid_block(block.hash()).await;
4658        }
4659
4660        // send FCU to make the tip of chain A, expect invalid
4661        let chain_a_tip_hash = chain_a.last().unwrap().hash();
4662        test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;
4663
4664        // send FCU to make the tip of chain B the new head
4665        let chain_b_tip_hash = chain_b.last().unwrap().hash();
4666
4667        // verify the new canonical head
4668        test_harness.check_canon_head(chain_b_tip_hash);
4669
4670        // verify the canonical head didn't change
4671        test_harness.check_canon_head(chain_b_tip_hash);
4672    }
4673
4674    #[tokio::test]
4675    async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
4676        reth_tracing::init_test_tracing();
4677        let chain_spec = MAINNET.clone();
4678        let mut test_harness = TestHarness::new(chain_spec.clone());
4679
4680        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
4681        test_harness = test_harness.with_blocks(base_chain.clone());
4682
4683        // create a side chain with an invalid block
4684        let side_chain = test_harness
4685            .block_builder
4686            .create_fork(base_chain.last().unwrap().recovered_block(), 15);
4687        let invalid_index = 9;
4688
4689        test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);
4690
4691        for (index, block) in side_chain.iter().enumerate() {
4692            test_harness.send_new_payload(block.clone()).await;
4693
4694            if index < side_chain.len() - invalid_index - 1 {
4695                test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4696            }
4697        }
4698
4699        // Try to do a forkchoice update to a block after the invalid one
4700        let fork_tip_hash = side_chain.last().unwrap().hash();
4701        test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
4702    }
4703}