reth_engine_tree/tree/mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{
7        cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
8    },
9};
10use alloy_consensus::BlockHeader;
11use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash};
12use alloy_primitives::{
13    map::{HashMap, HashSet},
14    BlockNumber, B256, U256,
15};
16use alloy_rpc_types_engine::{
17    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
18};
19use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
20use payload_processor::sparse_trie::StateRootComputeOutcome;
21use persistence_state::CurrentPersistenceAction;
22use reth_chain_state::{
23    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
24    MemoryOverlayStateProvider, NewCanonicalChain,
25};
26use reth_consensus::{Consensus, FullConsensus};
27pub use reth_engine_primitives::InvalidBlockHook;
28use reth_engine_primitives::{
29    BeaconConsensusEngineEvent, BeaconEngineMessage, BeaconOnNewPayloadError, EngineTypes,
30    EngineValidator, ExecutionPayload, ForkchoiceStateTracker, OnForkChoiceUpdated,
31};
32use reth_errors::{ConsensusError, ProviderResult};
33use reth_ethereum_primitives::EthPrimitives;
34use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
35use reth_payload_builder::PayloadBuilderHandle;
36use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes};
37use reth_primitives_traits::{
38    Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
39};
40use reth_provider::{
41    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
42    ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
43    StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
44};
45use reth_revm::database::StateProviderDatabase;
46use reth_stages_api::ControlFlow;
47use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
48use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
49use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
50use std::{
51    cmp::Ordering,
52    collections::{btree_map, hash_map, BTreeMap, VecDeque},
53    fmt::Debug,
54    ops::Bound,
55    sync::{
56        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
57        Arc,
58    },
59    time::Instant,
60};
61use tokio::sync::{
62    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
63    oneshot::{self, error::TryRecvError},
64};
65use tracing::*;
66
67mod block_buffer;
68mod cached_state;
69pub mod error;
70mod invalid_block_hook;
71mod invalid_headers;
72mod metrics;
73mod payload_processor;
74mod persistence_state;
75// TODO(alexey): compare trie updates in `insert_block_inner`
76#[allow(unused)]
77mod trie_updates;
78
79use crate::tree::error::AdvancePersistenceError;
80pub use block_buffer::BlockBuffer;
81pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
82pub use invalid_headers::InvalidHeaderCache;
83pub use payload_processor::*;
84pub use persistence_state::PersistenceState;
85pub use reth_engine_primitives::TreeConfig;
86use reth_evm::execute::BlockExecutionOutput;
87
/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold`
/// for more information.
///
/// This is the default threshold: the maximum distance to the head for which the tree will be
/// used for sync. If the distance exceeds this threshold, the pipeline will be used for sync.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
94
/// Keeps track of the state of the tree.
///
/// ## Invariants
///
/// - This only stores blocks that are connected to the canonical chain.
/// - All stored blocks are valid and have already been executed.
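///
/// # Example
///
/// A minimal sketch of the intended usage (these are crate-internal methods; the `executed`
/// block and the hash variables are assumptions and their construction is elided):
///
/// ```ignore
/// let head = BlockNumHash::new(100, head_hash);
/// let mut tree = TreeState::<EthPrimitives>::new(head);
/// // insert an already-executed block 101 whose parent is `head_hash`
/// tree.insert_executed(executed);
/// assert!(tree.executed_block_by_hash(executed_hash).is_some());
/// assert!(tree.is_canonical(head_hash));
/// ```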
101#[derive(Debug, Default)]
102pub struct TreeState<N: NodePrimitives = EthPrimitives> {
103    /// __All__ unique executed blocks by block hash that are connected to the canonical chain.
104    ///
105    /// This includes blocks of all forks.
106    blocks_by_hash: HashMap<B256, ExecutedBlockWithTrieUpdates<N>>,
107    /// Executed blocks grouped by their respective block number.
108    ///
109    /// This maps unique block number to all known blocks for that height.
110    ///
111    /// Note: there can be multiple blocks at the same height due to forks.
112    blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlockWithTrieUpdates<N>>>,
113    /// Map of any parent block hash to its children.
114    parent_to_child: HashMap<B256, HashSet<B256>>,
115    /// Map of hash to trie updates for canonical blocks that are persisted but not finalized.
116    ///
117    /// Contains the block number for easy removal.
118    persisted_trie_updates: HashMap<B256, (BlockNumber, Arc<TrieUpdates>)>,
119    /// Currently tracked canonical head of the chain.
120    current_canonical_head: BlockNumHash,
121}
122
123impl<N: NodePrimitives> TreeState<N> {
124    /// Returns a new, empty tree state that points to the given canonical head.
125    fn new(current_canonical_head: BlockNumHash) -> Self {
126        Self {
127            blocks_by_hash: HashMap::default(),
128            blocks_by_number: BTreeMap::new(),
129            current_canonical_head,
130            parent_to_child: HashMap::default(),
131            persisted_trie_updates: HashMap::default(),
132        }
133    }
134
135    /// Resets the state and points to the given canonical head.
136    fn reset(&mut self, current_canonical_head: BlockNumHash) {
137        *self = Self::new(current_canonical_head);
138    }
139
140    /// Returns the number of executed blocks stored.
141    fn block_count(&self) -> usize {
142        self.blocks_by_hash.len()
143    }
144
145    /// Returns the [`ExecutedBlockWithTrieUpdates`] by hash.
146    fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlockWithTrieUpdates<N>> {
147        self.blocks_by_hash.get(&hash)
148    }
149
150    /// Returns the block by hash.
151    fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock<N::Block>>> {
152        self.blocks_by_hash.get(&hash).map(|b| Arc::new(b.recovered_block().sealed_block().clone()))
153    }
154
    /// Returns all available blocks for the given hash that lead back to the canonical chain, from
    /// newest to oldest, along with the parent hash of the oldest returned block, i.e. the first
    /// ancestor hash that is missing from this collection.
    ///
    /// Returns `None` if the block for the given hash is not found.
159    fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlockWithTrieUpdates<N>>)> {
160        let block = self.blocks_by_hash.get(&hash).cloned()?;
161        let mut parent_hash = block.recovered_block().parent_hash();
162        let mut blocks = vec![block];
163        while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
164            parent_hash = executed.recovered_block().parent_hash();
165            blocks.push(executed.clone());
166        }
167
168        Some((parent_hash, blocks))
169    }
170
171    /// Insert executed block into the state.
172    fn insert_executed(&mut self, executed: ExecutedBlockWithTrieUpdates<N>) {
173        let hash = executed.recovered_block().hash();
174        let parent_hash = executed.recovered_block().parent_hash();
175        let block_number = executed.recovered_block().number();
176
177        if self.blocks_by_hash.contains_key(&hash) {
178            return;
179        }
180
181        self.blocks_by_hash.insert(hash, executed.clone());
182
183        self.blocks_by_number.entry(block_number).or_default().push(executed);
184
185        self.parent_to_child.entry(parent_hash).or_default().insert(hash);
186
187        for children in self.parent_to_child.values_mut() {
188            children.retain(|child| self.blocks_by_hash.contains_key(child));
189        }
190    }
191
192    /// Remove single executed block by its hash.
193    ///
194    /// ## Returns
195    ///
196    /// The removed block and the block hashes of its children.
197    fn remove_by_hash(
198        &mut self,
199        hash: B256,
200    ) -> Option<(ExecutedBlockWithTrieUpdates<N>, HashSet<B256>)> {
201        let executed = self.blocks_by_hash.remove(&hash)?;
202
        // Remove this block from the collection of children of its parent block.
204        let parent_entry = self.parent_to_child.entry(executed.recovered_block().parent_hash());
205        if let hash_map::Entry::Occupied(mut entry) = parent_entry {
206            entry.get_mut().remove(&hash);
207
208            if entry.get().is_empty() {
209                entry.remove();
210            }
211        }
212
        // Remove the pointer to the children of this block.
214        let children = self.parent_to_child.remove(&hash).unwrap_or_default();
215
216        // Remove this block from `blocks_by_number`.
217        let block_number_entry = self.blocks_by_number.entry(executed.recovered_block().number());
218        if let btree_map::Entry::Occupied(mut entry) = block_number_entry {
219            // We have to find the index of the block since it exists in a vec
220            if let Some(index) = entry.get().iter().position(|b| b.recovered_block().hash() == hash)
221            {
222                entry.get_mut().swap_remove(index);
223
224                // If there are no blocks left then remove the entry for this block
225                if entry.get().is_empty() {
226                    entry.remove();
227                }
228            }
229        }
230
231        Some((executed, children))
232    }
233
234    /// Returns whether or not the hash is part of the canonical chain.
235    pub(crate) fn is_canonical(&self, hash: B256) -> bool {
236        let mut current_block = self.current_canonical_head.hash;
237        if current_block == hash {
238            return true
239        }
240
241        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
242            current_block = executed.recovered_block().parent_hash();
243            if current_block == hash {
244                return true
245            }
246        }
247
248        false
249    }
250
251    /// Removes canonical blocks below the upper bound, only if the last persisted hash is
252    /// part of the canonical chain.
253    pub(crate) fn remove_canonical_until(
254        &mut self,
255        upper_bound: BlockNumber,
256        last_persisted_hash: B256,
257    ) {
258        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
259
260        // If the last persisted hash is not canonical, then we don't want to remove any canonical
261        // blocks yet.
262        if !self.is_canonical(last_persisted_hash) {
263            return
264        }
265
266        // First, let's walk back the canonical chain and remove canonical blocks lower than the
267        // upper bound
268        let mut current_block = self.current_canonical_head.hash;
269        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
270            current_block = executed.recovered_block().parent_hash();
271            if executed.recovered_block().number() <= upper_bound {
272                debug!(target: "engine::tree", num_hash=?executed.recovered_block().num_hash(), "Attempting to remove block walking back from the head");
273                if let Some((removed, _)) = self.remove_by_hash(executed.recovered_block().hash()) {
274                    debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed block walking back from the head");
275                    // finally, move the trie updates
276                    self.persisted_trie_updates.insert(
277                        removed.recovered_block().hash(),
278                        (removed.recovered_block().number(), removed.trie),
279                    );
280                }
281            }
282        }
283        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removed canonical blocks from the tree");
284    }
285
    /// Removes all blocks that are below the finalized block, as well as non-canonical
    /// sidechains that fork from below the finalized block.
288    pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
289        let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
290
        // We remove disconnected sidechains in three steps:
        // * first, remove everything with a block number __below__ the finalized block.
        // * next, populate a vec with the blocks __at__ the finalized block's height.
        // * finally, iterate through the vec, removing children until the vec is empty (BFS).
296
297        // We _exclude_ the finalized block because we will be dealing with the blocks __at__
298        // the finalized block later.
299        let blocks_to_remove = self
300            .blocks_by_number
301            .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
302            .flat_map(|(_, blocks)| blocks.iter().map(|b| b.recovered_block().hash()))
303            .collect::<Vec<_>>();
304        for hash in blocks_to_remove {
305            if let Some((removed, _)) = self.remove_by_hash(hash) {
306                debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain block");
307            }
308        }
309
310        // remove trie updates that are below the finalized block
311        self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num > finalized_num);
312
        // The only block that should remain at the `finalized` number now is the finalized
        // block, if it exists.
        //
        // For all other blocks at this height, we first put them into this vec.
        // Then we iterate over the vec, removing each block and adding its children,
        // until the vec is empty.
319        let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
320
321        // re-insert the finalized hash if we removed it
322        if let Some(position) =
323            blocks_to_remove.iter().position(|b| b.recovered_block().hash() == finalized_hash)
324        {
325            let finalized_block = blocks_to_remove.swap_remove(position);
326            self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
327        }
328
329        let mut blocks_to_remove = blocks_to_remove
330            .into_iter()
331            .map(|e| e.recovered_block().hash())
332            .collect::<VecDeque<_>>();
333        while let Some(block) = blocks_to_remove.pop_front() {
334            if let Some((removed, children)) = self.remove_by_hash(block) {
335                debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed finalized sidechain child block");
336                blocks_to_remove.extend(children);
337            }
338        }
339    }
340
    /// Removes all blocks up to __and including__ the given block number.
    ///
    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
    /// those which have a fork point at or below the finalized hash.
    ///
    /// Canonical blocks below the upper bound will still be removed.
    ///
    /// NOTE: if the finalized block is greater than the upper bound, the only blocks that will be
    /// removed are canonical blocks and sidechains that fork below the `upper_bound`. This is the
    /// same behavior as if the `finalized_num` were `Some(upper_bound)`.
351    pub(crate) fn remove_until(
352        &mut self,
353        upper_bound: BlockNumHash,
354        last_persisted_hash: B256,
355        finalized_num_hash: Option<BlockNumHash>,
356    ) {
357        debug!(target: "engine::tree", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
358
        // If the finalized number exists and is ahead of the upper bound, clamp it to the upper
        // bound so that the only blocks removed are canonical blocks at or below the upper bound.
361        let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
362            if upper_bound.number < finalized.number {
363                finalized = upper_bound;
364                debug!(target: "engine::tree", ?finalized, "Adjusted upper bound");
365            }
366            finalized
367        });
368
        // We want to do two things:
        // * remove canonical blocks that are persisted
        // * remove forks whose fork point is below the finalized block
        // We can do this in two steps:
        // * remove all canonical blocks below the upper bound
        // * fetch the number of the finalized hash, removing any sidechains that are __below__ the
        //   finalized block
376        self.remove_canonical_until(upper_bound.number, last_persisted_hash);
377
378        // Now, we have removed canonical blocks (assuming the upper bound is above the finalized
379        // block) and only have sidechains below the finalized block.
380        if let Some(finalized_num_hash) = finalized_num_hash {
381            self.prune_finalized_sidechains(finalized_num_hash);
382        }
383    }
384
    /// Determines if the second block is a descendant of the first block.
    ///
    /// If the two blocks are the same, this returns `false`.
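    ///
    /// A small illustrative sketch (the chain `a -> b -> c` and the variable names are
    /// assumptions):
    ///
    /// ```ignore
    /// // with blocks a, b, c forming an in-memory chain a -> b -> c
    /// assert!(tree_state.is_descendant(a.num_hash(), c.header()));
    /// assert!(!tree_state.is_descendant(a.num_hash(), a.header()));
    /// ```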
388    fn is_descendant(&self, first: BlockNumHash, second: &N::BlockHeader) -> bool {
389        // If the second block's parent is the first block's hash, then it is a direct descendant
390        // and we can return early.
391        if second.parent_hash() == first.hash {
392            return true
393        }
394
        // If the second block's number is lower than or equal to the first block's number, it
        // cannot be a descendant.
397        if second.number() <= first.number {
398            return false
399        }
400
        // iterate through the parents of the second block until we reach the height directly
        // above the first block
402        let Some(mut current_block) = self.block_by_hash(second.parent_hash()) else {
403            // If we can't find its parent in the tree, we can't continue, so return false
404            return false
405        };
406
407        while current_block.number() > first.number + 1 {
408            let Some(block) = self.block_by_hash(current_block.header().parent_hash()) else {
409                // If we can't find its parent in the tree, we can't continue, so return false
410                return false
411            };
412
413            current_block = block;
414        }
415
416        // Now the block numbers should be equal, so we compare hashes.
417        current_block.parent_hash() == first.hash
418    }
419
420    /// Updates the canonical head to the given block.
421    fn set_canonical_head(&mut self, new_head: BlockNumHash) {
422        self.current_canonical_head = new_head;
423    }
424
425    /// Returns the tracked canonical head.
426    const fn canonical_head(&self) -> &BlockNumHash {
427        &self.current_canonical_head
428    }
429
430    /// Returns the block hash of the canonical head.
431    const fn canonical_block_hash(&self) -> B256 {
432        self.canonical_head().hash
433    }
434
435    /// Returns the block number of the canonical head.
436    const fn canonical_block_number(&self) -> BlockNumber {
437        self.canonical_head().number
438    }
439}
440
441/// A builder for creating state providers that can be used across threads.
442#[derive(Clone, Debug)]
443pub struct StateProviderBuilder<N: NodePrimitives, P> {
444    /// The provider factory used to create providers.
445    provider_factory: P,
446    /// The historical block hash to fetch state from.
447    historical: B256,
448    /// The blocks that form the chain from historical to target and are in memory.
449    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
450}
451
452impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new state provider builder from the provider factory, historical block hash and
    /// optional overlaid blocks.
455    pub fn new(
456        provider_factory: P,
457        historical: B256,
458        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
459    ) -> Self {
460        Self { provider_factory, historical, overlay }
461    }
462}
463
464impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
465where
466    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
467{
468    /// Creates a new state provider from this builder.
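    ///
    /// A minimal usage sketch (the `provider_factory` and `historical_hash` values are
    /// assumptions taken from the surrounding context):
    ///
    /// ```ignore
    /// let builder = StateProviderBuilder::new(provider_factory, historical_hash, None);
    /// let state: StateProviderBox = builder.build()?;
    /// ```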
469    pub fn build(&self) -> ProviderResult<StateProviderBox> {
470        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
471        if let Some(overlay) = self.overlay.clone() {
472            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
473        }
474        Ok(provider)
475    }
476}
477
478/// Tracks the state of the engine api internals.
479///
480/// This type is not shareable.
481#[derive(Debug)]
482pub struct EngineApiTreeState<N: NodePrimitives> {
483    /// Tracks the state of the blockchain tree.
484    tree_state: TreeState<N>,
485    /// Tracks the forkchoice state updates received by the CL.
486    forkchoice_state_tracker: ForkchoiceStateTracker,
487    /// Buffer of detached blocks.
488    buffer: BlockBuffer<N::Block>,
    /// Tracks the headers of invalid payloads that were rejected by the engine.
491    invalid_headers: InvalidHeaderCache,
492}
493
494impl<N: NodePrimitives> EngineApiTreeState<N> {
495    fn new(
496        block_buffer_limit: u32,
497        max_invalid_header_cache_length: u32,
498        canonical_block: BlockNumHash,
499    ) -> Self {
500        Self {
501            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
502            buffer: BlockBuffer::new(block_buffer_limit),
503            tree_state: TreeState::new(canonical_block),
504            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
505        }
506    }
507}
508
509/// The outcome of a tree operation.
510#[derive(Debug)]
511pub struct TreeOutcome<T> {
512    /// The outcome of the operation.
513    pub outcome: T,
514    /// An optional event to tell the caller to do something.
515    pub event: Option<TreeEvent>,
516}
517
518impl<T> TreeOutcome<T> {
519    /// Create new tree outcome.
520    pub const fn new(outcome: T) -> Self {
521        Self { outcome, event: None }
522    }
523
524    /// Set event on the outcome.
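    ///
    /// A small chaining sketch (the `status` and `hash` values are assumptions):
    ///
    /// ```ignore
    /// let outcome = TreeOutcome::new(status)
    ///     .with_event(TreeEvent::Download(DownloadRequest::single_block(hash)));
    /// ```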
525    pub fn with_event(mut self, event: TreeEvent) -> Self {
526        self.event = Some(event);
527        self
528    }
529}
530
/// Events that are triggered by the tree.
532#[derive(Debug)]
533pub enum TreeEvent {
534    /// Tree action is needed.
535    TreeAction(TreeAction),
536    /// Backfill action is needed.
537    BackfillAction(BackfillAction),
538    /// Block download is needed.
539    Download(DownloadRequest),
540}
541
542impl TreeEvent {
543    /// Returns true if the event is a backfill action.
544    const fn is_backfill_action(&self) -> bool {
545        matches!(self, Self::BackfillAction(_))
546    }
547}
548
549/// The actions that can be performed on the tree.
550#[derive(Debug)]
551pub enum TreeAction {
552    /// Make target canonical.
553    MakeCanonical {
554        /// The sync target head hash
555        sync_target_head: B256,
556    },
557}
558
559/// The engine API tree handler implementation.
560///
561/// This type is responsible for processing engine API requests, maintaining the canonical state and
562/// emitting events.
563pub struct EngineApiTreeHandler<N, P, E, T, V, C>
564where
565    N: NodePrimitives,
566    T: EngineTypes,
567{
568    provider: P,
569    executor_provider: E,
570    evm_config: C,
571    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
572    payload_validator: V,
573    /// Keeps track of internals such as executed and buffered blocks.
574    state: EngineApiTreeState<N>,
575    /// The half for sending messages to the engine.
576    ///
    /// This is kept so that we can queue messages to ourselves that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API requests in between and don't
    /// become unresponsive. This can happen during the live sync transition where we're trying to
    /// close the gap (up to 3 epochs of blocks in the worst case).
583    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
584    /// Incoming engine API requests.
585    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
586    /// Outgoing events that are emitted to the handler.
587    outgoing: UnboundedSender<EngineApiEvent<N>>,
588    /// Channels to the persistence layer.
589    persistence: PersistenceHandle<N>,
590    /// Tracks the state changes of the persistence task.
591    persistence_state: PersistenceState,
592    /// Flag indicating the state of the node's backfill synchronization process.
593    backfill_sync_state: BackfillSyncState,
594    /// Keeps track of the state of the canonical chain that isn't persisted yet.
595    /// This is intended to be accessed from external sources, such as rpc.
596    canonical_in_memory_state: CanonicalInMemoryState<N>,
597    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
598    /// updates
599    payload_builder: PayloadBuilderHandle<T>,
600    /// Configuration settings.
601    config: TreeConfig,
602    /// Metrics for the engine api.
603    metrics: EngineApiMetrics,
604    /// An invalid block hook.
605    invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
    /// The engine API variant of this handler.
    engine_kind: EngineApiKind,
    /// The type responsible for processing new payloads.
    payload_processor: PayloadProcessor<N, C>,
610}
611
612impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug, C: Debug> std::fmt::Debug
613    for EngineApiTreeHandler<N, P, E, T, V, C>
614where
615    N: NodePrimitives,
616{
617    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618        f.debug_struct("EngineApiTreeHandler")
619            .field("provider", &self.provider)
620            .field("evm_config", &self.evm_config)
621            .field("executor_provider", &self.executor_provider)
622            .field("consensus", &self.consensus)
623            .field("payload_validator", &self.payload_validator)
624            .field("state", &self.state)
625            .field("incoming_tx", &self.incoming_tx)
626            .field("persistence", &self.persistence)
627            .field("persistence_state", &self.persistence_state)
628            .field("backfill_sync_state", &self.backfill_sync_state)
629            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
630            .field("payload_builder", &self.payload_builder)
631            .field("config", &self.config)
632            .field("metrics", &self.metrics)
633            .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
634            .field("engine_kind", &self.engine_kind)
635            .finish()
636    }
637}
638
639impl<N, P, E, T, V, C> EngineApiTreeHandler<N, P, E, T, V, C>
640where
641    N: NodePrimitives,
642    P: DatabaseProviderFactory
643        + BlockReader<Block = N::Block, Header = N::BlockHeader>
644        + StateProviderFactory
645        + StateReader<Receipt = N::Receipt>
646        + StateCommitmentProvider
647        + HashedPostStateProvider
648        + Clone
649        + 'static,
650    <P as DatabaseProviderFactory>::Provider:
651        BlockReader<Block = N::Block, Header = N::BlockHeader>,
652    E: BlockExecutorProvider<Primitives = N>,
653    C: ConfigureEvm<Primitives = N> + 'static,
654    T: EngineTypes,
655    V: EngineValidator<T, Block = N::Block>,
656{
657    /// Creates a new [`EngineApiTreeHandler`].
658    #[expect(clippy::too_many_arguments)]
659    pub fn new(
660        provider: P,
661        executor_provider: E,
662        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
663        payload_validator: V,
664        outgoing: UnboundedSender<EngineApiEvent<N>>,
665        state: EngineApiTreeState<N>,
666        canonical_in_memory_state: CanonicalInMemoryState<N>,
667        persistence: PersistenceHandle<N>,
668        persistence_state: PersistenceState,
669        payload_builder: PayloadBuilderHandle<T>,
670        config: TreeConfig,
671        engine_kind: EngineApiKind,
672        evm_config: C,
673    ) -> Self {
674        let (incoming_tx, incoming) = std::sync::mpsc::channel();
675
676        let payload_processor =
677            PayloadProcessor::new(WorkloadExecutor::default(), evm_config.clone(), &config);
678
679        Self {
680            provider,
681            executor_provider,
682            evm_config,
683            consensus,
684            payload_validator,
685            incoming,
686            outgoing,
687            persistence,
688            persistence_state,
689            backfill_sync_state: BackfillSyncState::Idle,
690            state,
691            canonical_in_memory_state,
692            payload_builder,
693            config,
694            metrics: Default::default(),
695            incoming_tx,
696            invalid_block_hook: Box::new(NoopInvalidBlockHook),
697            engine_kind,
698            payload_processor,
699        }
700    }
701
702    /// Sets the invalid block hook.
703    fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
704        self.invalid_block_hook = invalid_block_hook;
705    }
706
707    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
708    /// own thread.
709    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of an [`EngineApiEvent`] unbounded channel to receive events from the engine.
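    ///
    /// A rough wiring sketch (all constructor arguments and `downloaded_blocks` are assumed to
    /// come from the surrounding node setup and are not defined here):
    ///
    /// ```ignore
    /// let (to_tree, mut from_tree) = EngineApiTreeHandler::spawn_new(
    ///     provider, executor_provider, consensus, payload_validator, persistence,
    ///     payload_builder, canonical_in_memory_state, config, invalid_block_hook,
    ///     kind, evm_config,
    /// );
    /// // forward work to the task and drain the events it emits
    /// to_tree.send(FromEngine::DownloadedBlocks(downloaded_blocks))?;
    /// while let Some(event) = from_tree.recv().await {
    ///     // handle the `EngineApiEvent`
    /// }
    /// ```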
712    #[expect(clippy::complexity)]
713    pub fn spawn_new(
714        provider: P,
715        executor_provider: E,
716        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
717        payload_validator: V,
718        persistence: PersistenceHandle<N>,
719        payload_builder: PayloadBuilderHandle<T>,
720        canonical_in_memory_state: CanonicalInMemoryState<N>,
721        config: TreeConfig,
722        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
723        kind: EngineApiKind,
724        evm_config: C,
725    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
726    {
727        let best_block_number = provider.best_block_number().unwrap_or(0);
728        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
729
730        let persistence_state = PersistenceState {
731            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
732            rx: None,
733            remove_above_state: VecDeque::new(),
734        };
735
736        let (tx, outgoing) = unbounded_channel();
737        let state = EngineApiTreeState::new(
738            config.block_buffer_limit(),
739            config.max_invalid_header_cache_length(),
740            header.num_hash(),
741        );
742
743        let mut task = Self::new(
744            provider,
745            executor_provider,
746            consensus,
747            payload_validator,
748            tx,
749            state,
750            canonical_in_memory_state,
751            persistence,
752            persistence_state,
753            payload_builder,
754            config,
755            kind,
756            evm_config,
757        );
758        task.set_invalid_block_hook(invalid_block_hook);
759        let incoming = task.incoming_tx.clone();
760        std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
761        (incoming, outgoing)
762    }
763
764    /// Returns a new [`Sender`] to send messages to this type.
765    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
766        self.incoming_tx.clone()
767    }
768
769    /// Run the engine API handler.
770    ///
771    /// This will block the current thread and process incoming messages.
772    pub fn run(mut self) {
773        loop {
774            match self.try_recv_engine_message() {
775                Ok(Some(msg)) => {
776                    debug!(target: "engine::tree", %msg, "received new engine message");
777                    if let Err(fatal) = self.on_engine_message(msg) {
778                        error!(target: "engine::tree", %fatal, "insert block fatal error");
779                        return
780                    }
781                }
782                Ok(None) => {
783                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
784                }
785                Err(_err) => {
786                    error!(target: "engine::tree", "Engine channel disconnected");
787                    return
788                }
789            }
790
791            if let Err(err) = self.advance_persistence() {
792                error!(target: "engine::tree", %err, "Advancing persistence failed");
793                return
794            }
795        }
796    }
797
798    /// Invoked when previously requested blocks were downloaded.
799    ///
    /// If the block count exceeds the configured batch size that we're allowed to execute at once,
    /// this will execute the first batch and send the remaining blocks back through the channel so
    /// that block request processing isn't blocked for a long time.
803    fn on_downloaded(
804        &mut self,
805        mut blocks: Vec<RecoveredBlock<N::Block>>,
806    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
807        if blocks.is_empty() {
808            // nothing to execute
809            return Ok(None)
810        }
811
812        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
813        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
814        for block in blocks.drain(..batch) {
815            if let Some(event) = self.on_downloaded_block(block)? {
816                let needs_backfill = event.is_backfill_action();
817                self.on_tree_event(event)?;
818                if needs_backfill {
819                    // can exit early if backfill is needed
820                    return Ok(None)
821                }
822            }
823        }
824
825        // if we still have blocks to execute, send them as a followup request
826        if !blocks.is_empty() {
827            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
828        }
829
830        Ok(None)
831    }
832
    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of an
    /// [`EngineTypes::ExecutionData`]. The Execution layer executes the transactions and validates
    /// the state in the block header, then passes validation data back to the Consensus layer,
    /// which adds the block to the head of its own blockchain and attests to it. The block is then
    /// broadcast over the consensus p2p network in the form of a "Beacon block".
839    ///
840    /// These responses should adhere to the [Engine API Spec for
841    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
842    ///
843    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
844    /// returns an error if an internal error occurred.
    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()), target = "engine::tree")]
846    fn on_new_payload(
847        &mut self,
848        payload: T::ExecutionData,
849    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
850        trace!(target: "engine::tree", "invoked new payload");
851        self.metrics.engine.new_payload_messages.increment(1);
852
853        // Ensures that the given payload does not violate any consensus rules that concern the
854        // block's layout, like:
855        //    - missing or invalid base fee
856        //    - invalid extra data
857        //    - invalid transactions
858        //    - incorrect hash
859        //    - the versioned hashes passed with the payload do not exactly match transaction
860        //      versioned hashes
861        //    - the block does not contain blob transactions if it is pre-cancun
862        //
863        // This validates the following engine API rule:
864        //
865        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
866        //    validation by taking the following steps:
867        //
868        //   1. Obtain the actual array by concatenating blob versioned hashes lists
869        //      (`tx.blob_versioned_hashes`) of each [blob
870        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
871        //      in the payload, respecting the order of inclusion. If the payload has no blob
872        //      transactions the expected array **MUST** be `[]`.
873        //
874        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
875        //      null}` if the expected and the actual arrays don't match.
876        //
877        // This validation **MUST** be instantly run in all cases even during active sync process.
878        let parent_hash = payload.parent_hash();
879        let block = match self.payload_validator.ensure_well_formed_payload(payload) {
880            Ok(block) => block,
881            Err(error) => {
882                error!(target: "engine::tree", %error, "Invalid payload");
883                // we need to convert the error to a payload status (response to the CL)
884
885                let latest_valid_hash =
886                    if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
887                        // Engine-API rules:
888                        // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
889                        // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
890                        None
891                    } else {
892                        self.latest_valid_hash_for_invalid_payload(parent_hash)?
893                    };
894
895                let status = PayloadStatusEnum::from(error);
896                return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
897            }
898        };
899
900        let block_hash = block.hash();
901        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
902        if lowest_buffered_ancestor == block_hash {
903            lowest_buffered_ancestor = block.parent_hash();
904        }
905
906        // now check the block itself
907        if let Some(status) =
908            self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?
909        {
910            return Ok(TreeOutcome::new(status))
911        }
912
913        let status = if self.backfill_sync_state.is_idle() {
914            let mut latest_valid_hash = None;
915            let num_hash = block.num_hash();
916            match self.insert_block(block) {
917                Ok(status) => {
918                    let status = match status {
919                        InsertPayloadOk::Inserted(BlockStatus::Valid) => {
920                            latest_valid_hash = Some(block_hash);
921                            self.try_connect_buffered_blocks(num_hash)?;
922                            PayloadStatusEnum::Valid
923                        }
924                        InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
925                            latest_valid_hash = Some(block_hash);
926                            PayloadStatusEnum::Valid
927                        }
928                        InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
929                        InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
930                            // not known to be invalid, but we don't know anything else
931                            PayloadStatusEnum::Syncing
932                        }
933                    };
934
935                    PayloadStatus::new(status, latest_valid_hash)
936                }
937                Err(error) => self.on_insert_block_error(error)?,
938            }
939        } else if let Err(error) = self.buffer_block(block) {
940            self.on_insert_block_error(error)?
941        } else {
942            PayloadStatus::from_status(PayloadStatusEnum::Syncing)
943        };
944
945        let mut outcome = TreeOutcome::new(status);
946        // if the block is valid and it is the current sync target head, make it canonical
947        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
948            // but only if it isn't already the canonical head
949            if self.state.tree_state.canonical_block_hash() != block_hash {
950                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
951                    sync_target_head: block_hash,
952                }));
953            }
954        }
955
956        Ok(outcome)
957    }
958
959    /// Returns the new chain for the given head.
960    ///
961    /// This also handles reorgs.
962    ///
963    /// Note: This does not update the tracked state and instead returns the new chain based on the
964    /// given head.
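    ///
    /// A rough sketch of how the result might be consumed (hypothetical caller code):
    ///
    /// ```ignore
    /// match self.on_new_head(head_hash)? {
    ///     Some(NewCanonicalChain::Commit { new }) => { /* simple chain extension */ }
    ///     Some(NewCanonicalChain::Reorg { new, old }) => { /* apply the reorg */ }
    ///     None => { /* the head block is not known to the tree */ }
    /// }
    /// ```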
965    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
966        // get the executed new head block
967        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
968            return Ok(None)
969        };
970
971        let new_head_number = new_head_block.recovered_block().number();
972        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
973
974        let mut new_chain = vec![new_head_block.clone()];
975        let mut current_hash = new_head_block.recovered_block().parent_hash();
976        let mut current_number = new_head_number - 1;
977
978        // Walk back the new chain until we reach a block we know about
979        //
980        // This is only done for in-memory blocks, because we should not have persisted any blocks
981        // that are _above_ the current canonical head.
982        while current_number > current_canonical_number {
983            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
984            {
985                current_hash = block.recovered_block().parent_hash();
986                current_number -= 1;
987                new_chain.push(block);
988            } else {
989                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
990                // This should never happen as we're walking back a chain that should connect to
991                // the canonical chain
992                return Ok(None);
993            }
994        }
995
996        // If we have reached the current canonical head by walking back from the target, then we
997        // know this represents an extension of the canonical chain.
998        if current_hash == self.state.tree_state.current_canonical_head.hash {
999            new_chain.reverse();
1000
1001            // Simple extension of the current chain
1002            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
1003        }
1004
1005        // We have a reorg. Walk back both chains to find the fork point.
1006        let mut old_chain = Vec::new();
1007        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
1008
1009        // If the canonical chain is ahead of the new chain,
1010        // gather all blocks until new head number.
1011        while current_canonical_number > current_number {
1012            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1013                old_chain.push(block.clone());
1014                old_hash = block.recovered_block().parent_hash();
1015                current_canonical_number -= 1;
1016            } else {
1017                // This shouldn't happen as we're walking back the canonical chain
1018                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1019                return Ok(None);
1020            }
1021        }
1022
1023        // Both new and old chain pointers are now at the same height.
1024        debug_assert_eq!(current_number, current_canonical_number);
1025
1026        // Walk both chains from specified hashes at same height until
1027        // a common ancestor (fork block) is reached.
1028        while old_hash != current_hash {
1029            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
1030                old_hash = block.recovered_block().parent_hash();
1031                old_chain.push(block);
1032            } else {
1033                // This shouldn't happen as we're walking back the canonical chain
1034                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
1035                return Ok(None);
1036            }
1037
1038            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
1039            {
1040                current_hash = block.recovered_block().parent_hash();
1041                new_chain.push(block);
1042            } else {
1043                // This shouldn't happen as we've already walked this path
1044                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
1045                return Ok(None);
1046            }
1047        }
1048        new_chain.reverse();
1049        old_chain.reverse();
1050
1051        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
1052    }
1053
    /// Determines if the given block is part of a fork by performing the following checks:
    /// * walking back from the target hash to verify that the target hash is not part of an
    ///   extension of the canonical chain.
    /// * walking back from the current head to verify that the target hash is not already part of
    ///   the canonical chain.
1060    fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
1061        // verify that the given hash is not part of an extension of the canon chain.
1062        let canonical_head = self.state.tree_state.canonical_head();
1063        let mut current_hash = target_hash;
1064        while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
1065            if current_block.hash() == canonical_head.hash {
1066                return Ok(false)
1067            }
1068            // We already passed the canonical head
1069            if current_block.number() <= canonical_head.number {
1070                break
1071            }
1072            current_hash = current_block.parent_hash();
1073        }
1074
1075        // verify that the given hash is not already part of canonical chain stored in memory
1076        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
1077            return Ok(false)
1078        }
1079
1080        // verify that the given hash is not already part of persisted canonical chain
1081        if self.provider.block_number(target_hash)?.is_some() {
1082            return Ok(false)
1083        }
1084
1085        Ok(true)
1086    }
1087
1088    /// Returns the persisting kind for the input block.
1089    fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
1090        // Check that we're currently persisting.
1091        let Some(action) = self.persistence_state.current_action() else {
1092            return PersistingKind::NotPersisting
1093        };
        // Check that the persistence action is saving blocks, not removing them.
1095        let CurrentPersistenceAction::SavingBlocks { highest } = action else {
1096            return PersistingKind::PersistingNotDescendant
1097        };
1098
        // The block being validated can only be a descendant if its number is higher than
        // the highest block currently being persisted. Otherwise, it's likely a fork of a lower block.
1101        if block.number() > highest.number && self.state.tree_state.is_descendant(*highest, block) {
1102            return PersistingKind::PersistingDescendant
1103        }
1104
1105        // In all other cases, the block is not a descendant.
1106        PersistingKind::PersistingNotDescendant
1107    }
1108
1109    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
1110    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
1111    /// chain.
1112    ///
1113    /// These responses should adhere to the [Engine API Spec for
1114    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
1115    ///
1116    /// Returns an error if an internal error occurred like a database error.
    #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
1118    fn on_forkchoice_updated(
1119        &mut self,
1120        state: ForkchoiceState,
1121        attrs: Option<T::PayloadAttributes>,
1122        version: EngineApiMessageVersion,
1123    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1124        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1125        self.metrics.engine.forkchoice_updated_messages.increment(1);
1126        self.canonical_in_memory_state.on_forkchoice_update_received();
1127
1128        if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1129            return Ok(TreeOutcome::new(on_updated))
1130        }
1131
1132        let valid_outcome = |head| {
1133            TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1134                PayloadStatusEnum::Valid,
1135                Some(head),
1136            )))
1137        };
1138
1139        // Process the forkchoice update by trying to make the head block canonical
1140        //
1141        // We can only process this forkchoice update if:
1142        // - we have the `head` block
1143        // - the head block is part of a chain that is connected to the canonical chain. This
1144        //   includes reorgs.
1145        //
1146        // Performing a FCU involves:
1147        // - marking the FCU's head block as canonical
1148        // - updating in memory state to reflect the new canonical chain
1149        // - updating canonical state trackers
1150        // - emitting a canonicalization event for the new chain (including reorg)
1151        // - if we have payload attributes, delegate them to the payload service
1152
1153        // 1. ensure we have a new head block
1154        if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1155            trace!(target: "engine::tree", "fcu head hash is already canonical");
1156
1157            // update the safe and finalized blocks and ensure their values are valid
1158            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1159                // safe or finalized hashes are invalid
1160                return Ok(TreeOutcome::new(outcome))
1161            }
1162
1163            // we still need to process payload attributes if the head is already canonical
1164            if let Some(attr) = attrs {
1165                let tip = self
1166                    .block_by_hash(self.state.tree_state.canonical_block_hash())?
1167                    .ok_or_else(|| {
1168                        // If we can't find the canonical block, then something is wrong and we need
1169                        // to return an error
1170                        ProviderError::HeaderNotFound(state.head_block_hash.into())
1171                    })?;
1172                let updated = self.process_payload_attributes(attr, tip.header(), state, version);
1173                return Ok(TreeOutcome::new(updated))
1174            }
1175
1176            // the head block is already canonical
1177            return Ok(valid_outcome(state.head_block_hash))
1178        }
1179
1180        // 2. ensure we can apply a new chain update for the head block
1181        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1182            let tip = chain_update.tip().clone_sealed_header();
1183            self.on_canonical_chain_update(chain_update);
1184
1185            // update the safe and finalized blocks and ensure their values are valid
1186            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1187                // safe or finalized hashes are invalid
1188                return Ok(TreeOutcome::new(outcome))
1189            }
1190
1191            if let Some(attr) = attrs {
1192                let updated = self.process_payload_attributes(attr, &tip, state, version);
1193                return Ok(TreeOutcome::new(updated))
1194            }
1195
1196            return Ok(valid_outcome(state.head_block_hash))
1197        }
1198
1199        // 3. check if the head is already part of the canonical chain
1200        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1201            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1202
1203            // For OpStack the proposers are allowed to reorg their own chain at will, so we need to
1204            // always trigger a new payload job if requested.
1205            if self.engine_kind.is_opstack() {
1206                if let Some(attr) = attrs {
1207                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1208                    let updated =
1209                        self.process_payload_attributes(attr, &canonical_header, state, version);
1210                    return Ok(TreeOutcome::new(updated))
1211                }
1212            }
1213
1214            // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
1215            //    payload build process if `forkchoiceState.headBlockHash` references a `VALID`
1216            //    ancestor of the head of canonical chain, i.e. the ancestor passed payload
1217            //    validation process and deemed `VALID`. In the case of such an event, client
1218            //    software MUST return `{payloadStatus: {status: VALID, latestValidHash:
1219            //    forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1220
1221            // the head block is already canonical, so we're not triggering a payload job and can
1222            // return right away
1223            return Ok(valid_outcome(state.head_block_hash))
1224        }
1225
1226        // 4. we don't have the block to perform the update
1227        // we assume the FCU is valid and at least the head is missing,
1228        // so we need to start syncing to it
1229        //
1230        // find the appropriate target to sync to, if we don't have the safe block hash then we
1231        // start syncing to the safe block via backfill first
1232        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1233            // check that safe block is valid and missing
1234            !state.safe_block_hash.is_zero() &&
1235            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1236        {
1237            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1238            state.safe_block_hash
1239        } else {
1240            state.head_block_hash
1241        };
1242
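        // if we've already buffered ancestors of the target, start the download at the parent of
        // the lowest buffered ancestor, i.e. the first block that is actually missing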
1243        let target = self.lowest_buffered_ancestor_or(target);
1244        trace!(target: "engine::tree", %target, "downloading missing block");
1245
1246        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1247            PayloadStatusEnum::Syncing,
1248        )))
1249        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1250    }
1251
1252    /// Attempts to receive the next engine request.
1253    ///
1254    /// If there's currently no persistence action in progress, this will block until a new request
1255    /// is received. If there's a persistence action in progress, this will try to receive the
1256    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1257    /// received in time.
1258    ///
1259    /// Returns an error if the engine channel is disconnected.
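    ///
    /// The timeout keeps the caller's event loop responsive while persistence is running, so it
    /// can periodically check whether the persistence task has finished instead of blocking on
    /// the channel.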
1260    #[expect(clippy::type_complexity)]
1261    fn try_recv_engine_message(
1262        &self,
1263    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1264        if self.persistence_state.in_progress() {
1265            // try to receive the next request with a timeout to not block indefinitely
1266            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1267                Ok(msg) => Ok(Some(msg)),
1268                Err(err) => match err {
1269                    RecvTimeoutError::Timeout => Ok(None),
1270                    RecvTimeoutError::Disconnected => Err(RecvError),
1271                },
1272            }
1273        } else {
1274            self.incoming.recv().map(Some)
1275        }
1276    }
1277
1278    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1279    /// the current persistence action while we're removing blocks.
1280    fn remove_blocks(&mut self, new_tip_num: u64) {
1281        debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
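        // only issue a removal if the new tip is below the last persisted block; otherwise there
        // are no persisted blocks above the new tip to remove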
1282        if new_tip_num < self.persistence_state.last_persisted_block.number {
1283            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1284            let (tx, rx) = oneshot::channel();
1285            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1286            self.persistence_state.start_remove(new_tip_num, rx);
1287        }
1288    }
1289
1290    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1291    /// the current persistence action while we're saving blocks.
1292    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1293        if blocks_to_persist.is_empty() {
1294            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1295            return
1296        }
1297
1298        // NOTE: checked non-empty above
1299        let highest_num_hash = blocks_to_persist
1300            .iter()
1301            .max_by_key(|block| block.recovered_block().number())
1302            .map(|b| b.recovered_block().num_hash())
1303            .expect("Checked non-empty persisting blocks");
1304
1305        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1306        let (tx, rx) = oneshot::channel();
1307        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1308
1309        self.persistence_state.start_save(highest_num_hash, rx);
1310    }
1311
1312    /// Attempts to advance the persistence state.
1313    ///
1314    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1315    /// or send a new persistence action if necessary.
1316    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1317        if self.persistence_state.in_progress() {
1318            let (mut rx, start_time, current_action) = self
1319                .persistence_state
1320                .rx
1321                .take()
1322                .expect("if a persistence task is in progress Receiver must be Some");
1323            // Check if persistence has completed
1324            match rx.try_recv() {
1325                Ok(last_persisted_hash_num) => {
1326                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1327                    let Some(BlockNumHash {
1328                        hash: last_persisted_block_hash,
1329                        number: last_persisted_block_number,
1330                    }) = last_persisted_hash_num
1331                    else {
1332                        // if this happened, then we persisted no blocks because we sent an
1333                        // empty vec of blocks
1334                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1335                        return Ok(())
1336                    };
1337
1338                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1339                    self.persistence_state
1340                        .finish(last_persisted_block_hash, last_persisted_block_number);
1341                    self.on_new_persisted_block()?;
1342                }
1343                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1344                Err(TryRecvError::Empty) => {
1345                    self.persistence_state.rx = Some((rx, start_time, current_action))
1346                }
1347            }
1348        }
1349
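        // if no persistence task is in flight (or the previous one just finished), start the next
        // action: pending removals take priority over saving new canonical blocks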
1350        if !self.persistence_state.in_progress() {
1351            if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
1352                self.remove_blocks(new_tip_num)
1353            } else if self.should_persist() {
1354                let blocks_to_persist = self.get_canonical_blocks_to_persist();
1355                self.persist_blocks(blocks_to_persist);
1356            }
1357        }
1358
1359        Ok(())
1360    }
1361
1362    /// Handles a message from the engine.
1363    fn on_engine_message(
1364        &mut self,
1365        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1366    ) -> Result<(), InsertBlockFatalError> {
1367        match msg {
1368            FromEngine::Event(event) => match event {
1369                FromOrchestrator::BackfillSyncStarted => {
1370                    debug!(target: "engine::tree", "received backfill sync started event");
1371                    self.backfill_sync_state = BackfillSyncState::Active;
1372                }
1373                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1374                    self.on_backfill_sync_finished(ctrl)?;
1375                }
1376            },
1377            FromEngine::Request(request) => {
1378                match request {
1379                    EngineApiRequest::InsertExecutedBlock(block) => {
1380                        let block_num_hash = block.recovered_block().num_hash();
1381                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1382                        let now = Instant::now();
1383
1384                        // if the parent is the canonical head, we can insert the block as the
1385                        // pending block
1386                        if self.state.tree_state.canonical_block_hash() ==
1387                            block.recovered_block().parent_hash()
1388                        {
1389                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1390                            self.canonical_in_memory_state.set_pending_block(block.clone());
1391                        }
1392
1393                        self.state.tree_state.insert_executed(block.clone());
1394                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1395                        self.emit_event(EngineApiEvent::BeaconConsensus(
1396                            BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1397                        ));
1398                    }
1399                    EngineApiRequest::Beacon(request) => {
1400                        match request {
1401                            BeaconEngineMessage::ForkchoiceUpdated {
1402                                state,
1403                                payload_attrs,
1404                                tx,
1405                                version,
1406                            } => {
1407                                let mut output =
1408                                    self.on_forkchoice_updated(state, payload_attrs, version);
1409
1410                                if let Ok(res) = &mut output {
1411                                    // track last received forkchoice state
1412                                    self.state
1413                                        .forkchoice_state_tracker
1414                                        .set_latest(state, res.outcome.forkchoice_status());
1415
1416                                    // emit an event about the handled FCU
1417                                    self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1418                                        state,
1419                                        res.outcome.forkchoice_status(),
1420                                    ));
1421
1422                                    // handle the event if any
1423                                    self.on_maybe_tree_event(res.event.take())?;
1424                                }
1425
1426                                if let Err(err) =
1427                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1428                                {
1429                                    self.metrics
1430                                        .engine
1431                                        .failed_forkchoice_updated_response_deliveries
1432                                        .increment(1);
1433                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1434                                }
1435                            }
1436                            BeaconEngineMessage::NewPayload { payload, tx } => {
1437                                let mut output = self.on_new_payload(payload);
1438
1439                                let maybe_event =
1440                                    output.as_mut().ok().and_then(|out| out.event.take());
1441
1442                                // emit response
1443                                if let Err(err) =
1444                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1445                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1446                                    }))
1447                                {
1448                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1449                                    self.metrics
1450                                        .engine
1451                                        .failed_new_payload_response_deliveries
1452                                        .increment(1);
1453                                }
1454
1455                                // handle the event if any
1456                                self.on_maybe_tree_event(maybe_event)?;
1457                            }
1458                            BeaconEngineMessage::TransitionConfigurationExchanged => {
1459                                // triggering this hook will record that we received a request from
1460                                // the CL
1461                                self.canonical_in_memory_state
1462                                    .on_transition_configuration_exchanged();
1463                            }
1464                        }
1465                    }
1466                }
1467            }
1468            FromEngine::DownloadedBlocks(blocks) => {
1469                if let Some(event) = self.on_downloaded(blocks)? {
1470                    self.on_tree_event(event)?;
1471                }
1472            }
1473        }
1474        Ok(())
1475    }
1476
1477    /// Invoked if the backfill sync has finished to target.
1478    ///
1479    /// At this point we consider the block synced to the backfill target.
1480    ///
1481    /// Checks the tracked finalized block against the block on disk and requests another backfill
1482    /// run if the distance to the tip exceeds the threshold for another backfill run.
1483    ///
1484    /// This will also do the necessary housekeeping of the tree state, this includes:
1485    ///  - removing all blocks below the backfill height
1486    ///  - resetting the canonical in-memory state
1487    ///
1488    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1489    /// target block.
1490    fn on_backfill_sync_finished(
1491        &mut self,
1492        ctrl: ControlFlow,
1493    ) -> Result<(), InsertBlockFatalError> {
1494        debug!(target: "engine::tree", "received backfill sync finished event");
1495        self.backfill_sync_state = BackfillSyncState::Idle;
1496
1497        // backfill height is the block number that the backfill finished at
1498        let mut backfill_height = ctrl.block_number();
1499
1500        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1501        if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1502            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1503            // update the `invalid_headers` cache with the new invalid header
1504            self.state.invalid_headers.insert(**bad_block);
1505
1506            // if this was an unwind then the target is the new height
1507            backfill_height = Some(*target);
1508        }
1509
1510        // if we don't know the height the backfill finished at, there's nothing more to do
1511        let Some(backfill_height) = backfill_height else { return Ok(()) };
1512
1513        // state housekeeping after backfill sync
1514        // remove all executed blocks below the backfill height
1515        //
1516        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1517        // before that
1518        let Some(backfill_num_hash) = self
1519            .provider
1520            .block_hash(backfill_height)?
1521            .map(|hash| BlockNumHash { hash, number: backfill_height })
1522        else {
1523            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1524            return Ok(())
1525        };
1526
1527        if ctrl.is_unwind() {
1528            // the node reset, so we need to clear everything above that height so that the
1529            // backfill height becomes the new canonical block.
1530            self.state.tree_state.reset(backfill_num_hash)
1531        } else {
1532            self.state.tree_state.remove_until(
1533                backfill_num_hash,
1534                self.persistence_state.last_persisted_block.hash,
1535                Some(backfill_num_hash),
1536            );
1537        }
1538
1539        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1540        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1541
1542        // remove all buffered blocks below the backfill height
1543        self.state.buffer.remove_old_blocks(backfill_height);
1544        // we remove all entries because now we're synced to the backfill target and consider this
1545        // the canonical chain
1546        self.canonical_in_memory_state.clear_state();
1547
1548        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1549            // update the tracked chain height, after backfill sync both the canonical height and
1550            // persisted height are the same
1551            self.state.tree_state.set_canonical_head(new_head.num_hash());
1552            self.persistence_state.finish(new_head.hash(), new_head.number());
1553
1554            // update the tracked canonical head
1555            self.canonical_in_memory_state.set_canonical_head(new_head);
1556        }
1557
1558        // check if we need to run backfill again by comparing the most recent finalized height to
1559        // the backfill height
1560        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1561        else {
1562            return Ok(())
1563        };
1564        if sync_target_state.finalized_block_hash.is_zero() {
1565            // no finalized block, can't check distance
1566            return Ok(())
1567        }
1568        // get the block number of the finalized block, if we have it
1569        let newest_finalized = self
1570            .state
1571            .buffer
1572            .block(&sync_target_state.finalized_block_hash)
1573            .map(|block| block.number());
1574
1575        // The block number that the backfill finished at - if either the progress or the newest
1576        // finalized block is None, then we can't check the distance anyway.
1577        //
1578        // If both are Some, we perform another distance check and return the desired
1579        // backfill target
1580        if let Some(backfill_target) =
1581            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1582                // determine whether we should run backfill again, i.e. whether the remaining
1583                // gap to the finalized block is still large enough to warrant another run
1584                self.backfill_sync_target(progress, finalized_number, None)
1585            })
1586        {
1587            // request another backfill run
1588            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1589                backfill_target.into(),
1590            )));
1591            return Ok(())
1592        };
1593
1594        // try to close the gap by executing buffered blocks that are child blocks of the new head
1595        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1596    }
1597
1598    /// Attempts to make the given target canonical.
1599    ///
1600    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1601    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1602        if let Some(chain_update) = self.on_new_head(target)? {
1603            self.on_canonical_chain_update(chain_update);
1604        }
1605
1606        Ok(())
1607    }
1608
1609    /// Convenience function to handle an optional tree event.
1610    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1611        if let Some(event) = event {
1612            self.on_tree_event(event)?;
1613        }
1614
1615        Ok(())
1616    }
1617
1618    /// Handles a tree event.
1619    ///
1620    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1621    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1622        match event {
1623            TreeEvent::TreeAction(action) => match action {
1624                TreeAction::MakeCanonical { sync_target_head } => {
1625                    self.make_canonical(sync_target_head)?;
1626                }
1627            },
1628            TreeEvent::BackfillAction(action) => {
1629                self.emit_event(EngineApiEvent::BackfillAction(action));
1630            }
1631            TreeEvent::Download(action) => {
1632                self.emit_event(EngineApiEvent::Download(action));
1633            }
1634        }
1635
1636        Ok(())
1637    }
1638
1639    /// Emits an outgoing event to the engine.
1640    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1641        let event = event.into();
1642
1643        if event.is_backfill_action() {
1644            debug_assert_eq!(
1645                self.backfill_sync_state,
1646                BackfillSyncState::Idle,
1647                "backfill action should only be emitted when backfill is idle"
1648            );
1649
1650            if self.persistence_state.in_progress() {
1651                // backfill sync and persisting data are mutually exclusive, so we can't start
1652                // backfill while we're still persisting
1653                debug!(target: "engine::tree", "skipping backfill action while persistence task is active");
1654                return
1655            }
1656
1657            self.backfill_sync_state = BackfillSyncState::Pending;
1658            self.metrics.engine.pipeline_runs.increment(1);
1659            debug!(target: "engine::tree", "emitting backfill action event");
1660        }
1661
1662        let _ = self.outgoing.send(event).inspect_err(
1663            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1664        );
1665    }
1666
1667    /// Returns true if the number of canonical blocks above the last persisted block
1668    /// exceeds the persistence threshold and
1669    /// backfill is not running.
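    ///
    /// For example (illustrative values, not defaults): with a persistence threshold of 2 and the
    /// last persisted block at number 10, this returns `true` once the canonical head reaches
    /// block 13 (`13 - 10 > 2`), provided backfill is idle.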
1670    pub const fn should_persist(&self) -> bool {
1671        if !self.backfill_sync_state.is_idle() {
1672            // can't persist if backfill is running
1673            return false
1674        }
1675
1676        let min_block = self.persistence_state.last_persisted_block.number;
1677        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1678            self.config.persistence_threshold()
1679    }
1680
1681    /// Returns a batch of consecutive canonical blocks to persist in the range
1682    /// `(last_persisted_number .. canonical_head - memory_block_buffer_target]`. The expected
1683    /// order is oldest -> newest.
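    ///
    /// For example (illustrative values): with `last_persisted_number = 10`, a canonical head at
    /// block 20 and a `memory_block_buffer_target` of 2, this walks back from the head and
    /// returns blocks 11..=18, oldest first; the two most recent blocks stay in memory only.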
1684    fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
1685        let mut blocks_to_persist = Vec::new();
1686        let mut current_hash = self.state.tree_state.canonical_block_hash();
1687        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1688
1689        let canonical_head_number = self.state.tree_state.canonical_block_number();
1690
1691        let target_number =
1692            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1693
1694        debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1695        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1696            if block.recovered_block().number() <= last_persisted_number {
1697                break;
1698            }
1699
1700            if block.recovered_block().number() <= target_number {
1701                blocks_to_persist.push(block.clone());
1702            }
1703
1704            current_hash = block.recovered_block().parent_hash();
1705        }
1706
1707        // reverse the order so that the oldest block comes first
1708        blocks_to_persist.reverse();
1709
1710        blocks_to_persist
1711    }
1712
1713    /// This clears the blocks from the in-memory tree state that have been persisted to the
1714    /// database.
1715    ///
1716    /// This also updates the canonical in-memory state to reflect the newest persisted block
1717    /// height.
1718    ///
1719    /// Assumes that `finish` has been called on the `persistence_state` at least once
1720    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1721        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1722        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1723        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1724            number: self.persistence_state.last_persisted_block.number,
1725            hash: self.persistence_state.last_persisted_block.hash,
1726        });
1727        Ok(())
1728    }
1729
1730    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1731    ///
1732    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
1733    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1734    /// has in memory.
1735    ///
1736    /// For finalized blocks, this will return `None`.
1737    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1738        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1739        // check memory first
1740        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
1741            return Ok(Some(block.block))
1742        }
1743
1744        let (block, senders) = self
1745            .provider
1746            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1747            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1748            .split_sealed();
1749        let execution_output = self
1750            .provider
1751            .get_state(block.header().number())?
1752            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1753        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1754
1755        Ok(Some(ExecutedBlock {
1756            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1757            execution_output: Arc::new(execution_output),
1758            hashed_state: Arc::new(hashed_state),
1759        }))
1760    }
1761
1762    /// Return sealed block from database or in-memory state by hash.
1763    fn sealed_header_by_hash(
1764        &self,
1765        hash: B256,
1766    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1767        // check memory first
1768        let block = self
1769            .state
1770            .tree_state
1771            .block_by_hash(hash)
1772            .map(|block| block.as_ref().clone_sealed_header());
1773
1774        if block.is_some() {
1775            Ok(block)
1776        } else {
1777            self.provider.sealed_header_by_hash(hash)
1778        }
1779    }
1780
1781    /// Return block from database or in-memory state by hash.
1782    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1783        // check database first
1784        let mut block = self.provider.block_by_hash(hash)?;
1785        if block.is_none() {
1786            // Note: it's fine to return the unsealed block because the caller already has
1787            // the hash
1788            block = self
1789                .state
1790                .tree_state
1791                .block_by_hash(hash)
1792                // TODO: clone for compatibility. should we return an Arc here?
1793                .map(|block| block.as_ref().clone().into_block());
1794        }
1795        Ok(block)
1796    }
1797
1798    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1799    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1800    /// not exist in the buffer, this returns the hash that is passed in.
1801    ///
1802    /// Returns the parent hash of the block itself if the block is buffered and has no other
1803    /// buffered ancestors.
1804    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1805        self.state
1806            .buffer
1807            .lowest_ancestor(&hash)
1808            .map(|block| block.parent_hash())
1809            .unwrap_or_else(|| hash)
1810    }
1811
1812    /// If validation fails, the response MUST contain the latest valid hash:
1813    ///
1814    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1815    ///     conditions:
1816    ///     - It is fully validated and deemed VALID
1817    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1818    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1819    ///     conditions are satisfied by a `PoW` block.
1820    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1821    ///     the above conditions.
1822    fn latest_valid_hash_for_invalid_payload(
1823        &mut self,
1824        parent_hash: B256,
1825    ) -> ProviderResult<Option<B256>> {
1826        // Check if parent exists in side chain or in canonical chain.
1827        if self.block_by_hash(parent_hash)?.is_some() {
1828            return Ok(Some(parent_hash))
1829        }
1830
1831        // iterate over ancestors in the invalid cache
1832        // until we encounter the first valid ancestor
1833        let mut current_hash = parent_hash;
1834        let mut current_block = self.state.invalid_headers.get(&current_hash);
1835        while let Some(block_with_parent) = current_block {
1836            current_hash = block_with_parent.parent;
1837            current_block = self.state.invalid_headers.get(&current_hash);
1838
1839            // If current_block is None, then current_hash does not have an invalid
1840            // ancestor in the cache, so check whether it is known to the tree or the database
1841            if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1842                return Ok(Some(current_hash))
1843            }
1844        }
1845        Ok(None)
1846    }
1847
1848    /// Prepares the invalid payload response for the given hash, checking the
1849    /// database for the parent hash and populating the payload status with the latest valid hash
1850    /// according to the engine api spec.
1851    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1852        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1853        // PoW block, which we need to identify by looking at the parent's block difficulty
1854        if let Some(parent) = self.block_by_hash(parent_hash)? {
1855            if !parent.header().difficulty().is_zero() {
1856                parent_hash = B256::ZERO;
1857            }
1858        }
1859
1860        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
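        // if no valid ancestor could be determined, fall back to the zero hash for
        // `latestValidHash`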
1861        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1862            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1863        })
1864        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1865    }
1866
1867    /// Returns true if the given hash is the last received sync target block.
1868    ///
1869    /// See [`ForkchoiceStateTracker::sync_target_state`]
1870    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1871        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1872            return target.head_block_hash == block_hash
1873        }
1874        false
1875    }
1876
1877    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1878    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1879    ///
1880    /// Returns a payload status response according to the engine API spec if the block is known to
1881    /// be invalid.
1882    fn check_invalid_ancestor_with_head(
1883        &mut self,
1884        check: B256,
1885        head: &SealedBlock<N::Block>,
1886    ) -> ProviderResult<Option<PayloadStatus>> {
1887        // check if the check hash was previously marked as invalid
1888        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1889
1890        // populate the latest valid hash field
1891        let status = self.prepare_invalid_response(header.parent)?;
1892
1893        // insert the head block into the invalid header cache
1894        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), header);
1895        self.emit_event(BeaconConsensusEngineEvent::InvalidBlock(Box::new(head.clone())));
1896
1897        Ok(Some(status))
1898    }
1899
1900    /// Checks if the given `head` points to an invalid header, which requires a specific response
1901    /// to a forkchoice update.
1902    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1903        // check if the head was previously marked as invalid
1904        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1905        // populate the latest valid hash field
1906        Ok(Some(self.prepare_invalid_response(header.parent)?))
1907    }
1908
1909    /// Validates that the block is correct and satisfies all consensus rules that concern the
1910    /// header and the block body itself.
1911    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1912        if let Err(e) =
1913            self.consensus.validate_header_with_total_difficulty(block.header(), U256::MAX)
1914        {
1915            error!(
1916                target: "engine::tree",
1917                ?block,
1918                "Failed to validate total difficulty for block {}: {e}",
1919                block.hash()
1920            );
1921            return Err(e)
1922        }
1923
1924        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1925            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1926            return Err(e)
1927        }
1928
1929        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1930            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1931            return Err(e)
1932        }
1933
1934        Ok(())
1935    }
1936
1937    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
1938    #[instrument(level = "trace", skip(self), target = "engine::tree")]
1939    fn try_connect_buffered_blocks(
1940        &mut self,
1941        parent: BlockNumHash,
1942    ) -> Result<(), InsertBlockFatalError> {
1943        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1944
1945        if blocks.is_empty() {
1946            // nothing to append
1947            return Ok(())
1948        }
1949
1950        let now = Instant::now();
1951        let block_count = blocks.len();
1952        for child in blocks {
1953            let child_num_hash = child.num_hash();
1954            match self.insert_block(child) {
1955                Ok(res) => {
1956                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1957                    if self.is_sync_target_head(child_num_hash.hash) &&
1958                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1959                    {
1960                        self.make_canonical(child_num_hash.hash)?;
1961                    }
1962                }
1963                Err(err) => {
1964                    debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1965                    if let Err(fatal) = self.on_insert_block_error(err) {
1966                        warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1967                        return Err(fatal)
1968                    }
1969                }
1970            }
1971        }
1972
1973        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1974        Ok(())
1975    }
1976
1977    /// Pre-validates the block and inserts it into the buffer.
1978    fn buffer_block(
1979        &mut self,
1980        block: RecoveredBlock<N::Block>,
1981    ) -> Result<(), InsertBlockError<N::Block>> {
1982        if let Err(err) = self.validate_block(&block) {
1983            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1984        }
1985        self.state.buffer.insert_block(block);
1986        Ok(())
1987    }
1988
1989    /// Returns true if the distance from the local tip to the block is greater than the configured
1990    /// threshold.
1991    ///
1992    /// If the `local_tip` is greater than the `block`, then this will return false.
1993    #[inline]
1994    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1995        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1996    }
1997
1998    /// Returns how far the local tip is from the given block. If the local tip is at the same
1999    /// height or its block number is greater than the given block, this returns None.
2000    #[inline]
2001    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2002        if block > local_tip {
2003            Some(block - local_tip)
2004        } else {
2005            None
2006        }
2007    }
2008
2009    /// Returns the target hash to sync to if the distance from the local tip to the block is
2010    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2011    /// that block already).
2012    ///
2013    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2014    /// (missing) finalized block.
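    ///
    /// In short, a backfill target is only returned when the gap to the target (or to the tracked
    /// finalized block, once it is known) exceeds the backfill threshold and the finalized block
    /// is not already in the database.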
2015    fn backfill_sync_target(
2016        &self,
2017        canonical_tip_num: u64,
2018        target_block_number: u64,
2019        downloaded_block: Option<BlockNumHash>,
2020    ) -> Option<B256> {
2021        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2022
2023        // check if the distance exceeds the threshold for backfill sync
2024        let mut exceeds_backfill_threshold =
2025            self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
2026
2027        // check if the downloaded block is the tracked finalized block
2028        if let Some(buffered_finalized) = sync_target_state
2029            .as_ref()
2030            .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2031        {
2032            // if we have buffered the finalized block, we should check how far
2033            // we're off
2034            exceeds_backfill_threshold =
2035                self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
2036        }
2037
2038        // If this is invoked after we downloaded a block we can check if this block is the
2039        // finalized block
2040        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
2041            if downloaded_block.hash == state.finalized_block_hash {
2042                // we downloaded the finalized block and can now check how far we're off
2043                exceeds_backfill_threshold =
2044                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
2045            }
2046        }
2047
2048        // if the number of missing blocks is greater than the max, trigger backfill
2049        if exceeds_backfill_threshold {
2050            if let Some(state) = sync_target_state {
2051                // if we have already canonicalized the finalized block, we should skip backfill
2052                match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2053                    Err(err) => {
2054                        warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2055                    }
2056                    Ok(None) => {
2057                        // ensure the finalized block is known (not the zero hash)
2058                        if !state.finalized_block_hash.is_zero() {
2059                            // we don't have the block yet and the distance exceeds the allowed
2060                            // threshold
2061                            return Some(state.finalized_block_hash)
2062                        }
2063
2064                        // OPTIMISTIC SYNCING
2065                        //
2066                        // This can happen when the node is doing an
2067                        // optimistic sync, where the CL has no knowledge of the finalized hash,
2068                        // but is expecting the EL to sync as high
2069                        // as possible before finalizing.
2070                        //
2071                        // This usually doesn't happen on ETH mainnet since CLs use the more
2072                        // secure checkpoint syncing.
2073                        //
2074                        // However, optimism chains will do this, and the risk of a reorg is
2075                        // low.
2076                        debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2077                        return Some(state.head_block_hash)
2078                    }
2079                    Ok(Some(_)) => {
2080                        // we're fully synced to the finalized block
2081                    }
2082                }
2083            }
2084        }
2085
2086        None
2087    }
2088
2089    /// This determines whether or not we should remove blocks from the chain, based on a canonical
2090    /// chain update.
2091    ///
2092    /// If the chain update is a reorg, this checks whether:
2093    /// * the new chain is behind the last persisted block, or
2094    /// * the root of the new chain is at the same height as the last persisted block but is a
2095    ///   different block
2096    ///
2097    /// If either of these is true, then this returns the height of the first block. Otherwise,
2098    /// this returns [`None`]. This should be used to check whether or not we should be sending a
2099    /// remove command to the persistence task.
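    ///
    /// For example (illustrative numbers): if the last persisted block is #100 and a reorg's new
    /// chain starts at block #98, this returns `Some(98)`; the caller then schedules removal of
    /// all on-disk blocks above #97 so they can be replaced by the new chain.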
2100    fn find_disk_reorg(&self, chain_update: &NewCanonicalChain<N>) -> Option<u64> {
2101        let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
2102
2103        let BlockNumHash { number: new_num, hash: new_hash } =
2104            new.first().map(|block| block.recovered_block().num_hash())?;
2105
2106        match new_num.cmp(&self.persistence_state.last_persisted_block.number) {
2107            Ordering::Greater => {
2108                // new number is above the last persisted block so the reorg can be performed
2109                // entirely in memory
2110                None
2111            }
2112            Ordering::Equal => {
2113                // new number is the same, if the hash is the same then we should not need to remove
2114                // any blocks
2115                (self.persistence_state.last_persisted_block.hash != new_hash).then_some(new_num)
2116            }
2117            Ordering::Less => {
2118                // this means we are below the last persisted block and must remove on-disk blocks
2119                Some(new_num)
2120            }
2121        }
2122    }
2123
2124    /// Invoked when the canonical chain has been updated.
2125    ///
2126    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2127    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2128        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2129        let start = Instant::now();
2130
2131        // schedule a remove_above call if we have an on-disk reorg
2132        if let Some(height) = self.find_disk_reorg(&chain_update) {
2133            // calculate the new tip by subtracting one from the lowest part of the chain
2134            let new_tip_num = height.saturating_sub(1);
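            // removing everything above `height - 1` deletes the on-disk blocks from `height`
            // upwards, which are the ones being replaced by the new chain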
2135            self.persistence_state.schedule_removal(new_tip_num);
2136        }
2137
2138        // update the tracked canonical head
2139        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2140
2141        let tip = chain_update.tip().clone_sealed_header();
2142        let notification = chain_update.to_chain_notification();
2143
2144        // reinsert any missing reorged blocks
2145        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2146            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2147            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2148            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2149
2150            self.update_reorg_metrics(old.len());
2151            self.reinsert_reorged_blocks(new.clone());
2152            // Try reinserting the reorged canonical chain. This is only possible if we have
2153            // `persisted_trie_updates` for those blocks.
2154            let old = old
2155                .iter()
2156                .filter_map(|block| {
2157                    let (_, trie) = self
2158                        .state
2159                        .tree_state
2160                        .persisted_trie_updates
2161                        .get(&block.recovered_block.hash())
2162                        .cloned()?;
2163                    Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
2164                })
2165                .collect::<Vec<_>>();
2166            self.reinsert_reorged_blocks(old);
2167        }
2168
2169        // update the tracked in-memory state with the new chain
2170        self.canonical_in_memory_state.update_chain(chain_update);
2171        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2172
2173        // Update metrics based on new tip
2174        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2175
2176        // sends an event to all active listeners about the new canonical chain
2177        self.canonical_in_memory_state.notify_canon_state(notification);
2178
2179        // emit event
2180        self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
2181            Box::new(tip),
2182            start.elapsed(),
2183        ));
2184    }
2185
2186    /// This updates metrics based on the given reorg length.
2187    fn update_reorg_metrics(&self, old_chain_length: usize) {
2188        self.metrics.tree.reorgs.increment(1);
2189        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2190    }
2191
2192    /// This reinserts any blocks in the new chain that do not already exist in the tree
2193    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2194        for block in new_chain {
2195            if self
2196                .state
2197                .tree_state
2198                .executed_block_by_hash(block.recovered_block().hash())
2199                .is_none()
2200            {
2201                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2202                self.state.tree_state.insert_executed(block);
2203            }
2204        }
2205    }
2206
2207    /// Invoke the invalid block hook if this is a new invalid block.
2208    fn on_invalid_block(
2209        &mut self,
2210        parent_header: &SealedHeader<N::BlockHeader>,
2211        block: &RecoveredBlock<N::Block>,
2212        output: &BlockExecutionOutput<N::Receipt>,
2213        trie_updates: Option<(&TrieUpdates, B256)>,
2214    ) {
2215        if self.state.invalid_headers.get(&block.hash()).is_some() {
2216            // we already marked this block as invalid
2217            return;
2218        }
2219        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
2220    }
2221
2222    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2223    ///
2224    /// This mainly compares the missing parent of the downloaded block with the current canonical
2225    /// tip, and decides whether or not backfill sync should be triggered.
2226    fn on_disconnected_downloaded_block(
2227        &self,
2228        downloaded_block: BlockNumHash,
2229        missing_parent: BlockNumHash,
2230        head: BlockNumHash,
2231    ) -> Option<TreeEvent> {
2232        // compare the missing parent with the canonical tip
2233        if let Some(target) =
2234            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2235        {
2236            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2237            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2238        }
2239
2240        // continue downloading the missing parent
2241        //
2242        // this happens if either:
2243        //  * the missing parent block num < canonical tip num
2244        //    * this case represents a missing block on a fork that is shorter than the canonical
2245        //      chain
2246        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2247        //    less than the backfill threshold
2248        //    * this case represents a potentially long range of blocks to download and execute
2249        let request = if let Some(distance) =
2250            self.distance_from_local_tip(head.number, missing_parent.number)
2251        {
2252            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2253            DownloadRequest::BlockRange(missing_parent.hash, distance)
2254        } else {
2255            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2256            // This happens when the missing parent is on an outdated
2257            // sidechain and we can only download the missing block itself
2258            DownloadRequest::single_block(missing_parent.hash)
2259        };
2260
2261        Some(TreeEvent::Download(request))
2262    }
2263
2264    /// Invoked with a block downloaded from the network
2265    ///
2266    /// Returns an event with the appropriate action to take, such as:
2267    ///  - download more missing blocks
2268    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2269    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2270    fn on_downloaded_block(
2271        &mut self,
2272        block: RecoveredBlock<N::Block>,
2273    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2274        let block_num_hash = block.num_hash();
2275        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2276        if self
2277            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2278            .is_some()
2279        {
2280            return Ok(None)
2281        }
2282
2283        if !self.backfill_sync_state.is_idle() {
2284            return Ok(None)
2285        }
2286
2287        // try to append the block
2288        match self.insert_block(block) {
2289            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2290                if self.is_sync_target_head(block_num_hash.hash) {
2291                    trace!(target: "engine::tree", "appended downloaded sync target block");
2292
2293                    // we just inserted the current sync target block, we can try to make it
2294                    // canonical
2295                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2296                        sync_target_head: block_num_hash.hash,
2297                    })))
2298                }
2299                trace!(target: "engine::tree", "appended downloaded block");
2300                self.try_connect_buffered_blocks(block_num_hash)?;
2301            }
2302            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2303                // block is not connected to the canonical head, we need to download
2304                // its missing branch first
2305                return Ok(self.on_disconnected_downloaded_block(
2306                    block_num_hash,
2307                    missing_ancestor,
2308                    head,
2309                ))
2310            }
2311            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2312                trace!(target: "engine::tree", "downloaded block already executed");
2313            }
2314            Err(err) => {
2315                debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2316                if let Err(fatal) = self.on_insert_block_error(err) {
2317                    warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2318                    return Err(fatal)
2319                }
2320            }
2321        }
2322        Ok(None)
2323    }
2324
2325    fn insert_block(
2326        &mut self,
2327        block: RecoveredBlock<N::Block>,
2328    ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
2329        self.insert_block_inner(block.clone())
2330            .map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind))
2331    }
2332
2333    fn insert_block_inner(
2334        &mut self,
2335        block: RecoveredBlock<N::Block>,
2336    ) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
2337        let block_num_hash = block.num_hash();
2338        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
2339
2340        if self.block_by_hash(block.hash())?.is_some() {
2341            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2342        }
2343
2344        let start = Instant::now();
2345
2346        trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
2347
2348        // validate block consensus rules
2349        self.validate_block(&block)?;
2350
2351        trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
2352        let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else {
2353            // we don't have the state required to execute this block, so we buffer it and look
2354            // for the missing parent block
2355            let missing_ancestor = self
2356                .state
2357                .buffer
2358                .lowest_ancestor(&block.parent_hash())
2359                .map(|block| block.parent_num_hash())
2360                .unwrap_or_else(|| block.parent_num_hash());
2361
2362            self.state.buffer.insert_block(block);
2363
2364            return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2365                head: self.state.tree_state.current_canonical_head,
2366                missing_ancestor,
2367            }))
2368        };
2369
2370        // now validate against the parent
2371        let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
2372            InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
2373                block.parent_hash().into(),
2374            ))
2375        })?;
2376        if let Err(e) =
2377            self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
2378        {
2379            warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
2380            return Err(e.into())
2381        }
2382
2383        let state_provider = provider_builder.build()?;
2384
2385        // We only run the parallel state root if we are not currently persisting any blocks or
2386        // persisting blocks that are all ancestors of the one we are executing.
2387        //
2388        // If we're committing ancestor blocks, then: any trie updates being committed are a subset
2389        // of the in-memory trie updates collected before fetching reverts. So any diff in
2390        // reverts (pre vs post commit) is already covered by the in-memory trie updates we
2391        // collect in `compute_state_root_parallel`.
2392        //
2393        // See https://github.com/paradigmxyz/reth/issues/12688 for more details
2394        let persisting_kind = self.persisting_kind_for(block.header());
2395        let run_parallel_state_root = persisting_kind.can_run_parallel_state_root();
2396
2397        // use prewarming background task
2398        let header = block.clone_sealed_header();
2399        let txs = block.clone_transactions_recovered().collect();
2400        let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
2401            // use background tasks for state root calc
2402            let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2403
2404            // Compute trie input
2405            let trie_input_start = Instant::now();
2406            let trie_input = self
2407                .compute_trie_input(
2408                    persisting_kind,
2409                    consistent_view.clone(),
2410                    block.header().parent_hash(),
2411                )
2412                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
2413
2414            self.metrics
2415                .block_validation
2416                .trie_input_duration
2417                .record(trie_input_start.elapsed().as_secs_f64());
2418
2419            self.payload_processor.spawn(header, txs, provider_builder, consistent_view, trie_input)
2420        } else {
2421            self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
2422        };
2423
2424        // Use the cached state provider before executing; it is used in execution after the
2425        // prewarming threads complete
2426        let state_provider = CachedStateProvider::new_with_caches(
2427            state_provider,
2428            handle.caches(),
2429            handle.cache_metrics(),
2430        );
2431
2432        trace!(target: "engine::tree", block=?block_num_hash, "Executing block");
2433
2434        let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
2435        let execution_start = Instant::now();
2436        let output = self.metrics.executor.execute_metered(
2437            executor,
2438            &block,
2439            Box::new(handle.state_hook()),
2440        )?;
2441        let execution_finish = Instant::now();
2442        let execution_time = execution_finish.duration_since(execution_start);
2443        trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");
2444
2445        // after executing the block we can stop prewarming transaction execution
2446        handle.stop_prewarming_execution();
2447
2448        if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
2449            // call post-block hook
2450            self.on_invalid_block(&parent_block, &block, &output, None);
2451            return Err(err.into())
2452        }
2453
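        // Hash the post-execution state; it is used for post-execution validation below and for
        // the state root computation.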
2454        let hashed_state = self.provider.hashed_post_state(&output.state);
2455
2456        if let Err(err) = self
2457            .payload_validator
2458            .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
2459        {
2460            // call post-block hook
2461            self.on_invalid_block(&parent_block, &block, &output, None);
2462            return Err(err.into())
2463        }
2464
2465        trace!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
2466
2467        let root_time = Instant::now();
2468
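        // Holds the state root computed by the background task or the parallel computation; if it
        // stays `None`, we fall back to computing the state root synchronously below.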
2469        let mut maybe_state_root = None;
2470
2471        if run_parallel_state_root {
2472            // if the new payload extends the current canonical chain, we attempt to use the
2473            // background task or try to compute it in parallel
2474            if self.config.use_state_root_task() {
2475                match handle.state_root() {
2476                    Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
2477                        let elapsed = execution_finish.elapsed();
2478                        info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
2479                        // we double check the state root here for good measure
2480                        if state_root == block.header().state_root() {
2481                            maybe_state_root = Some((state_root, trie_updates, elapsed))
2482                        } else {
2483                            warn!(
2484                                target: "engine::tree",
2485                                ?state_root,
2486                                block_state_root = ?block.header().state_root(),
2487                                "State root task returned incorrect state root"
2488                            );
2489                        }
2490                    }
2491                    Err(error) => {
2492                        debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
2493                    }
2494                }
2495            } else {
2496                match self.compute_state_root_parallel(
2497                    persisting_kind,
2498                    block.header().parent_hash(),
2499                    &hashed_state,
2500                ) {
2501                    Ok(result) => {
2502                        info!(
2503                            target: "engine::tree",
2504                            block = ?block_num_hash,
2505                            regular_state_root = ?result.0,
2506                            "Regular root task finished"
2507                        );
2508                        maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
2509                    }
2510                    Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2511                        debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
2512                    }
2513                    Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))),
2514                }
2515            }
2516        }
2517
2518        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
2519            maybe_state_root
2520        {
2521            maybe_state_root
2522        } else {
2523            // fallback is to compute the state root regularly in sync
2524            debug!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
2525            let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
2526            (root, updates, root_time.elapsed())
2527        };
2528
2529        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2530        debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
2531
2532        // ensure state root matches
2533        if state_root != block.header().state_root() {
2534            // call post-block hook
2535            self.on_invalid_block(&parent_block, &block, &output, Some((&trie_output, state_root)));
2536            return Err(ConsensusError::BodyStateRootDiff(
2537                GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2538            )
2539            .into())
2540        }
2541
2542        // terminate prewarming task with good state output
2543        handle.terminate_caching(Some(output.state.clone()));
2544
2545        let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
2546            block: ExecutedBlock {
2547                recovered_block: Arc::new(block),
2548                execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
2549                hashed_state: Arc::new(hashed_state),
2550            },
2551            trie: Arc::new(trie_output),
2552        };
2553
2554        // if the parent is the canonical head, we can insert the block as the pending block
2555        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2556        {
2557            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2558            self.canonical_in_memory_state.set_pending_block(executed.clone());
2559        }
2560
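        // Track the executed block in the tree state and update the executed block count metric.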
2561        self.state.tree_state.insert_executed(executed.clone());
2562        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2563
2564        // emit insert event
2565        let elapsed = start.elapsed();
2566        let engine_event = if self.is_fork(block_num_hash.hash)? {
2567            BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2568        } else {
2569            BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2570        };
2571        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2572
2573        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2574        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2575    }
2576
2577    /// Compute state root for the given hashed post state in parallel.
2578    ///
2579    /// # Returns
2580    ///
2581    /// Returns `Ok(_)` if computed successfully.
2582    /// Returns `Err(_)` if an error was encountered during computation.
2583    /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
2584    /// should be used instead.
2585    fn compute_state_root_parallel(
2586        &self,
2587        persisting_kind: PersistingKind,
2588        parent_hash: B256,
2589        hashed_state: &HashedPostState,
2590    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2591        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2592
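        // Build the trie input overlay from the in-memory ancestors of the parent block.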
2593        let mut input =
2594            self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
2595        // Extend with block we are validating root for.
2596        input.append_ref(hashed_state);
2597
2598        ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2599    }
2600
2601    /// Computes the trie input at the provided parent hash.
2602    ///
2603    /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that
2604    /// serves as an overlay to the database blocks.
2605    ///
2606    /// It works as follows:
2607    /// 1. Collect in-memory blocks that are descendants of the provided parent hash using
2608    ///    [`TreeState::blocks_by_hash`].
2609    /// 2. If persistence is in progress, and the block that we're computing the trie input for
2610    ///    is a descendant of the currently persisting blocks, we need to be sure that in-memory
2611    ///    blocks do not overlap with database blocks that may have already been persisted.
2612    ///    To do that, we filter out in-memory blocks that are lower than the highest database
2613    ///    block.
2614    /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
2615    ///    [`HashedPostState`] from them.
2616    fn compute_trie_input(
2617        &self,
2618        persisting_kind: PersistingKind,
2619        consistent_view: ConsistentDbView<P>,
2620        parent_hash: B256,
2621    ) -> Result<TrieInput, ParallelStateRootError> {
2622        let mut input = TrieInput::default();
2623
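        // The best block number is the highest block persisted to the database; it is used below
        // to filter out in-memory blocks that have already been persisted.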
2624        let provider = consistent_view.provider_ro()?;
2625        let best_block_number = provider.best_block_number()?;
2626
2627        let (mut historical, mut blocks) = self
2628            .state
2629            .tree_state
2630            .blocks_by_hash(parent_hash)
2631            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2632
2633        // If the current block is a descendant of the currently persisting blocks, then we need to
2634        // filter in-memory blocks, so that none of them are already persisted in the database.
2635        if persisting_kind.is_descendant() {
2636            // Iterate over the blocks from oldest to newest.
2637            while let Some(block) = blocks.last() {
2638                let recovered_block = block.recovered_block();
2639                if recovered_block.number() <= best_block_number {
2640                    // Remove blocks that are lower than or equal to the highest database
2641                    // block.
2642                    blocks.pop();
2643                } else {
2644                    // If the block is higher than the best block number, stop filtering, as it's
2645                    // the first block that's not in the database.
2646                    break
2647                }
2648            }
2649
2650            historical = if let Some(block) = blocks.last() {
2651                // If there are any in-memory blocks left after filtering, set the anchor to the
2652                // parent of the oldest block.
2653                (block.recovered_block().number() - 1).into()
2654            } else {
2655                // Otherwise, set the anchor to the original provided parent hash.
2656                parent_hash.into()
2657            };
2658        }
2659
2660        if blocks.is_empty() {
2661            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2662        } else {
2663            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2664        }
2665
2666        // Convert the historical block to the block number.
2667        let block_number = provider
2668            .convert_hash_or_number(historical)?
2669            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2670
2671        // Retrieve revert state for historical block.
2672        let revert_state = if block_number == best_block_number {
2673            // We do not check against the `last_block_number` here because
2674            // `HashedPostState::from_reverts` only uses the database tables, and not static files.
2675            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2676            HashedPostState::default()
2677        } else {
2678            let revert_state = HashedPostState::from_reverts::<
2679                <P::StateCommitment as StateCommitment>::KeyHasher,
2680            >(provider.tx_ref(), block_number + 1)
2681            .map_err(ProviderError::from)?;
2682            debug!(
2683                target: "engine::tree",
2684                block_number,
2685                best_block_number,
2686                accounts = revert_state.accounts.len(),
2687                storages = revert_state.storages.len(),
2688                "Non-empty revert state"
2689            );
2690            revert_state
2691        };
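        // The revert state rolls the database view back from the best block to the historical
        // anchor, so the in-memory overlays below are applied on top of a consistent base.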
2692        input.append(revert_state);
2693
2694        // Extend with contents of parent in-memory blocks.
2695        for block in blocks.iter().rev() {
2696            input.append_cached_ref(block.trie_updates(), block.hashed_state())
2697        }
2698
2699        Ok(input)
2700    }
2701
2702    /// Handles an error that occurred while inserting a block.
2703    ///
2704    /// If this is a validation error, this will mark the block as invalid.
2705    ///
2706    /// Returns the proper payload status response if the block is invalid.
2707    fn on_insert_block_error(
2708        &mut self,
2709        error: InsertBlockError<N::Block>,
2710    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2711        let (block, error) = error.split();
2712
2713        // if this is an invalid block error, we extract the validation error; otherwise return
2714        // the fatal error.
2715        let validation_err = error.ensure_validation_error()?;
2716
2717        // If the error was due to an invalid payload, the payload is added to the
2718        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2719        // returned.
2720        warn!(
2721            target: "engine::tree",
2722            invalid_hash=%block.hash(),
2723            invalid_number=block.number(),
2724            %validation_err,
2725            "Invalid block error on new payload",
2726        );
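        // Resolve the latest valid ancestor of the invalid block to report in the payload status.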
2727        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2728
2729        // keep track of the invalid header
2730        self.state.invalid_headers.insert(block.block_with_parent());
2731        self.emit_event(EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
2732            Box::new(block),
2733        )));
2734        Ok(PayloadStatus::new(
2735            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2736            latest_valid_hash,
2737        ))
2738    }
2739
2740    /// Attempts to find the header for the given block hash if it is canonical.
2741    pub fn find_canonical_header(
2742        &self,
2743        hash: B256,
2744    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
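        // Check the in-memory canonical state first, then fall back to the database provider.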
2745        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2746
2747        if canonical.is_none() {
2748            canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2749        }
2750
2751        Ok(canonical)
2752    }
2753
2754    /// Updates the tracked finalized block if we have it.
2755    fn update_finalized_block(
2756        &self,
2757        finalized_block_hash: B256,
2758    ) -> Result<(), OnForkChoiceUpdated> {
2759        if finalized_block_hash.is_zero() {
2760            return Ok(())
2761        }
2762
2763        match self.find_canonical_header(finalized_block_hash) {
2764            Ok(None) => {
2765                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2766                // if the finalized block is not known, we can't update the finalized block
2767                return Err(OnForkChoiceUpdated::invalid_state())
2768            }
2769            Ok(Some(finalized)) => {
2770                if Some(finalized.num_hash()) !=
2771                    self.canonical_in_memory_state.get_finalized_num_hash()
2772                {
2773                    // we're also persisting the finalized block on disk so we can reload it on
2774                    // restart. This is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2775                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2776                    self.canonical_in_memory_state.set_finalized(finalized);
2777                }
2778            }
2779            Err(err) => {
2780                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2781            }
2782        }
2783
2784        Ok(())
2785    }
2786
2787    /// Updates the tracked safe block if we have it
2788    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2789        if safe_block_hash.is_zero() {
2790            return Ok(())
2791        }
2792
2793        match self.find_canonical_header(safe_block_hash) {
2794            Ok(None) => {
2795                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2796                // if the safe block is not known, we can't update the safe block
2797                return Err(OnForkChoiceUpdated::invalid_state())
2798            }
2799            Ok(Some(safe)) => {
2800                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2801                    // we're also persisting the safe block on disk so we can reload it on
2802                    // restart. This is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2803                    let _ = self.persistence.save_safe_block_number(safe.number());
2804                    self.canonical_in_memory_state.set_safe(safe);
2805                }
2806            }
2807            Err(err) => {
2808                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2809            }
2810        }
2811
2812        Ok(())
2813    }
2814
2815    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2816    /// made canonical.
2817    ///
2818    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2819    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2820    ///
2821    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2822    /// are consistent with the head block.
2823    fn ensure_consistent_forkchoice_state(
2824        &self,
2825        state: ForkchoiceState,
2826    ) -> Result<(), OnForkChoiceUpdated> {
2827        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2828        // after the head block is canonicalized.
2829        //
2830        // This ensures that the finalized block is consistent with the head block, i.e. the
2831        // finalized block is an ancestor of the head block.
2832        self.update_finalized_block(state.finalized_block_hash)?;
2833
2834        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2835        // after the head block is canonicalized.
2836        //
2837        // This ensures that the safe block is consistent with the head block, i.e. the safe
2838        // block is an ancestor of the head block.
2839        self.update_safe_block(state.safe_block_hash)
2840    }
2841
2842    /// Pre-validate forkchoice update and check whether it can be processed.
2843    ///
2844    /// This method returns the update outcome if validation fails, or if
2845    /// the node is syncing and the update cannot be processed at the moment.
2846    fn pre_validate_forkchoice_update(
2847        &mut self,
2848        state: ForkchoiceState,
2849    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2850        if state.head_block_hash.is_zero() {
2851            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2852        }
2853
2854        // check if the new head hash is connected to any ancestor that we previously marked as
2855        // invalid
2856        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2857        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2858            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2859        }
2860
2861        if !self.backfill_sync_state.is_idle() {
2862            // We can only process new forkchoice updates if the pipeline is idle, since it requires
2863            // exclusive access to the database
2864            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2865            return Ok(Some(OnForkChoiceUpdated::syncing()))
2866        }
2867
2868        Ok(None)
2869    }
2870
2871    /// Validates the payload attributes with respect to the header and fork choice state.
2872    ///
2873    /// Note: At this point, the fork choice update is considered to be VALID; however, we can still
2874    /// return an error if the payload attributes are invalid.
2875    fn process_payload_attributes(
2876        &self,
2877        attrs: T::PayloadAttributes,
2878        head: &N::BlockHeader,
2879        state: ForkchoiceState,
2880        version: EngineApiMessageVersion,
2881    ) -> OnForkChoiceUpdated {
2882        if let Err(err) =
2883            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2884        {
2885            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2886            return OnForkChoiceUpdated::invalid_payload_attributes()
2887        }
2888
2889        // 8. Client software MUST begin a payload build process building on top of
2890        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2891        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2892        //    The build process is specified in the Payload building section.
2893        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2894            state.head_block_hash,
2895            attrs,
2896            version as u8,
2897        ) {
2898            Ok(attributes) => {
2899                // send the payload to the builder and return the receiver for the pending payload
2900                // id; initiating the payload job is handled asynchronously
2901                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2902
2903                // Client software MUST respond to this method call in the following way:
2904                // {
2905                //      payloadStatus: {
2906                //          status: VALID,
2907                //          latestValidHash: forkchoiceState.headBlockHash,
2908                //          validationError: null
2909                //      },
2910                //      payloadId: buildProcessId
2911                // }
2912                //
2913                // if the payload is deemed VALID and the build process has begun.
2914                OnForkChoiceUpdated::updated_with_pending_payload_id(
2915                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2916                    pending_payload_id,
2917                )
2918            }
2919            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2920        }
2921    }
2922
2923    /// Remove all blocks up to __and including__ the given block number.
2924    ///
2925    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2926    /// those which have a fork point at or below the finalized hash.
2927    ///
2928    /// Canonical blocks below the upper bound will still be removed.
2929    pub(crate) fn remove_before(
2930        &mut self,
2931        upper_bound: BlockNumHash,
2932        finalized_hash: Option<B256>,
2933    ) -> ProviderResult<()> {
2934        // first fetch the finalized block number and then call the remove_until method on
2935        // tree_state
2936        let num = if let Some(hash) = finalized_hash {
2937            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2938        } else {
2939            None
2940        };
2941
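        // Remove in-memory blocks up to the upper bound; the last persisted block hash and the
        // finalized block (if known) bound what may be removed, as described in the docs above.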
2942        self.state.tree_state.remove_until(
2943            upper_bound,
2944            self.persistence_state.last_persisted_block.hash,
2945            num,
2946        );
2947        Ok(())
2948    }
2949
2950    /// Returns a builder for creating state providers for the given hash.
2951    ///
2952    /// This is an optimization for parallel execution contexts where we want to avoid
2953    /// creating state providers in the critical path.
2954    pub fn state_provider_builder(
2955        &self,
2956        hash: B256,
2957    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2958    where
2959        P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
2960    {
2961        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2962            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2963            // the block leads back to the canonical chain
2964            return Ok(Some(StateProviderBuilder::new(
2965                self.provider.clone(),
2966                historical,
2967                Some(blocks),
2968            )))
2969        }
2970
2971        // Check if the block is persisted
2972        if let Some(header) = self.provider.header(&hash)? {
2973            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2974            // For persisted blocks, we create a builder that will fetch state directly from the
2975            // database
2976            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2977        }
2978
2979        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2980        Ok(None)
2981    }
2982}
2983
2984/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
2985/// variant.
2986///
2987/// If we don't know the block's parent, we return `Disconnected`, as we can't say whether the
2988/// block is valid or not.
2989#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2990pub enum BlockStatus {
2991    /// The block is valid and extends the canonical chain.
2992    Valid,
2993    /// The block may be valid and has an unknown missing ancestor.
2994    Disconnected {
2995        /// Current canonical head.
2996        head: BlockNumHash,
2997        /// The lowest ancestor block that is not connected to the canonical chain.
2998        missing_ancestor: BlockNumHash,
2999    },
3000}
3001
3002/// How a payload was inserted if it was valid.
3003///
3004/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen(_)`] is
3005/// returned; otherwise [`InsertPayloadOk::Inserted(_)`] is returned.
3006#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3007pub enum InsertPayloadOk {
3008    /// The payload was valid, but we have already seen it.
3009    AlreadySeen(BlockStatus),
3010    /// The payload was valid and inserted into the tree.
3011    Inserted(BlockStatus),
3012}
3013
3014/// Whether or not the blocks are currently persisting and the input block is a descendant.
3015#[derive(Debug, Clone, Copy)]
3016pub enum PersistingKind {
3017    /// The blocks are not currently persisting.
3018    NotPersisting,
3019    /// The blocks are currently persisting but the input block is not a descendant.
3020    PersistingNotDescendant,
3021    /// The blocks are currently persisting and the input block is a descendant.
3022    PersistingDescendant,
3023}
3024
3025impl PersistingKind {
3026    /// Returns true if the parallel state root can be run.
3027    ///
3028    /// We only run the parallel state root if we are not currently persisting any blocks or
3029    /// persisting blocks that are all ancestors of the one we are calculating the state root for.
3030    pub fn can_run_parallel_state_root(&self) -> bool {
3031        matches!(self, Self::NotPersisting | Self::PersistingDescendant)
3032    }
3033
3034    /// Returns true if the blocks are currently being persisted and the input block is a
3035    /// descendant.
3036    pub fn is_descendant(&self) -> bool {
3037        matches!(self, Self::PersistingDescendant)
3038    }
3039}
3040
3041#[cfg(test)]
3042mod tests {
3043    use super::*;
3044    use crate::persistence::PersistenceAction;
3045    use alloy_consensus::Header;
3046    use alloy_primitives::Bytes;
3047    use alloy_rlp::Decodable;
3048    use alloy_rpc_types_engine::{
3049        CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
3050        ExecutionPayloadV3,
3051    };
3052    use assert_matches::assert_matches;
3053    use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
3054    use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
3055    use reth_engine_primitives::ForkchoiceStatus;
3056    use reth_ethereum_consensus::EthBeaconConsensus;
3057    use reth_ethereum_engine_primitives::EthEngineTypes;
3058    use reth_ethereum_primitives::{Block, EthPrimitives};
3059    use reth_evm::test_utils::MockExecutorProvider;
3060    use reth_evm_ethereum::EthEvmConfig;
3061    use reth_node_ethereum::EthereumEngineValidator;
3062    use reth_primitives_traits::Block as _;
3063    use reth_provider::test_utils::MockEthProvider;
3064    use reth_trie::{updates::TrieUpdates, HashedPostState};
3065    use std::{
3066        str::FromStr,
3067        sync::mpsc::{channel, Sender},
3068    };
3069
3070    /// This is a test channel that allows you to `release` any value that is in the channel.
3071    ///
3072    /// If nothing has been sent, then the next value will be immediately sent.
3073    #[allow(dead_code)]
3074    struct TestChannel<T> {
3075        /// If an item is sent to this channel, an item will be released in the wrapped channel
3076        release: Receiver<()>,
3077        /// The sender channel
3078        tx: Sender<T>,
3079        /// The receiver channel
3080        rx: Receiver<T>,
3081    }
3082
3083    impl<T: Send + 'static> TestChannel<T> {
3084        /// Creates a new test channel
3085        #[allow(dead_code)]
3086        fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
3087            let (original_tx, original_rx) = channel();
3088            let (wrapped_tx, wrapped_rx) = channel();
3089            let (release_tx, release_rx) = channel();
3090            let handle = TestChannelHandle::new(release_tx);
3091            let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
3092            // spawn the task that listens and releases stuff
3093            std::thread::spawn(move || test_channel.intercept_loop());
3094            (original_tx, wrapped_rx, handle)
3095        }
3096
3097        /// Runs the intercept loop, waiting for the handle to release a value
3098        fn intercept_loop(&self) {
3099            while self.release.recv() == Ok(()) {
3100                let Ok(value) = self.rx.recv() else { return };
3101
3102                let _ = self.tx.send(value);
3103            }
3104        }
3105    }
3106
3107    struct TestChannelHandle {
3108        /// The sender to use for releasing values
3109        release: Sender<()>,
3110    }
3111
3112    impl TestChannelHandle {
3113        /// Returns a [`TestChannelHandle`]
3114        const fn new(release: Sender<()>) -> Self {
3115            Self { release }
3116        }
3117
3118        /// Signals to the channel task that a value should be released
3119        #[allow(dead_code)]
3120        fn release(&self) {
3121            let _ = self.release.send(());
3122        }
3123    }
3124
3125    struct TestHarness {
3126        tree: EngineApiTreeHandler<
3127            EthPrimitives,
3128            MockEthProvider,
3129            MockExecutorProvider,
3130            EthEngineTypes,
3131            EthereumEngineValidator,
3132            EthEvmConfig,
3133        >,
3134        to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
3135        from_tree_rx: UnboundedReceiver<EngineApiEvent>,
3136        blocks: Vec<ExecutedBlockWithTrieUpdates>,
3137        action_rx: Receiver<PersistenceAction>,
3138        executor_provider: MockExecutorProvider,
3139        block_builder: TestBlockBuilder,
3140        provider: MockEthProvider,
3141    }
3142
3143    impl TestHarness {
3144        fn new(chain_spec: Arc<ChainSpec>) -> Self {
3145            let (action_tx, action_rx) = channel();
3146            Self::with_persistence_channel(chain_spec, action_tx, action_rx)
3147        }
3148
3149        #[allow(dead_code)]
3150        fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
3151            let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
3152            (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
3153        }
3154
3155        fn with_persistence_channel(
3156            chain_spec: Arc<ChainSpec>,
3157            action_tx: Sender<PersistenceAction>,
3158            action_rx: Receiver<PersistenceAction>,
3159        ) -> Self {
3160            let persistence_handle = PersistenceHandle::new(action_tx);
3161
3162            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
3163
3164            let provider = MockEthProvider::default();
3165            let executor_provider = MockExecutorProvider::default();
3166
3167            let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
3168
3169            let (from_tree_tx, from_tree_rx) = unbounded_channel();
3170
3171            let header = chain_spec.genesis_header().clone();
3172            let header = SealedHeader::seal_slow(header);
3173            let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
3174            let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
3175
3176            let (to_payload_service, _payload_command_rx) = unbounded_channel();
3177            let payload_builder = PayloadBuilderHandle::new(to_payload_service);
3178
3179            let evm_config = EthEvmConfig::new(chain_spec.clone());
3180
3181            let tree = EngineApiTreeHandler::new(
3182                provider.clone(),
3183                executor_provider.clone(),
3184                consensus,
3185                payload_validator,
3186                from_tree_tx,
3187                engine_api_tree_state,
3188                canonical_in_memory_state,
3189                persistence_handle,
3190                PersistenceState::default(),
3191                payload_builder,
3192                // TODO: fix tests for state root task https://github.com/paradigmxyz/reth/issues/14376
3193                // always assume enough parallelism for tests
3194                TreeConfig::default()
3195                    .with_legacy_state_root(true)
3196                    .with_has_enough_parallelism(true),
3197                EngineApiKind::Ethereum,
3198                evm_config,
3199            );
3200
3201            let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3202            Self {
3203                to_tree_tx: tree.incoming_tx.clone(),
3204                tree,
3205                from_tree_rx,
3206                blocks: vec![],
3207                action_rx,
3208                executor_provider,
3209                block_builder,
3210                provider,
3211            }
3212        }
3213
3214        fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
3215            let mut blocks_by_hash = HashMap::default();
3216            let mut blocks_by_number = BTreeMap::new();
3217            let mut state_by_hash = HashMap::default();
3218            let mut hash_by_number = BTreeMap::new();
3219            let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
3220            let mut parent_hash = B256::ZERO;
3221
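            // Index the executed blocks into the maps expected by TreeState and
            // CanonicalInMemoryState.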
3222            for block in &blocks {
3223                let sealed_block = block.recovered_block();
3224                let hash = sealed_block.hash();
3225                let number = sealed_block.number;
3226                blocks_by_hash.insert(hash, block.clone());
3227                blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
3228                state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
3229                hash_by_number.insert(number, hash);
3230                parent_to_child.entry(parent_hash).or_default().insert(hash);
3231                parent_hash = hash;
3232            }
3233
3234            self.tree.state.tree_state = TreeState {
3235                blocks_by_hash,
3236                blocks_by_number,
3237                current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
3238                parent_to_child,
3239                persisted_trie_updates: HashMap::default(),
3240            };
3241
3242            let last_executed_block = blocks.last().unwrap().clone();
3243            let pending = Some(BlockState::new(last_executed_block));
3244            self.tree.canonical_in_memory_state =
3245                CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
3246
3247            self.blocks = blocks.clone();
3248
3249            let recovered_blocks =
3250                blocks.iter().map(|b| b.recovered_block().clone()).collect::<Vec<_>>();
3251
3252            self.persist_blocks(recovered_blocks);
3253
3254            self
3255        }
3256
3257        const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
3258            self.tree.backfill_sync_state = state;
3259            self
3260        }
3261
3262        fn extend_execution_outcome(
3263            &self,
3264            execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
3265        ) {
3266            self.executor_provider.extend(execution_outcomes);
3267        }
3268
3269        fn insert_block(
3270            &mut self,
3271            block: RecoveredBlock<reth_ethereum_primitives::Block>,
3272        ) -> Result<InsertPayloadOk, InsertBlockError<Block>> {
3273            let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3274            self.extend_execution_outcome([execution_outcome]);
3275            self.tree.provider.add_state_root(block.state_root);
3276            self.tree.insert_block(block)
3277        }
3278
3279        async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3280            let fcu_status = fcu_status.into();
3281
3282            self.send_fcu(block_hash, fcu_status).await;
3283
3284            self.check_fcu(block_hash, fcu_status).await;
3285        }
3286
3287        async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3288            let fcu_state = self.fcu_state(block_hash);
3289
3290            let (tx, rx) = oneshot::channel();
3291            self.tree
3292                .on_engine_message(FromEngine::Request(
3293                    BeaconEngineMessage::ForkchoiceUpdated {
3294                        state: fcu_state,
3295                        payload_attrs: None,
3296                        tx,
3297                        version: EngineApiMessageVersion::default(),
3298                    }
3299                    .into(),
3300                ))
3301                .unwrap();
3302
3303            let response = rx.await.unwrap().unwrap().await.unwrap();
3304            match fcu_status.into() {
3305                ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
3306                ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
3307                ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
3308            }
3309        }
3310
3311        async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3312            let fcu_state = self.fcu_state(block_hash);
3313
3314            // check for ForkchoiceUpdated event
3315            let event = self.from_tree_rx.recv().await.unwrap();
3316            match event {
3317                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
3318                    state,
3319                    status,
3320                )) => {
3321                    assert_eq!(state, fcu_state);
3322                    assert_eq!(status, fcu_status.into());
3323                }
3324                _ => panic!("Unexpected event: {:#?}", event),
3325            }
3326        }
3327
3328        const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
3329            ForkchoiceState {
3330                head_block_hash: block_hash,
3331                safe_block_hash: block_hash,
3332                finalized_block_hash: block_hash,
3333            }
3334        }
3335
3336        async fn send_new_payload(
3337            &mut self,
3338            block: RecoveredBlock<reth_ethereum_primitives::Block>,
3339        ) {
3340            let payload = ExecutionPayloadV3::from_block_unchecked(
3341                block.hash(),
3342                &block.clone_sealed_block().into_block(),
3343            );
3344            self.tree
3345                .on_new_payload(ExecutionData {
3346                    payload: payload.into(),
3347                    sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
3348                        parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
3349                        versioned_hashes: vec![],
3350                    }),
3351                })
3352                .unwrap();
3353        }
3354
3355        async fn insert_chain(
3356            &mut self,
3357            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3358        ) {
3359            for block in chain.clone() {
3360                self.insert_block(block.clone()).unwrap();
3361            }
3362            self.check_canon_chain_insertion(chain).await;
3363        }
3364
3365        async fn check_canon_commit(&mut self, hash: B256) {
3366            let event = self.from_tree_rx.recv().await.unwrap();
3367            match event {
3368                EngineApiEvent::BeaconConsensus(
3369                    BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3370                ) => {
3371                    assert_eq!(header.hash(), hash);
3372                }
3373                _ => panic!("Unexpected event: {:#?}", event),
3374            }
3375        }
3376
3377        async fn check_fork_chain_insertion(
3378            &mut self,
3379            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3380        ) {
3381            for block in chain {
3382                self.check_fork_block_added(block.hash()).await;
3383            }
3384        }
3385
3386        async fn check_canon_chain_insertion(
3387            &mut self,
3388            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3389        ) {
3390            for block in chain.clone() {
3391                self.check_canon_block_added(block.hash()).await;
3392            }
3393        }
3394
3395        async fn check_canon_block_added(&mut self, expected_hash: B256) {
3396            let event = self.from_tree_rx.recv().await.unwrap();
3397            match event {
3398                EngineApiEvent::BeaconConsensus(
3399                    BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
3400                ) => {
3401                    assert_eq!(executed.recovered_block.hash(), expected_hash);
3402                }
3403                _ => panic!("Unexpected event: {:#?}", event),
3404            }
3405        }
3406
3407        async fn check_fork_block_added(&mut self, expected_hash: B256) {
3408            let event = self.from_tree_rx.recv().await.unwrap();
3409            match event {
3410                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3411                    executed,
3412                    _,
3413                )) => {
3414                    assert_eq!(executed.recovered_block.hash(), expected_hash);
3415                }
3416                _ => panic!("Unexpected event: {:#?}", event),
3417            }
3418        }
3419
3420        async fn check_invalid_block(&mut self, expected_hash: B256) {
3421            let event = self.from_tree_rx.recv().await.unwrap();
3422            match event {
3423                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
3424                    block,
3425                )) => {
3426                    assert_eq!(block.hash(), expected_hash);
3427                }
3428                _ => panic!("Unexpected event: {:#?}", event),
3429            }
3430        }
3431
3432        fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
3433            let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3434            let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3435
3436            for block in &blocks {
3437                block_data.push((block.hash(), block.clone_block()));
3438                headers_data.push((block.hash(), block.header().clone()));
3439            }
3440
3441            self.provider.extend_blocks(block_data);
3442            self.provider.extend_headers(headers_data);
3443        }
3444
3445        fn setup_range_insertion_for_valid_chain(
3446            &mut self,
3447            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3448        ) {
3449            self.setup_range_insertion_for_chain(chain, None)
3450        }
3451
3452        fn setup_range_insertion_for_invalid_chain(
3453            &mut self,
3454            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3455            index: usize,
3456        ) {
3457            self.setup_range_insertion_for_chain(chain, Some(index))
3458        }
3459
3460        fn setup_range_insertion_for_chain(
3461            &mut self,
3462            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3463            invalid_index: Option<usize>,
3464        ) {
3465            // set up execution outcomes for the chain; the blocks will be
3466            // executed starting from the oldest, so we need to reverse.
3467            let mut chain_rev = chain;
3468            chain_rev.reverse();
3469
3470            let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3471            for (index, block) in chain_rev.iter().enumerate() {
3472                let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3473                let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index {
3474                    B256::random()
3475                } else {
3476                    block.state_root
3477                };
3478                self.tree.provider.add_state_root(state_root);
3479                execution_outcomes.push(execution_outcome);
3480            }
3481            self.extend_execution_outcome(execution_outcomes);
3482        }
3483
3484        fn check_canon_head(&self, head_hash: B256) {
3485            assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3486        }
3487    }
3488
3489    #[test]
3490    fn test_tree_persist_block_batch() {
3491        let tree_config = TreeConfig::default();
3492        let chain_spec = MAINNET.clone();
3493        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3494
3495        // we need more than tree_config.persistence_threshold() + 1 blocks to
3496        // trigger the persistence task.
3497        let blocks: Vec<_> = test_block_builder
3498            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3499            .collect();
3500        let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3501
3502        let mut blocks = vec![];
3503        for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3504            blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3505        }
3506
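        // send more blocks than the max execute batch size; the tree should process one batch and
        // re-enqueue the rest as another DownloadedBlocks message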
3507        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3508
3509        // process the message
3510        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3511        test_harness.tree.on_engine_message(msg).unwrap();
3512
3513        // we should now receive the other batch
3514        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3515        match msg {
3516            FromEngine::DownloadedBlocks(blocks) => {
3517                assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3518            }
3519            _ => panic!("unexpected message: {:#?}", msg),
3520        }
3521    }
3522
3523    #[tokio::test]
3524    async fn test_tree_persist_blocks() {
3525        let tree_config = TreeConfig::default();
3526        let chain_spec = MAINNET.clone();
3527        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3528
3529        // we need more than tree_config.persistence_threshold() + 1 blocks to
3530        // trigger the persistence task.
3531        let blocks: Vec<_> = test_block_builder
3532            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3533            .collect();
3534        let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3535        std::thread::Builder::new()
3536            .name("Tree Task".to_string())
3537            .spawn(|| test_harness.tree.run())
3538            .unwrap();
3539
3540        // send a message to the tree to enter the main loop.
3541        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3542
3543        let received_action =
3544            test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3545        if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3546            // only blocks.len() - tree_config.memory_block_buffer_target() blocks will be
3547            // persisted
3548            let expected_persist_len =
3549                blocks.len() - tree_config.memory_block_buffer_target() as usize;
3550            assert_eq!(saved_blocks.len(), expected_persist_len);
3551            assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3552        } else {
3553            panic!("unexpected action received {received_action:?}");
3554        }
3555    }
3556
3557    #[tokio::test]
3558    async fn test_in_memory_state_trait_impl() {
3559        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect();
3560        let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3561
3562        for executed_block in blocks {
3563            let sealed_block = executed_block.recovered_block();
3564
3565            let expected_state = BlockState::new(executed_block.clone());
3566
3567            let actual_state_by_hash = test_harness
3568                .tree
3569                .canonical_in_memory_state
3570                .state_by_hash(sealed_block.hash())
3571                .unwrap();
3572            assert_eq!(expected_state, *actual_state_by_hash);
3573
3574            let actual_state_by_number = test_harness
3575                .tree
3576                .canonical_in_memory_state
3577                .state_by_number(sealed_block.number)
3578                .unwrap();
3579            assert_eq!(expected_state, *actual_state_by_number);
3580        }
3581    }
3582
3583    #[tokio::test]
3584    async fn test_engine_request_during_backfill() {
3585        let tree_config = TreeConfig::default();
3586        let blocks: Vec<_> = TestBlockBuilder::eth()
3587            .get_executed_blocks(0..tree_config.persistence_threshold())
3588            .collect();
3589        let mut test_harness = TestHarness::new(MAINNET.clone())
3590            .with_blocks(blocks)
3591            .with_backfill_state(BackfillSyncState::Active);
3592
3593        let (tx, rx) = oneshot::channel();
3594        test_harness
3595            .tree
3596            .on_engine_message(FromEngine::Request(
3597                BeaconEngineMessage::ForkchoiceUpdated {
3598                    state: ForkchoiceState {
3599                        head_block_hash: B256::random(),
3600                        safe_block_hash: B256::random(),
3601                        finalized_block_hash: B256::random(),
3602                    },
3603                    payload_attrs: None,
3604                    tx,
3605                    version: EngineApiMessageVersion::default(),
3606                }
3607                .into(),
3608            ))
3609            .unwrap();
3610
3611        let resp = rx.await.unwrap().unwrap().await.unwrap();
3612        assert!(resp.payload_status.is_syncing());
3613    }
3614
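    // A new payload whose parent is unknown should return SYNCING and be buffered for later.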
3615    #[test]
3616    fn test_disconnected_payload() {
3617        let s = include_str!("../../test-data/holesky/2.rlp");
3618        let data = Bytes::from_str(s).unwrap();
3619        let block = Block::decode(&mut data.as_ref()).unwrap();
3620        let sealed = block.seal_slow();
3621        let hash = sealed.hash();
3622        let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
3623
3624        let mut test_harness = TestHarness::new(HOLESKY.clone());
3625
3626        let outcome = test_harness
3627            .tree
3628            .on_new_payload(ExecutionData {
3629                payload: payload.into(),
3630                sidecar: ExecutionPayloadSidecar::none(),
3631            })
3632            .unwrap();
3633        assert!(outcome.outcome.is_syncing());
3634
3635        // ensure block is buffered
3636        let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3637        assert_eq!(buffered.clone_sealed_block(), sealed);
3638    }
3639
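    // Inserting a block with an unknown parent should report it as disconnected, pointing at the
    // missing ancestor.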
3640    #[test]
3641    fn test_disconnected_block() {
3642        let s = include_str!("../../test-data/holesky/2.rlp");
3643        let data = Bytes::from_str(s).unwrap();
3644        let block = Block::decode(&mut data.as_ref()).unwrap();
3645        let sealed = block.seal_slow().try_recover().unwrap();
3646
3647        let mut test_harness = TestHarness::new(HOLESKY.clone());
3648
3649        let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap();
3650        assert_eq!(
3651            outcome,
3652            InsertPayloadOk::Inserted(BlockStatus::Disconnected {
3653                head: test_harness.tree.state.tree_state.current_canonical_head,
3654                missing_ancestor: sealed.parent_num_hash()
3655            })
3656        );
3657    }
3658
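    // A newPayload request received while backfill sync is active should respond with SYNCING.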
3659    #[tokio::test]
3660    async fn test_holesky_payload() {
3661        let s = include_str!("../../test-data/holesky/1.rlp");
3662        let data = Bytes::from_str(s).unwrap();
3663        let block: Block = Block::decode(&mut data.as_ref()).unwrap();
3664        let sealed = block.seal_slow();
3665        let payload =
3666            ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
3667
3668        let mut test_harness =
3669            TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3670
3671        let (tx, rx) = oneshot::channel();
3672        test_harness
3673            .tree
3674            .on_engine_message(FromEngine::Request(
3675                BeaconEngineMessage::NewPayload {
3676                    payload: ExecutionData {
3677                        payload: payload.clone().into(),
3678                        sidecar: ExecutionPayloadSidecar::none(),
3679                    },
3680                    tx,
3681                }
3682                .into(),
3683            ))
3684            .unwrap();
3685
3686        let resp = rx.await.unwrap().unwrap();
3687        assert!(resp.is_syncing());
3688    }
3689
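    // `is_descendant` should hold for direct children as well as for blocks further down the
    // chain.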
3690    #[test]
3691    fn test_tree_state_normal_descendant() {
3692        let mut tree_state = TreeState::new(BlockNumHash::default());
3693        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3694
3695        tree_state.insert_executed(blocks[0].clone());
3696        assert!(tree_state.is_descendant(
3697            blocks[0].recovered_block().num_hash(),
3698            blocks[1].recovered_block().header()
3699        ));
3700
3701        tree_state.insert_executed(blocks[1].clone());
3702
3703        assert!(tree_state.is_descendant(
3704            blocks[0].recovered_block().num_hash(),
3705            blocks[2].recovered_block().header()
3706        ));
3707        assert!(tree_state.is_descendant(
3708            blocks[1].recovered_block().num_hash(),
3709            blocks[2].recovered_block().header()
3710        ));
3711    }
3712
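    // `insert_executed` should keep the `parent_to_child` index up to date for every inserted
    // block.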
3713    #[tokio::test]
3714    async fn test_tree_state_insert_executed() {
3715        let mut tree_state = TreeState::new(BlockNumHash::default());
3716        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
3717
3718        tree_state.insert_executed(blocks[0].clone());
3719        tree_state.insert_executed(blocks[1].clone());
3720
3721        assert_eq!(
3722            tree_state.parent_to_child.get(&blocks[0].recovered_block().hash()),
3723            Some(&HashSet::from_iter([blocks[1].recovered_block().hash()]))
3724        );
3725
3726        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3727
3728        tree_state.insert_executed(blocks[2].clone());
3729
3730        assert_eq!(
3731            tree_state.parent_to_child.get(&blocks[1].recovered_block().hash()),
3732            Some(&HashSet::from_iter([blocks[2].recovered_block().hash()]))
3733        );
3734        assert!(tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3735
3736        assert!(!tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3737    }
3738
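    // Inserting a competing fork tracks both chains per height, and re-inserting an existing
    // block does not create duplicates.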
3739    #[tokio::test]
3740    async fn test_tree_state_insert_executed_with_reorg() {
3741        let mut tree_state = TreeState::new(BlockNumHash::default());
3742        let mut test_block_builder = TestBlockBuilder::eth();
3743        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3744
3745        for block in &blocks {
3746            tree_state.insert_executed(block.clone());
3747        }
3748        assert_eq!(tree_state.blocks_by_hash.len(), 5);
3749
3750        let fork_block_3 = test_block_builder
3751            .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3752        let fork_block_4 = test_block_builder
3753            .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3754        let fork_block_5 = test_block_builder
3755            .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3756
3757        tree_state.insert_executed(fork_block_3.clone());
3758        tree_state.insert_executed(fork_block_4.clone());
3759        tree_state.insert_executed(fork_block_5.clone());
3760
3761        assert_eq!(tree_state.blocks_by_hash.len(), 8);
3762        assert_eq!(tree_state.blocks_by_number[&3].len(), 2); // two blocks at height 3 (original and fork)
3763        assert_eq!(tree_state.parent_to_child[&blocks[1].recovered_block().hash()].len(), 2); // block 2 should have two children
3764
3765        // verify that we can insert the same block again without issues
3766        tree_state.insert_executed(fork_block_4.clone());
3767        assert_eq!(tree_state.blocks_by_hash.len(), 8);
3768
3769        assert!(tree_state.parent_to_child[&fork_block_3.recovered_block().hash()]
3770            .contains(&fork_block_4.recovered_block().hash()));
3771        assert!(tree_state.parent_to_child[&fork_block_4.recovered_block().hash()]
3772            .contains(&fork_block_5.recovered_block().hash()));
3773
3774        assert_eq!(tree_state.blocks_by_number[&4].len(), 2);
3775        assert_eq!(tree_state.blocks_by_number[&5].len(), 2);
3776    }
3777
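    // `remove_until` should remove all blocks up to and including the given bound and leave the
    // remaining blocks and their parent/child links intact.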
3778    #[tokio::test]
3779    async fn test_tree_state_remove_before() {
3780        let start_num_hash = BlockNumHash::default();
3781        let mut tree_state = TreeState::new(start_num_hash);
3782        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3783
3784        for block in &blocks {
3785            tree_state.insert_executed(block.clone());
3786        }
3787
3788        let last = blocks.last().unwrap();
3789
3790        // set the canonical head
3791        tree_state.set_canonical_head(last.recovered_block().num_hash());
3792
3793        // inclusive bound, so we should remove anything up to and including 2
3794        tree_state.remove_until(
3795            BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3796            start_num_hash.hash,
3797            Some(blocks[1].recovered_block().num_hash()),
3798        );
3799
3800        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3801        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3802        assert!(!tree_state.blocks_by_number.contains_key(&1));
3803        assert!(!tree_state.blocks_by_number.contains_key(&2));
3804
3805        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3806        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3807        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3808        assert!(tree_state.blocks_by_number.contains_key(&3));
3809        assert!(tree_state.blocks_by_number.contains_key(&4));
3810        assert!(tree_state.blocks_by_number.contains_key(&5));
3811
3812        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3813        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3814        assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3815        assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3816        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3817
3818        assert_eq!(
3819            tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3820            Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3821        );
3822        assert_eq!(
3823            tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3824            Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3825        );
3826    }
3827
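    // `remove_until` without a finalized block should still remove everything up to and
    // including the given bound.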
3828    #[tokio::test]
3829    async fn test_tree_state_remove_before_finalized() {
3830        let start_num_hash = BlockNumHash::default();
3831        let mut tree_state = TreeState::new(start_num_hash);
3832        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3833
3834        for block in &blocks {
3835            tree_state.insert_executed(block.clone());
3836        }
3837
3838        let last = blocks.last().unwrap();
3839
3840        // set the canonical head
3841        tree_state.set_canonical_head(last.recovered_block().num_hash());
3842
3843        // we should still remove everything up to and including 2
3844        tree_state.remove_until(
3845            BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3846            start_num_hash.hash,
3847            None,
3848        );
3849
3850        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3851        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3852        assert!(!tree_state.blocks_by_number.contains_key(&1));
3853        assert!(!tree_state.blocks_by_number.contains_key(&2));
3854
3855        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3856        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3857        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3858        assert!(tree_state.blocks_by_number.contains_key(&3));
3859        assert!(tree_state.blocks_by_number.contains_key(&4));
3860        assert!(tree_state.blocks_by_number.contains_key(&5));
3861
3862        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3863        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3864        assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3865        assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3866        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3867
3868        assert_eq!(
3869            tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3870            Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3871        );
3872        assert_eq!(
3873            tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3874            Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3875        );
3876    }
3877
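    // `remove_until` with a finalized block below the bound should still remove everything up to
    // and including the bound when there are no forks.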
3878    #[tokio::test]
3879    async fn test_tree_state_remove_before_lower_finalized() {
3880        let start_num_hash = BlockNumHash::default();
3881        let mut tree_state = TreeState::new(start_num_hash);
3882        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..6).collect();
3883
3884        for block in &blocks {
3885            tree_state.insert_executed(block.clone());
3886        }
3887
3888        let last = blocks.last().unwrap();
3889
3890        // set the canonical head
3891        tree_state.set_canonical_head(last.recovered_block().num_hash());
3892
3893        // we have no forks, so we should still remove everything up to and including 2
3894        tree_state.remove_until(
3895            BlockNumHash::new(2, blocks[1].recovered_block().hash()),
3896            start_num_hash.hash,
3897            Some(blocks[0].recovered_block().num_hash()),
3898        );
3899
3900        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].recovered_block().hash()));
3901        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].recovered_block().hash()));
3902        assert!(!tree_state.blocks_by_number.contains_key(&1));
3903        assert!(!tree_state.blocks_by_number.contains_key(&2));
3904
3905        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].recovered_block().hash()));
3906        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].recovered_block().hash()));
3907        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].recovered_block().hash()));
3908        assert!(tree_state.blocks_by_number.contains_key(&3));
3909        assert!(tree_state.blocks_by_number.contains_key(&4));
3910        assert!(tree_state.blocks_by_number.contains_key(&5));
3911
3912        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].recovered_block().hash()));
3913        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
3914        assert!(tree_state.parent_to_child.contains_key(&blocks[2].recovered_block().hash()));
3915        assert!(tree_state.parent_to_child.contains_key(&blocks[3].recovered_block().hash()));
3916        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].recovered_block().hash()));
3917
3918        assert_eq!(
3919            tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
3920            Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
3921        );
3922        assert_eq!(
3923            tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
3924            Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
3925        );
3926    }
3927
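    // Exercises `on_new_head` for a simple chain extension (commit) and for a reorg, and checks
    // the persistence actions produced along the way.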
3928    #[tokio::test]
3929    async fn test_tree_state_on_new_head_reorg() {
3930        reth_tracing::init_test_tracing();
3931        let chain_spec = MAINNET.clone();
3932
3933        // Set persistence_threshold and memory_block_buffer_target to 1
3934        let mut test_harness = TestHarness::new(chain_spec);
3935        test_harness.tree.config = test_harness
3936            .tree
3937            .config
3938            .with_persistence_threshold(1)
3939            .with_memory_block_buffer_target(1);
3940        let mut test_block_builder = TestBlockBuilder::eth();
3941        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3942
3943        for block in &blocks {
3944            test_harness.tree.state.tree_state.insert_executed(block.clone());
3945        }
3946
3947        // set block 3 as the current canonical head
3948        test_harness
3949            .tree
3950            .state
3951            .tree_state
3952            .set_canonical_head(blocks[2].recovered_block().num_hash());
3953
3954        // create a fork from block 2
3955        let fork_block_3 = test_block_builder
3956            .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3957        let fork_block_4 = test_block_builder
3958            .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3959        let fork_block_5 = test_block_builder
3960            .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3961
3962        test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3963        test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3964        test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3965
3966        // normal (non-reorg) case
3967        let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap();
3968        assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3969        if let Some(NewCanonicalChain::Commit { new }) = result {
3970            assert_eq!(new.len(), 2);
3971            assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash());
3972            assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
3973        }
3974
3975        // there should be no current persistence action before we advance persistence
3976        let current_action = test_harness.tree.persistence_state.current_action();
3977        assert_eq!(current_action, None);
3978
3979        // let's advance persistence and check that it attempts to save blocks
3980        //
3981        // since in-memory block buffer target and persistence_threshold are both 1, this should
3982        // save all but the current tip of the canonical chain (up to blocks[1])
3983        test_harness.tree.advance_persistence().unwrap();
3984        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3985        assert_eq!(
3986            current_action,
3987            Some(CurrentPersistenceAction::SavingBlocks {
3988                highest: blocks[1].recovered_block().num_hash()
3989            })
3990        );
3991
3992        // drain the pending SaveBlocks action from the channel and verify its contents
3993        let received_action = test_harness.action_rx.recv().unwrap();
3994        let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
3995            panic!("received wrong action");
3996        };
3997        assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
3998
3999        // send the response so we can advance again
4000        sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
4001
4002        // the SaveBlocks action for blocks[1] is still in flight until we advance persistence again
4003        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4004        assert_eq!(
4005            current_action,
4006            Some(CurrentPersistenceAction::SavingBlocks {
4007                highest: blocks[1].recovered_block().num_hash()
4008            })
4009        );
4010
4011        // after advancing persistence, the current action should be reset to `None`
4012        test_harness.tree.advance_persistence().unwrap();
4013        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4014        assert_eq!(current_action, None);
4015
4016        // reorg case
4017        let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
4018        assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
4019
4020        if let Some(NewCanonicalChain::Reorg { new, old }) = result {
4021            assert_eq!(new.len(), 3);
4022            assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
4023            assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash());
4024            assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash());
4025
4026            assert_eq!(old.len(), 1);
4027            assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
4028        }
4029
4030        // The canonical head has not changed, so advancing persistence will not produce a new action
4031        test_harness.tree.advance_persistence().unwrap();
4032        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4033        assert_eq!(current_action, None);
4034
4035        // Let's change the canonical head and advance persistence
4036        test_harness
4037            .tree
4038            .state
4039            .tree_state
4040            .set_canonical_head(fork_block_5.recovered_block().num_hash());
4041
4042        // The canonical head has changed now; we should persist up to fork_block_4 due to the
4043        // persistence threshold and in-memory block buffer target
4044        test_harness.tree.advance_persistence().unwrap();
4045        let current_action = test_harness.tree.persistence_state.current_action().cloned();
4046        assert_eq!(
4047            current_action,
4048            Some(CurrentPersistenceAction::SavingBlocks {
4049                highest: fork_block_4.recovered_block().num_hash()
4050            })
4051        );
4052    }
4053
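    // Repeatedly reorgs between two deep forks and checks the new and old chains reported by
    // `on_new_head`.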
4054    #[test]
4055    fn test_tree_state_on_new_head_deep_fork() {
4056        reth_tracing::init_test_tracing();
4057
4058        let chain_spec = MAINNET.clone();
4059        let mut test_harness = TestHarness::new(chain_spec);
4060        let mut test_block_builder = TestBlockBuilder::eth();
4061
4062        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
4063
4064        for block in &blocks {
4065            test_harness.tree.state.tree_state.insert_executed(block.clone());
4066        }
4067
4068        // set last block as the current canonical head
4069        let last_block = blocks.last().unwrap().recovered_block().clone();
4070
4071        test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());
4072
4073        // create two fork chains from last_block
4074        let chain_a = test_block_builder.create_fork(&last_block, 10);
4075        let chain_b = test_block_builder.create_fork(&last_block, 10);
4076
4077        for block in &chain_a {
4078            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
4079                block: ExecutedBlock {
4080                    recovered_block: Arc::new(block.clone()),
4081                    execution_output: Arc::new(ExecutionOutcome::default()),
4082                    hashed_state: Arc::new(HashedPostState::default()),
4083                },
4084                trie: Arc::new(TrieUpdates::default()),
4085            });
4086        }
4087        test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
4088
4089        for block in &chain_b {
4090            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
4091                block: ExecutedBlock {
4092                    recovered_block: Arc::new(block.clone()),
4093                    execution_output: Arc::new(ExecutionOutcome::default()),
4094                    hashed_state: Arc::new(HashedPostState::default()),
4095                },
4096                trie: Arc::new(TrieUpdates::default()),
4097            });
4098        }
4099
4100        // for each block in chain_b, reorg to it and then back to canonical
4101        let mut expected_new = Vec::new();
4102        for block in &chain_b {
4103            // reorg to this block of chain_b
4104            let result = test_harness.tree.on_new_head(block.hash()).unwrap();
4105            assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));
4106
4107            expected_new.push(block);
4108            if let Some(NewCanonicalChain::Reorg { new, old }) = result {
4109                assert_eq!(new.len(), expected_new.len());
4110                for (index, block) in expected_new.iter().enumerate() {
4111                    assert_eq!(new[index].recovered_block().hash(), block.hash());
4112                }
4113
4114                assert_eq!(old.len(), chain_a.len());
4115                for (index, block) in chain_a.iter().enumerate() {
4116                    assert_eq!(old[index].recovered_block().hash(), block.hash());
4117                }
4118            }
4119
4120            // set last block of chain a as canonical head
4121            test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
4122        }
4123    }
4124
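    // `get_canonical_blocks_to_persist` should return only canonical blocks between the last
    // persisted block and the head minus the in-memory block buffer target.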
4125    #[tokio::test]
4126    async fn test_get_canonical_blocks_to_persist() {
4127        let chain_spec = MAINNET.clone();
4128        let mut test_harness = TestHarness::new(chain_spec);
4129        let mut test_block_builder = TestBlockBuilder::eth();
4130
4131        let canonical_head_number = 9;
4132        let blocks: Vec<_> =
4133            test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
4134        test_harness = test_harness.with_blocks(blocks.clone());
4135
4136        let last_persisted_block_number = 3;
4137        test_harness.tree.persistence_state.last_persisted_block.number =
4138            last_persisted_block_number;
4139
4140        let persistence_threshold = 4;
4141        let memory_block_buffer_target = 3;
4142        test_harness.tree.config = TreeConfig::default()
4143            .with_persistence_threshold(persistence_threshold)
4144            .with_memory_block_buffer_target(memory_block_buffer_target);
4145
4146        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
4147
4148        let expected_blocks_to_persist_length: usize =
4149            (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
4150                .try_into()
4151                .unwrap();
4152
4153        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
4154        for (i, item) in
4155            blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
4156        {
4157            assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
4158        }
4159
4160        // make sure only canonical blocks are included
4161        let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
4162        let fork_block_hash = fork_block.recovered_block().hash();
4163        test_harness.tree.state.tree_state.insert_executed(fork_block);
4164
4165        assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
4166
4167        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
4168        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
4169
4170        // check that the fork block is not included in the blocks to persist
4171        assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
4172
4173        // check that the original block 4 is still included
4174        assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
4175            b.recovered_block().hash() == blocks[4].recovered_block().hash()));
4176
4177        // check that advancing persistence sets the expected current persistence action
4178        test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
4179        assert_eq!(
4180            test_harness.tree.persistence_state.current_action().cloned(),
4181            Some(CurrentPersistenceAction::SavingBlocks {
4182                highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
4183            })
4184        );
4185    }
4186
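    // An FCU to an unknown head should trigger a download request for the missing block.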
4187    #[tokio::test]
4188    async fn test_engine_tree_fcu_missing_head() {
4189        let chain_spec = MAINNET.clone();
4190        let mut test_harness = TestHarness::new(chain_spec.clone());
4191
4192        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
4193
4194        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
4195        test_harness = test_harness.with_blocks(blocks);
4196
4197        let missing_block = test_block_builder
4198            .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash());
4199
4200        test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;
4201
4202        // after FCU we receive an EngineApiEvent::Download event to get the missing block.
4203        let event = test_harness.from_tree_rx.recv().await.unwrap();
4204        match event {
4205            EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
4206                let expected_block_set = HashSet::from_iter([missing_block.hash()]);
4207                assert_eq!(actual_block_set, expected_block_set);
4208            }
4209            _ => panic!("Unexpected event: {:#?}", event),
4210        }
4211    }
4212
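    // Blocks extending the canonical chain should be inserted after an FCU to the current tip.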
4213    #[tokio::test]
4214    async fn test_engine_tree_fcu_canon_chain_insertion() {
4215        let chain_spec = MAINNET.clone();
4216        let mut test_harness = TestHarness::new(chain_spec.clone());
4217
4218        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4219        test_harness = test_harness.with_blocks(base_chain.clone());
4220
4221        test_harness
4222            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4223            .await;
4224
4225        // extend main chain
4226        let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3);
4227
4228        test_harness.insert_chain(main_chain).await;
4229    }
4230
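    // An FCU to the tip of a fully known fork should commit the fork as the new canonical chain.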
4231    #[tokio::test]
4232    async fn test_engine_tree_fcu_reorg_with_all_blocks() {
4233        let chain_spec = MAINNET.clone();
4234        let mut test_harness = TestHarness::new(chain_spec.clone());
4235
4236        let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
4237        test_harness = test_harness.with_blocks(main_chain.clone());
4238
4239        let fork_chain = test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3);
4240        let fork_chain_last_hash = fork_chain.last().unwrap().hash();
4241
4242        // add fork blocks to the tree
4243        for block in &fork_chain {
4244            test_harness.insert_block(block.clone()).unwrap();
4245        }
4246
4247        test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
4248
4249        // check for ForkBlockAdded events; we expect fork_chain.len() blocks to be added
4250        test_harness.check_fork_chain_insertion(fork_chain.clone()).await;
4251
4252        // check for CanonicalChainCommitted event
4253        test_harness.check_canon_commit(fork_chain_last_hash).await;
4254
4255        test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
4256
4257        // new head is the tip of the fork chain
4258        test_harness.check_canon_head(fork_chain_last_hash);
4259    }
4260
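    // After an FCU far ahead of the local head and a simulated backfill finish, the tree should
    // request the remaining blocks for download.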
4261    #[tokio::test]
4262    async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
4263        reth_tracing::init_test_tracing();
4264
4265        let chain_spec = MAINNET.clone();
4266        let mut test_harness = TestHarness::new(chain_spec.clone());
4267
4268        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4269        test_harness = test_harness.with_blocks(base_chain.clone());
4270
4271        test_harness
4272            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4273            .await;
4274
4275        // extend main chain with enough blocks to trigger pipeline run but don't insert them
4276        let main_chain = test_harness
4277            .block_builder
4278            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
4279
4280        let main_chain_last_hash = main_chain.last().unwrap().hash();
4281        test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
4282
4283        test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
4284
4285        // create event for backfill finished
4286        let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
4287        let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
4288            block_number: backfill_finished_block_number,
4289        });
4290
4291        let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
4292        // add the block to the mock provider to enable persistence cleanup.
4293        test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
4294        test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
4295
4296        let event = test_harness.from_tree_rx.recv().await.unwrap();
4297        match event {
4298            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
4299                assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
4300            }
4301            _ => panic!("Unexpected event: {:#?}", event),
4302        }
4303
4304        test_harness
4305            .tree
4306            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
4307                .last()
4308                .unwrap()
4309                .clone()]))
4310            .unwrap();
4311
4312        let event = test_harness.from_tree_rx.recv().await.unwrap();
4313        match event {
4314            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
4315                assert_eq!(
4316                    total_blocks,
4317                    (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
4318                );
4319                assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
4320            }
4321            _ => panic!("Unexpected event: {:#?}", event),
4322        }
4323    }
4324
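    // Full live sync transition: an FCU triggers backfill to a target, after which the remaining
    // blocks are downloaded, executed, and eventually become canonical.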
4325    #[tokio::test]
4326    async fn test_engine_tree_live_sync_transition_eventually_canonical() {
4327        reth_tracing::init_test_tracing();
4328
4329        let chain_spec = MAINNET.clone();
4330        let mut test_harness = TestHarness::new(chain_spec.clone());
4331        test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);
4332
4333        // create base chain and set up the test harness with it
4334        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4335        test_harness = test_harness.with_blocks(base_chain.clone());
4336
4337        // fcu to the tip of base chain
4338        test_harness
4339            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4340            .await;
4341
4342        // create main chain, extension of base chain, with enough blocks to
4343        // trigger backfill sync
4344        let main_chain = test_harness
4345            .block_builder
4346            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
4347
4348        let main_chain_last = main_chain.last().unwrap();
4349        let main_chain_last_hash = main_chain_last.hash();
4350        let main_chain_backfill_target =
4351            main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
4352        let main_chain_backfill_target_hash = main_chain_backfill_target.hash();
4353
4354        // fcu to the element of main chain that should trigger backfill sync
4355        test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
4356        test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
4357
4358        // check download request for target
4359        let event = test_harness.from_tree_rx.recv().await.unwrap();
4360        match event {
4361            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
4362                assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
4363            }
4364            _ => panic!("Unexpected event: {:#?}", event),
4365        }
4366
4367        // send message to tell the engine the requested block was downloaded
4368        test_harness
4369            .tree
4370            .on_engine_message(FromEngine::DownloadedBlocks(vec![
4371                main_chain_backfill_target.clone()
4372            ]))
4373            .unwrap();
4374
4375        // check that backfill is triggered
4376        let event = test_harness.from_tree_rx.recv().await.unwrap();
4377        match event {
4378            EngineApiEvent::BackfillAction(BackfillAction::Start(
4379                reth_stages::PipelineTarget::Sync(target_hash),
4380            )) => {
4381                assert_eq!(target_hash, main_chain_backfill_target_hash);
4382            }
4383            _ => panic!("Unexpected event: {:#?}", event),
4384        }
4385
4386        // persist blocks of main chain, same as the backfill operation would do
4387        let backfilled_chain: Vec<_> =
4388            main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
4389        test_harness.persist_blocks(backfilled_chain.clone());
4390
4391        test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);
4392
4393        // send message to mark backfill finished
4394        test_harness
4395            .tree
4396            .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
4397                ControlFlow::Continue { block_number: main_chain_backfill_target.number },
4398            )))
4399            .unwrap();
4400
4401        // send fcu to the tip of the main chain
4402        test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
4403
4404        let event = test_harness.from_tree_rx.recv().await.unwrap();
4405        match event {
4406            EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
4407                assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
4408            }
4409            _ => panic!("Unexpected event: {:#?}", event),
4410        }
4411
4412        // tell the engine the main chain tip was downloaded
4413        test_harness
4414            .tree
4415            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
4416            .unwrap();
4417
4418        // check download range request
4419        let event = test_harness.from_tree_rx.recv().await.unwrap();
4420        match event {
4421            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
4422                assert_eq!(
4423                    total_blocks,
4424                    (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
4425                );
4426                assert_eq!(initial_hash, main_chain_last.parent_hash);
4427            }
4428            _ => panic!("Unexpected event: {:#?}", event),
4429        }
4430
4431        let remaining: Vec<_> = main_chain
4432            .clone()
4433            .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
4434            .collect();
4435
4436        test_harness.setup_range_insertion_for_valid_chain(remaining.clone());
4437
4438        // tell the engine the remaining block range was downloaded
4439        test_harness
4440            .tree
4441            .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
4442            .unwrap();
4443
4444        test_harness.check_canon_chain_insertion(remaining).await;
4445
4446        // check canonical chain committed event with the hash of the latest block
4447        test_harness.check_canon_commit(main_chain_last_hash).await;
4448
4449        // new head is the tip of the main chain
4450        test_harness.check_canon_head(main_chain_last_hash);
4451    }
4452
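    // Successive FCUs to a mid-chain block and then to the tip of an already inserted chain
    // should keep advancing the canonical head.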
4453    #[tokio::test]
4454    async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
4455        reth_tracing::init_test_tracing();
4456
4457        let chain_spec = MAINNET.clone();
4458        let mut test_harness = TestHarness::new(chain_spec.clone());
4459
4460        // create base chain and set up the test harness with it
4461        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4462        test_harness = test_harness.with_blocks(base_chain.clone());
4463
4464        // fcu to the tip of base chain
4465        test_harness
4466            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4467            .await;
4468
4469        // create main chain, extension of base chain
4470        let main_chain =
4471            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10);
4472        // determine a target in the middle of the main chain
4473        let target = main_chain.get(5).unwrap();
4474        let target_hash = target.hash();
4475        let main_last = main_chain.last().unwrap();
4476        let main_last_hash = main_last.hash();
4477
4478        // insert main chain
4479        test_harness.insert_chain(main_chain).await;
4480
4481        // send fcu to target
4482        test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;
4483
4484        test_harness.check_canon_commit(target_hash).await;
4485        test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;
4486
4487        // send fcu to main tip
4488        test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4489
4490        test_harness.check_canon_commit(main_last_hash).await;
4491        test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4492        test_harness.check_canon_head(main_last_hash);
4493    }
4494
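    // Two valid competing forks built while the canonical head points at an older block: an FCU
    // to chain B's tip makes it canonical and chain A is reported as a fork.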
4495    #[tokio::test]
4496    async fn test_engine_tree_valid_forks_with_older_canonical_head() {
4497        reth_tracing::init_test_tracing();
4498
4499        let chain_spec = MAINNET.clone();
4500        let mut test_harness = TestHarness::new(chain_spec.clone());
4501
4502        // create base chain and set up the test harness with it
4503        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4504        test_harness = test_harness.with_blocks(base_chain.clone());
4505
4506        let old_head = base_chain.first().unwrap().recovered_block();
4507
4508        // extend base chain
4509        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4510        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
4511
4512        test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
4513        test_harness.insert_chain(extension_chain).await;
4514
4515        // fcu to old_head
4516        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4517
4518        // create two competing chains starting from fork_block
4519        let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
4520        let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);
4521
4522        // insert chain A blocks using newPayload
4523        test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
4524        for block in &chain_a {
4525            test_harness.send_new_payload(block.clone()).await;
4526        }
4527
4528        test_harness.check_canon_chain_insertion(chain_a.clone()).await;
4529
4530        // insert chain B blocks using newPayload
4531        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4532        for block in &chain_b {
4533            test_harness.send_new_payload(block.clone()).await;
4534        }
4535
4536        test_harness.check_canon_chain_insertion(chain_b.clone()).await;
4537
4538        // send FCU to make the tip of chain B the new head
4539        let chain_b_tip_hash = chain_b.last().unwrap().hash();
4540        test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4541
4542        // check for CanonicalChainCommitted event
4543        test_harness.check_canon_commit(chain_b_tip_hash).await;
4544
4545        // verify FCU was processed
4546        test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4547
4548        // verify the new canonical head
4549        test_harness.check_canon_head(chain_b_tip_hash);
4550
4551        // verify that chain A is now considered a fork
4552        assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
4553    }
4554
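    // A block buffered because its parent was missing should be connected once the parent
    // arrives.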
4555    #[tokio::test]
4556    async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
4557        let chain_spec = MAINNET.clone();
4558        let mut test_harness = TestHarness::new(chain_spec.clone());
4559
4560        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4561        test_harness = test_harness.with_blocks(base_chain.clone());
4562
4563        // side chain consisting of two blocks; the last one is inserted first
4564        // so that it is forced to be buffered
4565        let side_chain =
4566            test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2);
4567
4568        // buffer last block of side chain
4569        let buffered_block = side_chain.last().unwrap();
4570        let buffered_block_hash = buffered_block.hash();
4571
4572        test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
4573        test_harness.send_new_payload(buffered_block.clone()).await;
4574
4575        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());
4576
4577        let non_buffered_block = side_chain.first().unwrap();
4578        let non_buffered_block_hash = non_buffered_block.hash();
4579
4580        // insert a block that continues the canon chain; it should not be buffered
4581        test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
4582        test_harness.send_new_payload(non_buffered_block.clone()).await;
4583        assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());
4584
4585        // the previously buffered block should be connected now
4586        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());
4587
4588        // both blocks are added to the canon chain in order
4589        test_harness.check_canon_block_added(non_buffered_block_hash).await;
4590        test_harness.check_canon_block_added(buffered_block_hash).await;
4591    }
4592
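    // A valid fork is made canonical block by block, while a competing fork containing an
    // invalid block is rejected from that block onward and the canonical head does not change.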
4593    #[tokio::test]
4594    async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
4595        reth_tracing::init_test_tracing();
4596
4597        let chain_spec = MAINNET.clone();
4598        let mut test_harness = TestHarness::new(chain_spec.clone());
4599
4600        // create base chain and set up the test harness with it
4601        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4602        test_harness = test_harness.with_blocks(base_chain.clone());
4603
4604        let old_head = base_chain.first().unwrap().recovered_block();
4605
4606        // extend base chain
4607        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4608        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
4609        test_harness.insert_chain(extension_chain).await;
4610
4611        // fcu to old_head
4612        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4613
4614        // create two competing chains starting from fork_block, one of them invalid
4615        let total_fork_elements = 10;
4616        let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4617        let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4618
4619        // insert chain B blocks using newPayload
4620        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4621        for block in &chain_b {
4622            test_harness.send_new_payload(block.clone()).await;
4623            test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4624            test_harness.check_canon_block_added(block.hash()).await;
4625            test_harness.check_canon_commit(block.hash()).await;
4626            test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4627        }
4628
4629        // insert chain A blocks using newPayload; one of the blocks will be invalid
4630        let invalid_index = 3;
4631        test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
4632        for block in &chain_a {
4633            test_harness.send_new_payload(block.clone()).await;
4634        }
4635
4636        // check canon chain insertion up to the invalid index, taking into
4637        // account the reversed ordering
4638        test_harness
4639            .check_fork_chain_insertion(
4640                chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
4641            )
4642            .await;
4643        for block in &chain_a[chain_a.len() - invalid_index - 1..] {
4644            test_harness.check_invalid_block(block.hash()).await;
4645        }
4646
4647        // send FCU to the tip of chain A and expect an invalid status
4648        let chain_a_tip_hash = chain_a.last().unwrap().hash();
4649        test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;
4650
4651        // chain B's tip was already made canonical above, so it should remain the head
4652        let chain_b_tip_hash = chain_b.last().unwrap().hash();
4653
4654        // verify the canonical head didn't change
4655        test_harness.check_canon_head(chain_b_tip_hash);
4659    }
4660
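    // An FCU to a block that descends from an invalid block should be reported as invalid.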
4661    #[tokio::test]
4662    async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
4663        reth_tracing::init_test_tracing();
4664        let chain_spec = MAINNET.clone();
4665        let mut test_harness = TestHarness::new(chain_spec.clone());
4666
4667        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
4668        test_harness = test_harness.with_blocks(base_chain.clone());
4669
4670        // create a side chain with an invalid block
4671        let side_chain = test_harness
4672            .block_builder
4673            .create_fork(base_chain.last().unwrap().recovered_block(), 15);
4674        let invalid_index = 9;
4675
4676        test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);
4677
4678        for (index, block) in side_chain.iter().enumerate() {
4679            test_harness.send_new_payload(block.clone()).await;
4680
4681            if index < side_chain.len() - invalid_index - 1 {
4682                test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4683            }
4684        }
4685
4686        // Try to do a forkchoice update to a block after the invalid one
4687        let fork_tip_hash = side_chain.last().unwrap().hash();
4688        test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
4689    }
4690}