reth_engine_tree/tree/mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
17    NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, OnForkChoiceUpdated,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{
28    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
29};
30use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
31use reth_provider::{
32    BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35    StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::spawn_os_thread;
40use reth_trie_db::ChangesetCache;
42use state::TreeState;
43use std::{fmt::Debug, ops, sync::Arc, time::Instant};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48    oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53mod cached_state;
54pub mod error;
55pub mod instrumented_state;
56mod invalid_headers;
57mod metrics;
58pub mod payload_processor;
59pub mod payload_validator;
60mod persistence_state;
61pub mod precompile_cache;
62#[cfg(test)]
63mod tests;
64mod trie_updates;
65
66use crate::tree::error::AdvancePersistenceError;
67pub use block_buffer::BlockBuffer;
68pub use cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache};
69pub use invalid_headers::InvalidHeaderCache;
70pub use metrics::EngineApiMetrics;
71pub use payload_processor::*;
72pub use payload_validator::{BasicEngineValidator, EngineValidator};
73pub use persistence_state::PersistenceState;
74pub use reth_engine_primitives::TreeConfig;
75
76pub mod state;
77
78/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
79///
80/// This is the default threshold, and represents the distance (gap) from the local head to a
81/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
82/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
83///
84/// E.g.: Local head `block.number` is 100 and the forkchoice head `block.number` is 133 (more than
85/// an epoch's worth of slots away), so the distance exceeds the threshold and the pipeline should
86/// be used to backfill this gap.
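///
/// As a rough sketch (hypothetical variable names, not the exact check performed by the engine),
/// the decision looks like this:
///
/// ```ignore
/// let distance = target_block_number.saturating_sub(local_head_number);
/// if distance > MIN_BLOCKS_FOR_PIPELINE_RUN {
///     // too far away: let the pipeline backfill the gap
/// } else {
///     // close enough: download and insert the blocks individually
/// }
/// ```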
87pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
88
89/// The minimum number of blocks to retain in the changeset cache after eviction.
90///
91/// This ensures that recent trie changesets are kept in memory for potential reorgs,
92/// even when the finalized block is not set (e.g., on L2s like Optimism).
93const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
94
95/// A builder for creating state providers that can be used across threads.
96#[derive(Clone, Debug)]
97pub struct StateProviderBuilder<N: NodePrimitives, P> {
98    /// The provider factory used to create providers.
99    provider_factory: P,
100    /// The historical block hash to fetch state from.
101    historical: B256,
102    /// The blocks that form the chain from historical to target and are in memory.
103    overlay: Option<Vec<ExecutedBlock<N>>>,
104}
105
106impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
107    /// Creates a new state provider from the provider factory, historical block hash and optional
108    /// overlaid blocks.
109    pub const fn new(
110        provider_factory: P,
111        historical: B256,
112        overlay: Option<Vec<ExecutedBlock<N>>>,
113    ) -> Self {
114        Self { provider_factory, historical, overlay }
115    }
116}
117
118impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
119where
120    P: BlockReader + StateProviderFactory + StateReader + Clone,
121{
122    /// Creates a new state provider from this builder.
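    ///
    /// A minimal usage sketch (assumes a `provider_factory`, a `historical_hash` and optional
    /// in-memory `blocks` that satisfy the bounds above):
    ///
    /// ```ignore
    /// let builder = StateProviderBuilder::new(provider_factory, historical_hash, Some(blocks));
    /// // the builder is cheap to clone and can be moved to another thread before building
    /// let state_provider = builder.build()?;
    /// ```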
123    pub fn build(&self) -> ProviderResult<StateProviderBox> {
124        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
125        if let Some(overlay) = self.overlay.clone() {
126            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
127        }
128        Ok(provider)
129    }
130}
131
132/// Tracks the state of the engine api internals.
133///
134/// This type is not shareable.
135#[derive(Debug)]
136pub struct EngineApiTreeState<N: NodePrimitives> {
137    /// Tracks the state of the blockchain tree.
138    tree_state: TreeState<N>,
139    /// Tracks the forkchoice state updates received by the CL.
140    forkchoice_state_tracker: ForkchoiceStateTracker,
141    /// Buffer of detached blocks.
142    buffer: BlockBuffer<N::Block>,
143    /// Tracks the headers of payloads that were rejected by the engine because they're
144    /// invalid.
145    invalid_headers: InvalidHeaderCache,
146}
147
148impl<N: NodePrimitives> EngineApiTreeState<N> {
149    fn new(
150        block_buffer_limit: u32,
151        max_invalid_header_cache_length: u32,
152        canonical_block: BlockNumHash,
153        engine_kind: EngineApiKind,
154    ) -> Self {
155        Self {
156            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
157            buffer: BlockBuffer::new(block_buffer_limit),
158            tree_state: TreeState::new(canonical_block, engine_kind),
159            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
160        }
161    }
162
163    /// Returns a reference to the tree state.
164    pub const fn tree_state(&self) -> &TreeState<N> {
165        &self.tree_state
166    }
167
168    /// Returns true if the block has been marked as invalid.
169    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
170        self.invalid_headers.get(hash).is_some()
171    }
172}
173
174/// The outcome of a tree operation.
175#[derive(Debug)]
176pub struct TreeOutcome<T> {
177    /// The outcome of the operation.
178    pub outcome: T,
179    /// An optional event to tell the caller to do something.
180    pub event: Option<TreeEvent>,
181}
182
183impl<T> TreeOutcome<T> {
184    /// Create new tree outcome.
185    pub const fn new(outcome: T) -> Self {
186        Self { outcome, event: None }
187    }
188
189    /// Set event on the outcome.
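    ///
    /// For example (a sketch; `status` and `head_hash` are placeholders):
    ///
    /// ```ignore
    /// let outcome = TreeOutcome::new(status)
    ///     .with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head: head_hash }));
    /// ```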
190    pub fn with_event(mut self, event: TreeEvent) -> Self {
191        self.event = Some(event);
192        self
193    }
194}
195
196/// Events that are triggered by Tree Chain
197#[derive(Debug)]
198pub enum TreeEvent {
199    /// Tree action is needed.
200    TreeAction(TreeAction),
201    /// Backfill action is needed.
202    BackfillAction(BackfillAction),
203    /// Block download is needed.
204    Download(DownloadRequest),
205}
206
207impl TreeEvent {
208    /// Returns true if the event is a backfill action.
209    const fn is_backfill_action(&self) -> bool {
210        matches!(self, Self::BackfillAction(_))
211    }
212}
213
214/// The actions that can be performed on the tree.
215#[derive(Debug)]
216pub enum TreeAction {
217    /// Make target canonical.
218    MakeCanonical {
219        /// The sync target head hash
220        sync_target_head: B256,
221    },
222}
223
224/// The engine API tree handler implementation.
225///
226/// This type is responsible for processing engine API requests, maintaining the canonical state and
227/// emitting events.
228pub struct EngineApiTreeHandler<N, P, T, V, C>
229where
230    N: NodePrimitives,
231    T: PayloadTypes,
232    C: ConfigureEvm<Primitives = N> + 'static,
233{
234    provider: P,
235    consensus: Arc<dyn FullConsensus<N>>,
236    payload_validator: V,
237    /// Keeps track of internals such as executed and buffered blocks.
238    state: EngineApiTreeState<N>,
239    /// The half for sending messages to the engine.
240    ///
241    /// This is kept so that we can queue messages to ourselves that we can process later, for
242    /// example distributing workload across multiple messages that would otherwise take too long
243    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
244    /// them one by one so that we can handle incoming engine API requests in between and don't
245    /// become unresponsive. This can happen during a live sync transition where we're trying to
246    /// close the gap (up to 3 epochs of blocks in the worst case).
247    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
248    /// Incoming engine API requests.
249    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
250    /// Outgoing events that are emitted to the handler.
251    outgoing: UnboundedSender<EngineApiEvent<N>>,
252    /// Channels to the persistence layer.
253    persistence: PersistenceHandle<N>,
254    /// Tracks the state changes of the persistence task.
255    persistence_state: PersistenceState,
256    /// Flag indicating the state of the node's backfill synchronization process.
257    backfill_sync_state: BackfillSyncState,
258    /// Keeps track of the state of the canonical chain that isn't persisted yet.
259    /// This is intended to be accessed from external sources, such as rpc.
260    canonical_in_memory_state: CanonicalInMemoryState<N>,
261    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
262    /// updates
263    payload_builder: PayloadBuilderHandle<T>,
264    /// Configuration settings.
265    config: TreeConfig,
266    /// Metrics for the engine api.
267    metrics: EngineApiMetrics,
268    /// The engine API variant of this handler
269    engine_kind: EngineApiKind,
270    /// The EVM configuration.
271    evm_config: C,
272    /// Changeset cache for in-memory trie changesets
273    changeset_cache: ChangesetCache,
274    /// Whether the node uses hashed state as canonical storage (v2 mode).
275    /// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
276    use_hashed_state: bool,
277}
278
279impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
280    for EngineApiTreeHandler<N, P, T, V, C>
281where
282    N: NodePrimitives,
283    C: Debug + ConfigureEvm<Primitives = N>,
284{
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        f.debug_struct("EngineApiTreeHandler")
287            .field("provider", &self.provider)
288            .field("consensus", &self.consensus)
289            .field("payload_validator", &self.payload_validator)
290            .field("state", &self.state)
291            .field("incoming_tx", &self.incoming_tx)
292            .field("persistence", &self.persistence)
293            .field("persistence_state", &self.persistence_state)
294            .field("backfill_sync_state", &self.backfill_sync_state)
295            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
296            .field("payload_builder", &self.payload_builder)
297            .field("config", &self.config)
298            .field("metrics", &self.metrics)
299            .field("engine_kind", &self.engine_kind)
300            .field("evm_config", &self.evm_config)
301            .field("changeset_cache", &self.changeset_cache)
302            .field("use_hashed_state", &self.use_hashed_state)
303            .finish()
304    }
305}
306
307impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
308where
309    N: NodePrimitives,
310    P: DatabaseProviderFactory
311        + BlockReader<Block = N::Block, Header = N::BlockHeader>
312        + StateProviderFactory
313        + StateReader<Receipt = N::Receipt>
314        + HashedPostStateProvider
315        + Clone
316        + 'static,
317    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
318        + StageCheckpointReader
319        + ChangeSetReader
320        + StorageChangeSetReader
321        + StorageSettingsCache,
322    C: ConfigureEvm<Primitives = N> + 'static,
323    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
324    V: EngineValidator<T>,
325{
326    /// Creates a new [`EngineApiTreeHandler`].
327    #[expect(clippy::too_many_arguments)]
328    pub fn new(
329        provider: P,
330        consensus: Arc<dyn FullConsensus<N>>,
331        payload_validator: V,
332        outgoing: UnboundedSender<EngineApiEvent<N>>,
333        state: EngineApiTreeState<N>,
334        canonical_in_memory_state: CanonicalInMemoryState<N>,
335        persistence: PersistenceHandle<N>,
336        persistence_state: PersistenceState,
337        payload_builder: PayloadBuilderHandle<T>,
338        config: TreeConfig,
339        engine_kind: EngineApiKind,
340        evm_config: C,
341        changeset_cache: ChangesetCache,
342        use_hashed_state: bool,
343    ) -> Self {
344        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
345
346        Self {
347            provider,
348            consensus,
349            payload_validator,
350            incoming,
351            outgoing,
352            persistence,
353            persistence_state,
354            backfill_sync_state: BackfillSyncState::Idle,
355            state,
356            canonical_in_memory_state,
357            payload_builder,
358            config,
359            metrics: Default::default(),
360            incoming_tx,
361            engine_kind,
362            evm_config,
363            changeset_cache,
364            use_hashed_state,
365        }
366    }
367
368    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
369    /// own thread.
370    ///
371    /// Returns the sender through which incoming requests can be sent to the task and the receiver
372    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
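    ///
    /// A minimal wiring sketch (construction of the arguments is elided; `request` stands in for
    /// an [`EngineApiRequest`]):
    ///
    /// ```ignore
    /// let (to_tree, mut events) = EngineApiTreeHandler::spawn_new(/* ... */);
    /// // send engine API requests to the tree task
    /// to_tree.send(FromEngine::Request(request))?;
    /// // and react to the events it emits
    /// while let Some(event) = events.recv().await {
    ///     // handle EngineApiEvent
    /// }
    /// ```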
373    #[expect(clippy::complexity)]
374    pub fn spawn_new(
375        provider: P,
376        consensus: Arc<dyn FullConsensus<N>>,
377        payload_validator: V,
378        persistence: PersistenceHandle<N>,
379        payload_builder: PayloadBuilderHandle<T>,
380        canonical_in_memory_state: CanonicalInMemoryState<N>,
381        config: TreeConfig,
382        kind: EngineApiKind,
383        evm_config: C,
384        changeset_cache: ChangesetCache,
385        use_hashed_state: bool,
386    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
387    {
388        let best_block_number = provider.best_block_number().unwrap_or(0);
389        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
390
391        let persistence_state = PersistenceState {
392            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
393            rx: None,
394        };
395
396        let (tx, outgoing) = unbounded_channel();
397        let state = EngineApiTreeState::new(
398            config.block_buffer_limit(),
399            config.max_invalid_header_cache_length(),
400            header.num_hash(),
401            kind,
402        );
403
404        let task = Self::new(
405            provider,
406            consensus,
407            payload_validator,
408            tx,
409            state,
410            canonical_in_memory_state,
411            persistence,
412            persistence_state,
413            payload_builder,
414            config,
415            kind,
416            evm_config,
417            changeset_cache,
418            use_hashed_state,
419        );
420        let incoming = task.incoming_tx.clone();
421        spawn_os_thread("engine", || task.run());
422        (incoming, outgoing)
423    }
424
425    /// Returns a new [`Sender`] to send messages to this type.
426    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
427        self.incoming_tx.clone()
428    }
429
430    /// Run the engine API handler.
431    ///
432    /// This will block the current thread and process incoming messages.
433    pub fn run(mut self) {
434        loop {
435            match self.wait_for_event() {
436                LoopEvent::EngineMessage(msg) => {
437                    debug!(target: "engine::tree", %msg, "received new engine message");
438                    match self.on_engine_message(msg) {
439                        Ok(ops::ControlFlow::Break(())) => return,
440                        Ok(ops::ControlFlow::Continue(())) => {}
441                        Err(fatal) => {
442                            error!(target: "engine::tree", %fatal, "insert block fatal error");
443                            return
444                        }
445                    }
446                }
447                LoopEvent::PersistenceComplete { result, start_time } => {
448                    if let Err(err) = self.on_persistence_complete(result, start_time) {
449                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
450                        return
451                    }
452                }
453                LoopEvent::Disconnected => {
454                    error!(target: "engine::tree", "Channel disconnected");
455                    return
456                }
457            }
458
459            // Always check if we need to trigger new persistence after any event:
460            // - After engine messages: new blocks may have been inserted that exceed the
461            //   persistence threshold
462            // - After persistence completion: we can now persist more blocks if needed
463            if let Err(err) = self.advance_persistence() {
464                error!(target: "engine::tree", %err, "Advancing persistence failed");
465                return
466            }
467        }
468    }
469
470    /// Blocks until the next event is ready: either an incoming engine message or a persistence
471    /// completion (if one is in progress).
472    ///
473    /// Uses biased selection to prioritize persistence completion to update in-memory state and
474    /// unblock further writes.
475    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
476        // Take ownership of persistence rx if present
477        let maybe_persistence = self.persistence_state.rx.take();
478
479        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
480            // Biased select prioritizes persistence completion to update in memory state and
481            // unblock further writes
482            crossbeam_channel::select_biased! {
483                recv(persistence_rx) -> result => {
484                    // Don't put it back - consumed (oneshot-like behavior)
485                    match result {
486                        Ok(value) => LoopEvent::PersistenceComplete {
487                            result: value,
488                            start_time,
489                        },
490                        Err(_) => LoopEvent::Disconnected,
491                    }
492                },
493                recv(self.incoming) -> msg => {
494                    // Put the persistence rx back - we didn't consume it
495                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
496                    match msg {
497                        Ok(m) => LoopEvent::EngineMessage(m),
498                        Err(_) => LoopEvent::Disconnected,
499                    }
500                },
501            }
502        } else {
503            // No persistence in progress - just wait on incoming
504            match self.incoming.recv() {
505                Ok(m) => LoopEvent::EngineMessage(m),
506                Err(_) => LoopEvent::Disconnected,
507            }
508        }
509    }
510
511    /// Invoked when previously requested blocks were downloaded.
512    ///
513    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
514    /// will execute the first batch and send the remaining blocks back through the channel so that
515    /// block request processing isn't blocked for a long time.
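    ///
    /// A simplified sketch of the batching behaviour implemented below (the numbers are
    /// hypothetical):
    ///
    /// ```ignore
    /// // with max_execute_block_batch_size = 32 and 100 downloaded blocks:
    /// let batch = self.config.max_execute_block_batch_size().min(blocks.len()); // 32
    /// for block in blocks.drain(..batch) { /* validate and insert the block */ }
    /// // the remaining 68 blocks are re-queued and handled on a later loop iteration
    /// let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
    /// ```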
516    fn on_downloaded(
517        &mut self,
518        mut blocks: Vec<SealedBlock<N::Block>>,
519    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
520        if blocks.is_empty() {
521            // nothing to execute
522            return Ok(None)
523        }
524
525        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
526        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
527        for block in blocks.drain(..batch) {
528            if let Some(event) = self.on_downloaded_block(block)? {
529                let needs_backfill = event.is_backfill_action();
530                self.on_tree_event(event)?;
531                if needs_backfill {
532                    // can exit early if backfill is needed
533                    return Ok(None)
534                }
535            }
536        }
537
538        // if we still have blocks to execute, send them as a followup request
539        if !blocks.is_empty() {
540            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
541        }
542
543        Ok(None)
544    }
545
546    /// When the Consensus layer receives a new block via the consensus gossip protocol,
547    /// the transactions in the block are sent to the execution layer in the form of a
548    /// [`PayloadTypes::ExecutionData`], for example
549    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
550    /// Execution layer executes the transactions and validates the state in the block header,
551    /// then passes validation data back to the Consensus layer, which adds the block to the head of
552    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
553    /// network in the form of a "Beacon block".
554    ///
555    /// These responses should adhere to the [Engine API Spec for
556    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
557    ///
558    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
559    /// returns an error if an internal error occurred.
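    ///
    /// At a high level, the resulting [`PayloadStatus`] maps roughly as follows (a simplified
    /// sketch, not an exhaustive list of cases):
    ///
    /// ```ignore
    /// // ancestor known to be invalid        -> INVALID
    /// // backfill sync in progress           -> buffer the block, SYNCING
    /// // parent missing                      -> buffer the block, SYNCING
    /// // validated, executed and inserted    -> VALID
    /// ```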
560    #[instrument(
561        level = "debug",
562        target = "engine::tree",
563        skip_all,
564        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
565    )]
566    fn on_new_payload(
567        &mut self,
568        payload: T::ExecutionData,
569    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
570        trace!(target: "engine::tree", "invoked new payload");
571
572        // start timing for the new payload process
573        let start = Instant::now();
574
575        // Ensures that the given payload does not violate any consensus rules that concern the
576        // block's layout, like:
577        //    - missing or invalid base fee
578        //    - invalid extra data
579        //    - invalid transactions
580        //    - incorrect hash
581        //    - the versioned hashes passed with the payload do not exactly match transaction
582        //      versioned hashes
583        //    - the block does not contain blob transactions if it is pre-cancun
584        //
585        // This validates the following engine API rule:
586        //
587        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
588        //    validation by taking the following steps:
589        //
590        //   1. Obtain the actual array by concatenating blob versioned hashes lists
591        //      (`tx.blob_versioned_hashes`) of each [blob
592        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
593        //      in the payload, respecting the order of inclusion. If the payload has no blob
594        //      transactions the expected array **MUST** be `[]`.
595        //
596        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
597        //      null}` if the expected and the actual arrays don't match.
598        //
599        // This validation **MUST** be instantly run in all cases even during active sync process.
600
601        let num_hash = payload.num_hash();
602        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
603        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
604
605        let block_hash = num_hash.hash;
606
607        // Check for invalid ancestors
608        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
609            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
610            return Ok(TreeOutcome::new(status));
611        }
612
613        // record pre-execution phase duration
614        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
615
616        let status = if self.backfill_sync_state.is_idle() {
617            self.try_insert_payload(payload)?
618        } else {
619            self.try_buffer_payload(payload)?
620        };
621
622        let mut outcome = TreeOutcome::new(status);
623        // if the block is valid and it is the current sync target head, make it canonical
624        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
625            // Only create the canonical event if this block isn't already the canonical head
626            if self.state.tree_state.canonical_block_hash() != block_hash {
627                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
628                    sync_target_head: block_hash,
629                }));
630            }
631        }
632
633        // record total newPayload duration
634        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
635
636        Ok(outcome)
637    }
638
639    /// Processes a payload during normal sync operation.
640    ///
641    /// Returns:
642    /// - `Valid`: Payload successfully validated and inserted
643    /// - `Syncing`: Parent missing, payload buffered for later
644    /// - Error status: Payload is invalid
645    #[instrument(level = "debug", target = "engine::tree", skip_all)]
646    fn try_insert_payload(
647        &mut self,
648        payload: T::ExecutionData,
649    ) -> Result<PayloadStatus, InsertBlockFatalError> {
650        let block_hash = payload.block_hash();
651        let num_hash = payload.num_hash();
652        let parent_hash = payload.parent_hash();
653        let mut latest_valid_hash = None;
654
655        match self.insert_payload(payload) {
656            Ok(status) => {
657                let status = match status {
658                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
659                        latest_valid_hash = Some(block_hash);
660                        self.try_connect_buffered_blocks(num_hash)?;
661                        PayloadStatusEnum::Valid
662                    }
663                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
664                        latest_valid_hash = Some(block_hash);
665                        PayloadStatusEnum::Valid
666                    }
667                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
668                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
669                        // not known to be invalid, but we don't know anything else
670                        PayloadStatusEnum::Syncing
671                    }
672                };
673
674                Ok(PayloadStatus::new(status, latest_valid_hash))
675            }
676            Err(error) => match error {
677                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
678                InsertPayloadError::Payload(error) => {
679                    Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
680                }
681            },
682        }
683    }
684
685    /// Stores a payload for later processing during backfill sync.
686    ///
687    /// During backfill, the node lacks the state needed to validate payloads,
688    /// so they are buffered (stored in memory) until their parent blocks are synced.
689    ///
690    /// Returns:
691    /// - `Syncing`: Payload successfully buffered
692    /// - Error status: Payload is malformed or invalid
693    fn try_buffer_payload(
694        &mut self,
695        payload: T::ExecutionData,
696    ) -> Result<PayloadStatus, InsertBlockFatalError> {
697        let parent_hash = payload.parent_hash();
698        let num_hash = payload.num_hash();
699
700        match self.payload_validator.convert_payload_to_block(payload) {
701            // if the block is well-formed, buffer it for later
702            Ok(block) => {
703                if let Err(error) = self.buffer_block(block) {
704                    Ok(self.on_insert_block_error(error)?)
705                } else {
706                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
707                }
708            }
709            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
710        }
711    }
712
713    /// Returns the new chain for the given head.
714    ///
715    /// This also handles reorgs.
716    ///
717    /// Note: This does not update the tracked state and instead returns the new chain based on the
718    /// given head.
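    ///
    /// As an illustration (`A`..`D` and `C'` are hypothetical block labels):
    ///
    /// ```ignore
    /// // canonical chain: A - B - C
    /// // on_new_head(D)  where D is a child of C  => Some(NewCanonicalChain::Commit { new: [D] })
    /// // on_new_head(C') where C' is a child of B => Some(NewCanonicalChain::Reorg { new: [C'], old: [C] })
    /// // on_new_head(X)  where X is not in the in-memory tree state => None
    /// ```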
719    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
720        // get the executed new head block
721        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
722            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
723            self.metrics.engine.executed_new_block_cache_miss.increment(1);
724            return Ok(None)
725        };
726
727        let new_head_number = new_head_block.recovered_block().number();
728        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
729
730        let mut new_chain = vec![new_head_block.clone()];
731        let mut current_hash = new_head_block.recovered_block().parent_hash();
732        let mut current_number = new_head_number - 1;
733
734        // Walk back the new chain until we reach a block we know about
735        //
736        // This is only done for in-memory blocks, because we should not have persisted any blocks
737        // that are _above_ the current canonical head.
738        while current_number > current_canonical_number {
739            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
740            {
741                current_hash = block.recovered_block().parent_hash();
742                current_number -= 1;
743                new_chain.push(block);
744            } else {
745                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
746                // This should never happen as we're walking back a chain that should connect to
747                // the canonical chain
748                return Ok(None)
749            }
750        }
751
752        // If we have reached the current canonical head by walking back from the target, then we
753        // know this represents an extension of the canonical chain.
754        if current_hash == self.state.tree_state.current_canonical_head.hash {
755            new_chain.reverse();
756
757            // Simple extension of the current chain
758            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
759        }
760
761        // We have a reorg. Walk back both chains to find the fork point.
762        let mut old_chain = Vec::new();
763        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
764
765        // If the canonical chain is ahead of the new chain,
766        // gather all blocks until new head number.
767        while current_canonical_number > current_number {
768            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
769                old_hash = block.recovered_block().parent_hash();
770                old_chain.push(block);
771                current_canonical_number -= 1;
772            } else {
773                // This shouldn't happen as we're walking back the canonical chain
774                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
775                return Ok(None)
776            }
777        }
778
779        // Both new and old chain pointers are now at the same height.
780        debug_assert_eq!(current_number, current_canonical_number);
781
782        // Walk both chains from specified hashes at same height until
783        // a common ancestor (fork block) is reached.
784        while old_hash != current_hash {
785            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
786                old_hash = block.recovered_block().parent_hash();
787                old_chain.push(block);
788            } else {
789                // This shouldn't happen as we're walking back the canonical chain
790                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
791                return Ok(None)
792            }
793
794            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
795            {
796                current_hash = block.recovered_block().parent_hash();
797                new_chain.push(block);
798            } else {
799                // This shouldn't happen as we've already walked this path
800                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
801                return Ok(None)
802            }
803        }
804        new_chain.reverse();
805        old_chain.reverse();
806
807        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
808    }
809
810    /// Updates the latest block state to the specified canonical ancestor.
811    ///
812    /// This method ensures that the latest block tracks the given canonical header by resetting
813    /// the in-memory canonical state to it, unwinding the chain if necessary.
814    /// # Arguments
815    /// * `canonical_header` - The canonical header to set as the new head
816    ///
817    /// # Returns
818    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
819    ///
820    /// Caution: This unwinds the canonical chain
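    ///
    /// Illustration (block numbers are hypothetical):
    ///
    /// ```ignore
    /// // in-memory canonical head: 105, `canonical_header`: block 100 (an ancestor)
    /// // -> the tree's canonical head is set to 100 and blocks 101..=105 are removed from the
    /// //    in-memory state via a reorg, so state providers and the transaction pool see 100
    /// //    as the latest block again
    /// ```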
821    fn update_latest_block_to_canonical_ancestor(
822        &mut self,
823        canonical_header: &SealedHeader<N::BlockHeader>,
824    ) -> ProviderResult<()> {
825        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
826        let current_head_number = self.state.tree_state.canonical_block_number();
827        let new_head_number = canonical_header.number();
828        let new_head_hash = canonical_header.hash();
829
830        // Update tree state with the new canonical head
831        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
832
833        // Handle the state update based on whether this is an unwind scenario
834        if new_head_number < current_head_number {
835            debug!(
836                target: "engine::tree",
837                current_head = current_head_number,
838                new_head = new_head_number,
839                new_head_hash = ?new_head_hash,
840                "FCU unwind detected: reverting to canonical ancestor"
841            );
842
843            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
844        } else {
845            debug!(
846                target: "engine::tree",
847                previous_head = current_head_number,
848                new_head = new_head_number,
849                new_head_hash = ?new_head_hash,
850                "Advancing latest block to canonical ancestor"
851            );
852            self.handle_chain_advance_or_same_height(canonical_header)
853        }
854    }
855
856    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
857    /// to the canonical header
858    fn handle_canonical_chain_unwind(
859        &self,
860        current_head_number: u64,
861        canonical_header: &SealedHeader<N::BlockHeader>,
862    ) -> ProviderResult<()> {
863        let new_head_number = canonical_header.number();
864        debug!(
865            target: "engine::tree",
866            from = current_head_number,
867            to = new_head_number,
868            "Handling unwind: collecting blocks to remove from in-memory state"
869        );
870
871        // Collect blocks that need to be removed from memory
872        let old_blocks =
873            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
874
875        // Load and apply the canonical ancestor block
876        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
877    }
878
879    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
880    fn collect_blocks_for_canonical_unwind(
881        &self,
882        new_head_number: u64,
883        current_head_number: u64,
884    ) -> Vec<ExecutedBlock<N>> {
885        let mut old_blocks =
886            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
887
888        for block_num in (new_head_number + 1)..=current_head_number {
889            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
890                let executed_block = block_state.block_ref().clone();
891                old_blocks.push(executed_block);
892                debug!(
893                    target: "engine::tree",
894                    block_number = block_num,
895                    "Collected block for removal from in-memory state"
896                );
897            }
898        }
899
900        if old_blocks.is_empty() {
901            debug!(
902                target: "engine::tree",
903                "No blocks found in memory to remove, will clear and reset state"
904            );
905        }
906
907        old_blocks
908    }
909
910    /// Applies the canonical ancestor block via a reorg operation.
911    fn apply_canonical_ancestor_via_reorg(
912        &self,
913        canonical_header: &SealedHeader<N::BlockHeader>,
914        old_blocks: Vec<ExecutedBlock<N>>,
915    ) -> ProviderResult<()> {
916        let new_head_hash = canonical_header.hash();
917        let new_head_number = canonical_header.number();
918
919        // Try to load the canonical ancestor's block
920        match self.canonical_block_by_hash(new_head_hash)? {
921            Some(executed_block) => {
922                // Perform the reorg to properly handle the unwind
923                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
924                    new: vec![executed_block],
925                    old: old_blocks,
926                });
927
928                // CRITICAL: Update the canonical head after the reorg
929                // This ensures get_canonical_head() returns the correct block
930                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
931
932                debug!(
933                    target: "engine::tree",
934                    block_number = new_head_number,
935                    block_hash = ?new_head_hash,
936                    "Successfully loaded canonical ancestor into memory via reorg"
937                );
938            }
939            None => {
940                // Fallback: update header only if block cannot be found
941                warn!(
942                    target: "engine::tree",
943                    block_hash = ?new_head_hash,
944                    "Could not find canonical ancestor block, updating header only"
945                );
946                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
947            }
948        }
949
950        Ok(())
951    }
952
953    /// Handles chain advance or same height scenarios.
954    fn handle_chain_advance_or_same_height(
955        &self,
956        canonical_header: &SealedHeader<N::BlockHeader>,
957    ) -> ProviderResult<()> {
958        // Load the block into memory if it's not already present
959        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
960
961        // Update the canonical head header
962        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
963
964        Ok(())
965    }
966
967    /// Ensures a block is loaded into memory if not already present.
968    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
969        // Check if block is already in memory
970        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
971            return Ok(());
972        }
973
974        // Try to load the block from storage
975        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
976            self.canonical_in_memory_state
977                .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
978
979            debug!(
980                target: "engine::tree",
981                block_number,
982                block_hash = ?block_hash,
983                "Added canonical block to in-memory state"
984            );
985        }
986
987        Ok(())
988    }
989
990    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
991    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
992    /// chain.
993    ///
994    /// These responses should adhere to the [Engine API Spec for
995    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
996    ///
997    /// Returns an error if an internal error occurred like a database error.
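    ///
    /// Processing is delegated to the helpers below, roughly in this order (a simplified sketch):
    ///
    /// ```ignore
    /// // 1. validate_forkchoice_state  -> early invalid / syncing responses
    /// // 2. handle_canonical_head      -> the head hash is already the canonical head
    /// // 3. apply_chain_update         -> extend the chain or reorg to the new head
    /// // 4. handle_missing_block       -> SYNCING + download request for the missing head
    /// ```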
998    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash))]
999    fn on_forkchoice_updated(
1000        &mut self,
1001        state: ForkchoiceState,
1002        attrs: Option<T::PayloadAttributes>,
1003        version: EngineApiMessageVersion,
1004    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1005        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1006
1007        // Record metrics
1008        self.record_forkchoice_metrics();
1009
1010        // Pre-validation of forkchoice state
1011        if let Some(early_result) = self.validate_forkchoice_state(state)? {
1012            return Ok(TreeOutcome::new(early_result));
1013        }
1014
1015        // Return early if we are on the correct fork
1016        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
1017            return Ok(result);
1018        }
1019
1020        // Attempt to apply a chain update when the head differs from our canonical chain.
1021        // This handles reorgs and chain extensions by making the specified head canonical.
1022        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1023            return Ok(result);
1024        }
1025
1026        // Fallback that ensures to catch up to the network's state.
1027        self.handle_missing_block(state)
1028    }
1029
1030    /// Records metrics for forkchoice updated calls
1031    fn record_forkchoice_metrics(&self) {
1032        self.canonical_in_memory_state.on_forkchoice_update_received();
1033    }
1034
1035    /// Pre-validates the forkchoice state and returns early if validation fails.
1036    ///
1037    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1038    /// returned. Returns `None` if validation passes and processing should continue.
1039    fn validate_forkchoice_state(
1040        &mut self,
1041        state: ForkchoiceState,
1042    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1043        if state.head_block_hash.is_zero() {
1044            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1045        }
1046
1047        // Check if the new head hash is connected to any ancestor that we previously marked as
1048        // invalid
1049        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1050        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1051            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1052        }
1053
1054        if !self.backfill_sync_state.is_idle() {
1055            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1056            // exclusive access to the database
1057            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1058            return Ok(Some(OnForkChoiceUpdated::syncing()));
1059        }
1060
1061        Ok(None)
1062    }
1063
1064    /// Handles the case where the forkchoice head is already canonical.
1065    ///
1066    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1067    /// processing is complete. Returns `None` if the head is not canonical and processing
1068    /// should continue.
1069    fn handle_canonical_head(
1070        &self,
1071        state: ForkchoiceState,
1072        attrs: &Option<T::PayloadAttributes>,
1073        version: EngineApiMessageVersion,
1074    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1075        // Process the forkchoice update by trying to make the head block canonical
1076        //
1077        // We can only process this forkchoice update if:
1078        // - we have the `head` block
1079        // - the head block is part of a chain that is connected to the canonical chain. This
1080        //   includes reorgs.
1081        //
1082        // Performing a FCU involves:
1083        // - marking the FCU's head block as canonical
1084        // - updating in memory state to reflect the new canonical chain
1085        // - updating canonical state trackers
1086        // - emitting a canonicalization event for the new chain (including reorg)
1087        // - if we have payload attributes, delegate them to the payload service
1088
1089        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1090            return Ok(None);
1091        }
1092
1093        trace!(target: "engine::tree", "fcu head hash is already canonical");
1094
1095        // Update the safe and finalized blocks and ensure their values are valid
1096        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1097            // safe or finalized hashes are invalid
1098            return Ok(Some(TreeOutcome::new(outcome)));
1099        }
1100
1101        // Process payload attributes if the head is already canonical
1102        if let Some(attr) = attrs {
1103            let tip = self
1104                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1105                .ok_or_else(|| {
1106                    // If we can't find the canonical block, then something is wrong and we need
1107                    // to return an error
1108                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1109                })?;
1110            // Clone only when we actually need to process the attributes
1111            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1112            return Ok(Some(TreeOutcome::new(updated)));
1113        }
1114
1115        // The head block is already canonical
1116        let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1117            PayloadStatusEnum::Valid,
1118            Some(state.head_block_hash),
1119        )));
1120        Ok(Some(outcome))
1121    }
1122
1123    /// Applies chain update for the new head block and processes payload attributes.
1124    ///
1125    /// This method handles the case where the forkchoice head differs from our current canonical
1126    /// head. It attempts to make the specified head block canonical by:
1127    /// - Checking if the head is already part of the canonical chain
1128    /// - Applying chain reorganizations (reorgs) if necessary
1129    /// - Processing payload attributes if provided
1130    /// - Returning the appropriate forkchoice update response
1131    ///
1132    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
1133    /// Returns `None` if no chain update was needed or possible.
1134    fn apply_chain_update(
1135        &mut self,
1136        state: ForkchoiceState,
1137        attrs: &Option<T::PayloadAttributes>,
1138        version: EngineApiMessageVersion,
1139    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1140        // Check if the head is already part of the canonical chain
1141        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1142            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1143
1144            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1145            // own chain at will, so we need to always trigger a new payload job if requested.
1146            if self.engine_kind.is_opstack() ||
1147                self.config.always_process_payload_attributes_on_canonical_head()
1148            {
1149                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
1150                // part of the canonical chain. We need to update the latest block state to reflect
1151                // the canonical ancestor. This ensures that state providers and the transaction
1152                // pool operate with the correct chain state after forkchoice update processing, and
1153                // new payloads built on the reorg'd head will be added to the tree immediately.
1154                if self.config.unwind_canonical_header() {
1155                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1156                }
1157
1158                if let Some(attr) = attrs {
1159                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1160                    // Clone only when we actually need to process the attributes
1161                    let updated = self.process_payload_attributes(
1162                        attr.clone(),
1163                        &canonical_header,
1164                        state,
1165                        version,
1166                    );
1167                    return Ok(Some(TreeOutcome::new(updated)));
1168                }
1169            }
1170
1171            // According to the Engine API specification, client software MAY skip an update of the
1172            // forkchoice state and MUST NOT begin a payload build process if
1173            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
1174            // of canonical chain, i.e. the ancestor passed payload validation process
1175            // and deemed `VALID`. In the case of such an event, client software MUST
1176            // return `{payloadStatus: {status: VALID, latestValidHash:
1177            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1178
1179            // The head block is already canonical and we're not processing payload attributes,
1180            // so we're not triggering a payload job and can return right away
1181
1182            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1183                PayloadStatusEnum::Valid,
1184                Some(state.head_block_hash),
1185            )));
1186            return Ok(Some(outcome));
1187        }
1188
1189        // Ensure we can apply a new chain update for the head block
1190        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1191            let tip = chain_update.tip().clone_sealed_header();
1192            self.on_canonical_chain_update(chain_update);
1193
1194            // Update the safe and finalized blocks and ensure their values are valid
1195            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1196                // safe or finalized hashes are invalid
1197                return Ok(Some(TreeOutcome::new(outcome)));
1198            }
1199
1200            if let Some(attr) = attrs {
1201                // Clone only when we actually need to process the attributes
1202                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1203                return Ok(Some(TreeOutcome::new(updated)));
1204            }
1205
1206            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1207                PayloadStatusEnum::Valid,
1208                Some(state.head_block_hash),
1209            )));
1210            return Ok(Some(outcome));
1211        }
1212
1213        Ok(None)
1214    }
1215
1216    /// Handles the case where the head block is missing and needs to be downloaded.
1217    ///
1218    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1219    /// Returns a `TreeOutcome` with syncing status and download event.
1220    fn handle_missing_block(
1221        &self,
1222        state: ForkchoiceState,
1223    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1224        // We don't have the block to perform the forkchoice update
1225        // We assume the FCU is valid and at least the head is missing,
1226        // so we need to start syncing to it
1227        //
1228        // find the appropriate target to sync to, if we don't have the safe block hash then we
1229        // start syncing to the safe block via backfill first
1230        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1231        // check that safe block is valid and missing
1232        !state.safe_block_hash.is_zero() &&
1233        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1234        {
1235            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1236            state.safe_block_hash
1237        } else {
1238            state.head_block_hash
1239        };
1240
1241        let target = self.lowest_buffered_ancestor_or(target);
1242        trace!(target: "engine::tree", %target, "downloading missing block");
1243
1244        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1245            PayloadStatusEnum::Syncing,
1246        )))
1247        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1248    }
1249
1250    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1251    /// the current persistence action while we're removing blocks.
1252    fn remove_blocks(&mut self, new_tip_num: u64) {
1253        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1254        if new_tip_num < self.persistence_state.last_persisted_block.number {
1255            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1256            let (tx, rx) = crossbeam_channel::bounded(1);
1257            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1258            self.persistence_state.start_remove(new_tip_num, rx);
1259        }
1260    }
1261
1262    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1263    /// the current persistence action while we're saving blocks.
1264    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1265        if blocks_to_persist.is_empty() {
1266            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1267            return
1268        }
1269
1270        // NOTE: checked non-empty above
1271        let highest_num_hash = blocks_to_persist
1272            .iter()
1273            .max_by_key(|block| block.recovered_block().number())
1274            .map(|b| b.recovered_block().num_hash())
1275            .expect("Checked non-empty persisting blocks");
1276
1277        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1278        let (tx, rx) = crossbeam_channel::bounded(1);
1279        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1280
1281        self.persistence_state.start_save(highest_num_hash, rx);
1282    }
1283
1284    /// Triggers new persistence actions if no persistence task is currently in progress.
1285    ///
1286    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1287    /// Persistence completion is handled separately via the `wait_for_event` method.
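    ///
    /// In simplified pseudocode (a sketch of the body below):
    ///
    /// ```text
    /// if no persistence task is in progress:
    ///     if a disk reorg is detected   => remove blocks above the common ancestor
    ///     else if should_persist()      => save the next batch of canonical blocks
    ///                                      (up to the threshold target)
    /// ```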
1288    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1289        if !self.persistence_state.in_progress() {
1290            if let Some(new_tip_num) = self.find_disk_reorg()? {
1291                self.remove_blocks(new_tip_num)
1292            } else if self.should_persist() {
1293                let blocks_to_persist =
1294                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1295                self.persist_blocks(blocks_to_persist);
1296            }
1297        }
1298
1299        Ok(())
1300    }
1301
1302    /// Finishes termination by persisting all remaining blocks and signaling completion.
1303    ///
1304    /// This blocks until all persistence is complete. Always signals completion,
1305    /// even if an error occurs.
1306    fn finish_termination(
1307        &mut self,
1308        pending_termination: oneshot::Sender<()>,
1309    ) -> Result<(), AdvancePersistenceError> {
1310        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1311        let result = self.persist_until_complete();
1312        let _ = pending_termination.send(());
1313        result
1314    }
1315
1316    /// Persists all remaining blocks until none are left.
1317    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1318        loop {
1319            // Wait for any in-progress persistence to complete (blocking)
1320            if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
1321                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1322                self.on_persistence_complete(result, start_time)?;
1323            }
1324
1325            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1326
1327            if blocks_to_persist.is_empty() {
1328                debug!(target: "engine::tree", "persistence complete, signaling termination");
1329                return Ok(())
1330            }
1331
1332            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1333            self.persist_blocks(blocks_to_persist);
1334        }
1335    }
1336
1337    /// Tries to poll for a completed persistence task (non-blocking).
1338    ///
1339    /// Returns `true` if a persistence task was completed, `false` otherwise.
1340    #[cfg(test)]
1341    pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1342        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1343            return Ok(false);
1344        };
1345
1346        match rx.try_recv() {
1347            Ok(result) => {
1348                self.on_persistence_complete(result, start_time)?;
1349                Ok(true)
1350            }
1351            Err(crossbeam_channel::TryRecvError::Empty) => {
1352                // Not ready yet, put it back
1353                self.persistence_state.rx = Some((rx, start_time, action));
1354                Ok(false)
1355            }
1356            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1357                Err(AdvancePersistenceError::ChannelClosed)
1358            }
1359        }
1360    }
1361
1362    /// Handles a completed persistence task.
1363    fn on_persistence_complete(
1364        &mut self,
1365        last_persisted_hash_num: Option<BlockNumHash>,
1366        start_time: Instant,
1367    ) -> Result<(), AdvancePersistenceError> {
1368        self.metrics.engine.persistence_duration.record(start_time.elapsed());
1369
1370        let Some(BlockNumHash {
1371            hash: last_persisted_block_hash,
1372            number: last_persisted_block_number,
1373        }) = last_persisted_hash_num
1374        else {
1375            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
1376            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1377            return Ok(())
1378        };
1379
1380        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1381        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1382
1383        // Evict trie changesets for blocks below the eviction threshold.
1384        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
1385        // the finalized block if set.
1386        let min_threshold =
1387            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1388        let eviction_threshold =
1389            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1390                // Use the minimum of finalized block and retention threshold to be conservative
1391                finalized.number.min(min_threshold)
1392            } else {
1393                // When finalized is not set (e.g., on L2s), use the retention threshold
1394                min_threshold
1395            };
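        // E.g. (hypothetical numbers): with a persisted tip of 1_000 and a retention of 128
        // blocks, min_threshold = 872; if the finalized block is 900, the eviction threshold is
        // min(900, 872) = 872, so cached changesets for blocks below 872 become evictable.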
1396        debug!(
1397            target: "engine::tree",
1398            last_persisted = last_persisted_block_number,
1399            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1400            eviction_threshold,
1401            "Evicting changesets below threshold"
1402        );
1403        self.changeset_cache.evict(eviction_threshold);
1404
1405        // Invalidate cached overlay since the anchor has changed
1406        self.state.tree_state.invalidate_cached_overlay();
1407
1408        self.on_new_persisted_block()?;
1409
1410        // Re-prepare overlay for the current canonical head with the new anchor.
1411        // Spawn a background task to trigger computation so it's ready when the next payload
1412        // arrives.
1413        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
1414            rayon::spawn(move || {
1415                let _ = overlay.get();
1416            });
1417        }
1418
1419        Ok(())
1420    }
1421
1422    /// Handles a message from the engine.
1423    ///
1424    /// Returns `ControlFlow::Break(())` if the engine should terminate.
1425    fn on_engine_message(
1426        &mut self,
1427        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1428    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
1429        match msg {
1430            FromEngine::Event(event) => match event {
1431                FromOrchestrator::BackfillSyncStarted => {
1432                    debug!(target: "engine::tree", "received backfill sync started event");
1433                    self.backfill_sync_state = BackfillSyncState::Active;
1434                }
1435                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1436                    self.on_backfill_sync_finished(ctrl)?;
1437                }
1438                FromOrchestrator::Terminate { tx } => {
1439                    debug!(target: "engine::tree", "received terminate request");
1440                    if let Err(err) = self.finish_termination(tx) {
1441                        error!(target: "engine::tree", %err, "Termination failed");
1442                    }
1443                    return Ok(ops::ControlFlow::Break(()))
1444                }
1445            },
1446            FromEngine::Request(request) => {
1447                match request {
1448                    EngineApiRequest::InsertExecutedBlock(block) => {
1449                        let block_num_hash = block.recovered_block().num_hash();
1450                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1451                            // outdated block that can be skipped
1452                            return Ok(ops::ControlFlow::Continue(()))
1453                        }
1454
1455                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1456                        let now = Instant::now();
1457
1458                        // if the parent is the canonical head, we can insert the block as the
1459                        // pending block
1460                        if self.state.tree_state.canonical_block_hash() ==
1461                            block.recovered_block().parent_hash()
1462                        {
1463                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1464                            self.canonical_in_memory_state.set_pending_block(block.clone());
1465                        }
1466
1467                        self.state.tree_state.insert_executed(block.clone());
1468                        self.payload_validator.on_inserted_executed_block(block.clone());
1469                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1470                        self.emit_event(EngineApiEvent::BeaconConsensus(
1471                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1472                        ));
1473                    }
1474                    EngineApiRequest::Beacon(request) => {
1475                        match request {
1476                            BeaconEngineMessage::ForkchoiceUpdated {
1477                                state,
1478                                payload_attrs,
1479                                tx,
1480                                version,
1481                            } => {
1482                                let has_attrs = payload_attrs.is_some();
1483
1484                                let start = Instant::now();
1485                                let mut output =
1486                                    self.on_forkchoice_updated(state, payload_attrs, version);
1487
1488                                if let Ok(res) = &mut output {
1489                                    // track last received forkchoice state
1490                                    self.state
1491                                        .forkchoice_state_tracker
1492                                        .set_latest(state, res.outcome.forkchoice_status());
1493
1494                                    // emit an event about the handled FCU
1495                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1496                                        state,
1497                                        res.outcome.forkchoice_status(),
1498                                    ));
1499
1500                                    // handle the event if any
1501                                    self.on_maybe_tree_event(res.event.take())?;
1502                                }
1503
1504                                if let Err(ref err) = output {
1505                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
1506                                }
1507
1508                                self.metrics.engine.forkchoice_updated.update_response_metrics(
1509                                    start,
1510                                    &mut self.metrics.engine.new_payload.latest_finish_at,
1511                                    has_attrs,
1512                                    &output,
1513                                );
1514
1515                                if let Err(err) =
1516                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1517                                {
1518                                    self.metrics
1519                                        .engine
1520                                        .failed_forkchoice_updated_response_deliveries
1521                                        .increment(1);
1522                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
1523                                }
1524                            }
1525                            BeaconEngineMessage::NewPayload { payload, tx } => {
1526                                let start = Instant::now();
1527                                let gas_used = payload.gas_used();
1528                                let num_hash = payload.num_hash();
1529                                let mut output = self.on_new_payload(payload);
1530                                self.metrics.engine.new_payload.update_response_metrics(
1531                                    start,
1532                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1533                                    &output,
1534                                    gas_used,
1535                                );
1536
1537                                let maybe_event =
1538                                    output.as_mut().ok().and_then(|out| out.event.take());
1539
1540                                // emit response
1541                                if let Err(err) =
1542                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1543                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1544                                    }))
1545                                {
1546                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
1547                                    self.metrics
1548                                        .engine
1549                                        .failed_new_payload_response_deliveries
1550                                        .increment(1);
1551                                }
1552
1553                                // handle the event if any
1554                                self.on_maybe_tree_event(maybe_event)?;
1555                            }
1556                        }
1557                    }
1558                }
1559            }
1560            FromEngine::DownloadedBlocks(blocks) => {
1561                if let Some(event) = self.on_downloaded(blocks)? {
1562                    self.on_tree_event(event)?;
1563                }
1564            }
1565        }
1566        Ok(ops::ControlFlow::Continue(()))
1567    }
1568
1569    /// Invoked when the backfill sync has finished syncing to its target.
1570    ///
1571    /// At this point we consider the chain synced up to the backfill target.
1572    ///
1573    /// Checks the tracked finalized block against the block on disk and requests another backfill
1574    /// run if the distance to the tip still exceeds the backfill threshold.
1575    ///
1576    /// This will also do the necessary housekeeping of the tree state, which includes:
1577    ///  - removing all blocks below the backfill height
1578    ///  - resetting the canonical in-memory state
1579    ///
1580    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1581    /// target block.
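    ///
    /// A simplified sketch of how the backfill height is derived (see the match below):
    ///
    /// ```text
    /// ControlFlow::Unwind { bad_block, target } => cache bad_block as invalid, height = target
    /// otherwise                                 => height = ctrl.block_number()
    ///                                              (None means there is nothing to clean up)
    /// ```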
1582    fn on_backfill_sync_finished(
1583        &mut self,
1584        ctrl: ControlFlow,
1585    ) -> Result<(), InsertBlockFatalError> {
1586        debug!(target: "engine::tree", "received backfill sync finished event");
1587        self.backfill_sync_state = BackfillSyncState::Idle;
1588
1589        // Pipeline unwound, memorize the invalid block and wait for the CL to provide the next sync target.
1590        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1591            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1592            // update the `invalid_headers` cache with the new invalid header
1593            self.state.invalid_headers.insert(**bad_block);
1594
1595            // if this was an unwind then the target is the new height
1596            Some(*target)
1597        } else {
1598            // backfill height is the block number that the backfill finished at
1599            ctrl.block_number()
1600        };
1601
1602        // if the backfill didn't finish at a specific height, there's nothing more to do
1603        let Some(backfill_height) = backfill_height else { return Ok(()) };
1604
1605        // state housekeeping after backfill sync
1606        // remove all executed blocks below the backfill height
1607        //
1608        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1609        // before that
1610        let Some(backfill_num_hash) = self
1611            .provider
1612            .block_hash(backfill_height)?
1613            .map(|hash| BlockNumHash { hash, number: backfill_height })
1614        else {
1615            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1616            return Ok(())
1617        };
1618
1619        if ctrl.is_unwind() {
1620            // the node reset, so we need to clear everything above that height so that the
1621            // backfill height becomes the new canonical head.
1622            self.state.tree_state.reset(backfill_num_hash)
1623        } else {
1624            self.state.tree_state.remove_until(
1625                backfill_num_hash,
1626                self.persistence_state.last_persisted_block.hash,
1627                Some(backfill_num_hash),
1628            );
1629        }
1630
1631        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1632        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1633
1634        // remove all buffered blocks below the backfill height
1635        self.state.buffer.remove_old_blocks(backfill_height);
1636        // we remove all entries because now we're synced to the backfill target and consider this
1637        // the canonical chain
1638        self.canonical_in_memory_state.clear_state();
1639
1640        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1641            // update the tracked chain height, after backfill sync both the canonical height and
1642            // persisted height are the same
1643            self.state.tree_state.set_canonical_head(new_head.num_hash());
1644            self.persistence_state.finish(new_head.hash(), new_head.number());
1645
1646            // update the tracked canonical head
1647            self.canonical_in_memory_state.set_canonical_head(new_head);
1648        }
1649
1650        // check if we need to run backfill again by comparing the most recent finalized height to
1651        // the backfill height
1652        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1653        else {
1654            return Ok(())
1655        };
1656        if sync_target_state.finalized_block_hash.is_zero() {
1657            // no finalized block, can't check distance
1658            return Ok(())
1659        }
1660        // get the block number of the finalized block, if we have it
1661        let newest_finalized = self
1662            .state
1663            .buffer
1664            .block(&sync_target_state.finalized_block_hash)
1665            .map(|block| block.number());
1666
1667        // The block number that the backfill finished at - if either the progress or the
1668        // newest finalized block is None, then we can't check the distance anyway.
1669        //
1670        // If both are Some, we perform another distance check and return the desired
1671        // backfill target
1672        if let Some(backfill_target) =
1673            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1674                // Determine whether we should run backfill again, in case the new gap is
1675                // still large enough to require another run
1676                self.backfill_sync_target(progress, finalized_number, None)
1677            })
1678        {
1679            // request another backfill run
1680            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1681                backfill_target.into(),
1682            )));
1683            return Ok(())
1684        };
1685
1686        // Check if there are more blocks to sync between current head and FCU target
1687        if let Some(lowest_buffered) =
1688            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1689        {
1690            let current_head_num = self.state.tree_state.current_canonical_head.number;
1691            let target_head_num = lowest_buffered.number();
1692
1693            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1694            {
1695                // There are blocks between current head and FCU target, download them
1696                debug!(
1697                    target: "engine::tree",
1698                    %current_head_num,
1699                    %target_head_num,
1700                    %distance,
1701                    "Backfill complete, downloading remaining blocks to reach FCU target"
1702                );
1703
1704                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1705                    lowest_buffered.parent_hash(),
1706                    distance,
1707                )));
1708                return Ok(());
1709            }
1710        } else {
1711            // We don't have the head block or any of its ancestors buffered. Request
1712            // a download for the head block which will then trigger further sync.
1713            debug!(
1714                target: "engine::tree",
1715                head_hash = %sync_target_state.head_block_hash,
1716                "Backfill complete but head block not buffered, requesting download"
1717            );
1718            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
1719                sync_target_state.head_block_hash,
1720            )));
1721            return Ok(());
1722        }
1723
1724        // try to close the gap by executing buffered blocks that are child blocks of the new head
1725        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1726    }
1727
1728    /// Attempts to make the given target canonical.
1729    ///
1730    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1731    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1732        if let Some(chain_update) = self.on_new_head(target)? {
1733            self.on_canonical_chain_update(chain_update);
1734        }
1735
1736        Ok(())
1737    }
1738
1739    /// Convenience function to handle an optional tree event.
1740    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1741        if let Some(event) = event {
1742            self.on_tree_event(event)?;
1743        }
1744
1745        Ok(())
1746    }
1747
1748    /// Handles a tree event.
1749    ///
1750    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1751    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1752        match event {
1753            TreeEvent::TreeAction(action) => match action {
1754                TreeAction::MakeCanonical { sync_target_head } => {
1755                    self.make_canonical(sync_target_head)?;
1756                }
1757            },
1758            TreeEvent::BackfillAction(action) => {
1759                self.emit_event(EngineApiEvent::BackfillAction(action));
1760            }
1761            TreeEvent::Download(action) => {
1762                self.emit_event(EngineApiEvent::Download(action));
1763            }
1764        }
1765
1766        Ok(())
1767    }
1768
1769    /// Emits an outgoing event to the engine.
1770    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1771        let event = event.into();
1772
1773        if event.is_backfill_action() {
1774            debug_assert_eq!(
1775                self.backfill_sync_state,
1776                BackfillSyncState::Idle,
1777                "backfill action should only be emitted when backfill is idle"
1778            );
1779
1780            if self.persistence_state.in_progress() {
1781                // backfill sync and persisting data are mutually exclusive, so we can't start
1782                // backfill while we're still persisting
1783                debug!(target: "engine::tree", "skipping backfill action while persistence task is active");
1784                return
1785            }
1786
1787            self.backfill_sync_state = BackfillSyncState::Pending;
1788            self.metrics.engine.pipeline_runs.increment(1);
1789            debug!(target: "engine::tree", "emitting backfill action event");
1790        }
1791
1792        let _ = self.outgoing.send(event).inspect_err(
1793            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1794        );
1795    }
1796
1797    /// Returns true if the canonical chain length minus the last persisted
1798    /// block is greater than the persistence threshold and
1799    /// backfill is not running.
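    ///
    /// For example (hypothetical numbers): with the last persisted block at 100, the canonical
    /// head at 170 and a persistence threshold of 64, the gap is 70 > 64, so this returns true
    /// as long as backfill is idle.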
1800    pub const fn should_persist(&self) -> bool {
1801        if !self.backfill_sync_state.is_idle() {
1802            // can't persist if backfill is running
1803            return false
1804        }
1805
1806        let min_block = self.persistence_state.last_persisted_block.number;
1807        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1808            self.config.persistence_threshold()
1809    }
1810
1811    /// Returns a batch of consecutive canonical blocks to persist in the range
1812    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
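    ///
    /// A minimal sketch with hypothetical numbers:
    ///
    /// ```text
    /// last_persisted_number = 100, canonical head = 164, memory_block_buffer_target = 32
    /// PersistTarget::Head      => target_number = 164, collects blocks 101..=164
    /// PersistTarget::Threshold => target_number = 164 - 32 = 132, collects blocks 101..=132
    /// (blocks are gathered newest-first by walking parent hashes, then reversed)
    /// ```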
1813    fn get_canonical_blocks_to_persist(
1814        &self,
1815        target: PersistTarget,
1816    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1817        // We will calculate the state root using the database, so we need to be sure there are no
1818        // concurrent changes, i.e. no persistence task is currently in progress
1819        debug_assert!(!self.persistence_state.in_progress());
1820
1821        let mut blocks_to_persist = Vec::new();
1822        let mut current_hash = self.state.tree_state.canonical_block_hash();
1823        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1824        let canonical_head_number = self.state.tree_state.canonical_block_number();
1825
1826        let target_number = match target {
1827            PersistTarget::Head => canonical_head_number,
1828            PersistTarget::Threshold => {
1829                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1830            }
1831        };
1832
1833        debug!(
1834            target: "engine::tree",
1835            ?current_hash,
1836            ?last_persisted_number,
1837            ?canonical_head_number,
1838            ?target_number,
1839            "Returning canonical blocks to persist"
1840        );
1841        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1842            if block.recovered_block().number() <= last_persisted_number {
1843                break;
1844            }
1845
1846            if block.recovered_block().number() <= target_number {
1847                blocks_to_persist.push(block.clone());
1848            }
1849
1850            current_hash = block.recovered_block().parent_hash();
1851        }
1852
1853        // Reverse the order so that the oldest block comes first
1854        blocks_to_persist.reverse();
1855
1856        Ok(blocks_to_persist)
1857    }
1858
1859    /// This clears the blocks from the in-memory tree state that have been persisted to the
1860    /// database.
1861    ///
1862    /// This also updates the canonical in-memory state to reflect the newest persisted block
1863    /// height.
1864    ///
1865    /// Assumes that `finish` has been called on the `persistence_state` at least once
1866    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1867        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1868        // state.
1869        if let Some(remove_above) = self.find_disk_reorg()? {
1870            self.remove_blocks(remove_above);
1871            return Ok(())
1872        }
1873
1874        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1875        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1876        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1877            number: self.persistence_state.last_persisted_block.number,
1878            hash: self.persistence_state.last_persisted_block.hash,
1879        });
1880        Ok(())
1881    }
1882
1883    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1884    ///
1885    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
1886    /// or the database. If the required historical data (such as trie change sets) has been
1887    /// pruned for a given block, this operation will return an error. On archive nodes, it
1888    /// can retrieve any block.
1889    #[instrument(level = "debug", target = "engine::tree", skip(self))]
1890    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1891        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1892        // check memory first
1893        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1894            return Ok(Some(block.clone()))
1895        }
1896
1897        let (block, senders) = self
1898            .provider
1899            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1900            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1901            .split_sealed();
1902        let mut execution_output = self
1903            .provider
1904            .get_state(block.header().number())?
1905            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1906        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1907
1908        debug!(
1909            target: "engine::tree",
1910            number = ?block.number(),
1911            "computing block trie updates",
1912        );
1913        let db_provider = self.provider.database_provider_ro()?;
1914        let trie_updates = reth_trie_db::compute_block_trie_updates(
1915            &self.changeset_cache,
1916            &db_provider,
1917            block.number(),
1918        )?;
1919
1920        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
1921        let sorted_trie_updates = Arc::new(trie_updates);
1922        // Skip building trie input and anchor for DB-loaded blocks.
1923        let trie_data =
1924            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
1925
1926        let execution_output = Arc::new(BlockExecutionOutput {
1927            state: execution_output.bundle,
1928            result: BlockExecutionResult {
1929                receipts: execution_output.receipts.pop().unwrap_or_default(),
1930                requests: execution_output.requests.pop().unwrap_or_default(),
1931                gas_used: block.gas_used(),
1932                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
1933            },
1934        });
1935
1936        Ok(Some(ExecutedBlock::new(
1937            Arc::new(RecoveredBlock::new_sealed(block, senders)),
1938            execution_output,
1939            trie_data,
1940        )))
1941    }
1942
1943    /// Return sealed block header from in-memory state or database by hash.
1944    fn sealed_header_by_hash(
1945        &self,
1946        hash: B256,
1947    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1948        // check memory first
1949        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1950
1951        if header.is_some() {
1952            Ok(header)
1953        } else {
1954            self.provider.sealed_header_by_hash(hash)
1955        }
1956    }
1957
1958    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1959    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1960    /// not exist in the buffer, this returns the hash that is passed in.
1961    ///
1962    /// Returns the parent hash of the block itself if the block is buffered and has no other
1963    /// buffered ancestors.
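    ///
    /// A small illustrative sketch (hypothetical hashes):
    ///
    /// ```text
    /// buffer contains B <- C <- D, and B's parent A is not buffered:
    /// lowest_buffered_ancestor_or(D) == A   // B is the lowest buffered ancestor, A is its parent
    /// lowest_buffered_ancestor_or(X) == X   // X is not buffered and has no buffered ancestors
    /// ```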
1964    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1965        self.state
1966            .buffer
1967            .lowest_ancestor(&hash)
1968            .map(|block| block.parent_hash())
1969            .unwrap_or(hash)
1970    }
1971
1972    /// If validation fails, the response MUST contain the latest valid hash:
1973    ///
1974    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1975    ///     conditions:
1976    ///     - It is fully validated and deemed VALID
1977    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1978    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1979    ///     conditions are satisfied by a `PoW` block.
1980    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1981    ///     the above conditions.
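    ///
    /// A rough walk-through (hypothetical hashes):
    ///
    /// ```text
    /// invalid_headers contains C -> B -> A, and A's parent V is a known (valid) header:
    /// latest_valid_hash_for_invalid_payload(C) walks C, B, A and returns Some(V)
    /// if the walk never reaches a known header, None is returned
    /// ```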
1982    fn latest_valid_hash_for_invalid_payload(
1983        &mut self,
1984        parent_hash: B256,
1985    ) -> ProviderResult<Option<B256>> {
1986        // Check if parent exists in side chain or in canonical chain.
1987        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1988            return Ok(Some(parent_hash))
1989        }
1990
1991        // iterate over ancestors in the invalid cache
1992        // until we encounter the first valid ancestor
1993        let mut current_hash = parent_hash;
1994        let mut current_block = self.state.invalid_headers.get(&current_hash);
1995        while let Some(block_with_parent) = current_block {
1996            current_hash = block_with_parent.parent;
1997            current_block = self.state.invalid_headers.get(&current_hash);
1998
1999            // If current_block is None, then the current_hash does not have an invalid
2000            // ancestor in the cache, so check its presence in the blockchain tree
2001            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
2002                return Ok(Some(current_hash))
2003            }
2004        }
2005        Ok(None)
2006    }
2007
2008    /// Prepares the invalid payload response for the given hash, checking the
2009    /// database for the parent hash and populating the payload status with the latest valid hash
2010    /// according to the engine api spec.
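    ///
    /// For example, if the parent resolves to a valid ancestor `V`, the returned status is
    /// `INVALID` with `latestValidHash = V`; if no valid ancestor can be determined, the hash
    /// currently defaults to zero.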
2011    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
2012        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
2013        // PoW block, which we need to identify by looking at the parent's block difficulty
2014        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
2015            !parent.difficulty().is_zero()
2016        {
2017            parent_hash = B256::ZERO;
2018        }
2019
2020        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
2021        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2022            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2023        })
2024        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2025    }
2026
2027    /// Returns true if the given hash is the last received sync target block.
2028    ///
2029    /// See [`ForkchoiceStateTracker::sync_target_state`]
2030    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2031        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2032            return target.head_block_hash == block_hash
2033        }
2034        false
2035    }
2036
2037    /// Returns true if the given hash is part of the last received sync target fork choice update.
2038    ///
2039    /// See [`ForkchoiceStateTracker::sync_target_state`]
2040    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2041        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2042            return target.contains(block_hash)
2043        }
2044        false
2045    }
2046
2047    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2048    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2049    ///
2050    /// Returns a payload status response according to the engine API spec if the block is known to
2051    /// be invalid.
2052    fn check_invalid_ancestor_with_head(
2053        &mut self,
2054        check: B256,
2055        head: &SealedBlock<N::Block>,
2056    ) -> ProviderResult<Option<PayloadStatus>> {
2057        // check if the check hash was previously marked as invalid
2058        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2059
2060        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2061    }
2062
2063    /// Invoked when a new payload received is invalid.
2064    fn on_invalid_new_payload(
2065        &mut self,
2066        head: SealedBlock<N::Block>,
2067        invalid: BlockWithParent,
2068    ) -> ProviderResult<PayloadStatus> {
2069        // populate the latest valid hash field
2070        let status = self.prepare_invalid_response(invalid.parent)?;
2071
2072        // insert the head block into the invalid header cache
2073        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2074        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2075
2076        Ok(status)
2077    }
2078
2079    /// Finds any invalid ancestor for the given payload.
2080    ///
2081    /// This function walks up the chain of buffered ancestors from the payload's block
2082    /// hash and checks if any ancestor is marked as invalid in the tree state.
2083    ///
2084    /// The check works by:
2085    /// 1. Finding the lowest buffered ancestor for the given block hash
2086    /// 2. If the ancestor is the same as the block hash itself, using the parent hash instead
2087    /// 3. Checking if this ancestor is in the `invalid_headers` map
2088    ///
2089    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2090    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2091        let parent_hash = payload.parent_hash();
2092        let block_hash = payload.block_hash();
2093        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2094        if lowest_buffered_ancestor == block_hash {
2095            lowest_buffered_ancestor = parent_hash;
2096        }
2097
2098        // Check if the block has an invalid ancestor
2099        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2100    }
2101
2102    /// Handles a payload that has an invalid ancestor.
2103    ///
2104    /// This function validates the payload and processes it according to whether it's
2105    /// well-formed or malformed:
2106    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2107    ///    known-bad block, which violates consensus rules
2108    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2109    ///    validated due to its own structural issues
2110    fn handle_invalid_ancestor_payload(
2111        &mut self,
2112        payload: T::ExecutionData,
2113        invalid: BlockWithParent,
2114    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2115        let parent_hash = payload.parent_hash();
2116        let num_hash = payload.num_hash();
2117
2118        // Here we might have 2 cases
2119        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2120        //    remember it as invalid
2121        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2122        // 2. the block is not well formed (i.e. the block hash is incorrect), and we should just return
2123        let block = match self.payload_validator.convert_payload_to_block(payload) {
2124            Ok(block) => block,
2125            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2126        };
2127
2128        Ok(self.on_invalid_new_payload(block, invalid)?)
2129    }
2130
2131    /// Checks if the given `head` points to an invalid header, which requires a specific response
2132    /// to a forkchoice update.
2133    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2134        // check if the head was previously marked as invalid
2135        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2136
2137        // Try to prepare invalid response, but handle errors gracefully
2138        match self.prepare_invalid_response(header.parent) {
2139            Ok(status) => Ok(Some(status)),
2140            Err(err) => {
2141                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2142                // Return a basic invalid status without latest valid hash
2143                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2144                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2145                })))
2146            }
2147        }
2148    }
2149
2150    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2151    /// and block body itself.
2152    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2153        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2154            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2155            return Err(e)
2156        }
2157
2158        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2159            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2160            return Err(e)
2161        }
2162
2163        Ok(())
2164    }
2165
2166    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
2167    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2168    fn try_connect_buffered_blocks(
2169        &mut self,
2170        parent: BlockNumHash,
2171    ) -> Result<(), InsertBlockFatalError> {
2172        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2173
2174        if blocks.is_empty() {
2175            // nothing to append
2176            return Ok(())
2177        }
2178
2179        let now = Instant::now();
2180        let block_count = blocks.len();
2181        for child in blocks {
2182            let child_num_hash = child.num_hash();
2183            match self.insert_block(child) {
2184                Ok(res) => {
2185                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2186                    if self.is_any_sync_target(child_num_hash.hash) &&
2187                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2188                    {
2189                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2190                        // we just inserted a block that we know is part of the canonical chain, so
2191                        // we can make it canonical
2192                        self.make_canonical(child_num_hash.hash)?;
2193                    }
2194                }
2195                Err(err) => {
2196                    if let InsertPayloadError::Block(err) = err {
2197                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2198                        if let Err(fatal) = self.on_insert_block_error(err) {
2199                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2200                            return Err(fatal)
2201                        }
2202                    }
2203                }
2204            }
2205        }
2206
2207        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2208        Ok(())
2209    }
2210
2211    /// Pre-validates the block and inserts it into the buffer.
2212    fn buffer_block(
2213        &mut self,
2214        block: SealedBlock<N::Block>,
2215    ) -> Result<(), InsertBlockError<N::Block>> {
2216        if let Err(err) = self.validate_block(&block) {
2217            return Err(InsertBlockError::consensus_error(err, block))
2218        }
2219        self.state.buffer.insert_block(block);
2220        Ok(())
2221    }
2222
2223    /// Returns true if the distance from the local tip to the block is greater than the configured
2224    /// threshold.
2225    ///
2226    /// If the `local_tip` is greater than the `block`, then this will return false.
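    ///
    /// For example, with a hypothetical threshold of 32 blocks: `local_tip = 100` and
    /// `block = 164` gives a gap of 64, which exceeds the threshold, while `block = 120`
    /// gives a gap of 20, which does not.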
2227    #[inline]
2228    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2229        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2230    }
2231
2232    /// Returns how far the local tip is from the given block. If the local tip is at the same
2233    /// height or its block number is greater than the given block, this returns None.
2234    #[inline]
2235    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2236        if block > local_tip {
2237            Some(block - local_tip)
2238        } else {
2239            None
2240        }
2241    }
2242
2243    /// Returns the target hash to sync to if the distance from the local tip to the block is
2244    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2245    /// that block already).
2246    ///
2247    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2248    /// (missing) finalized block.
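    ///
    /// A simplified decision sketch (mirrors the body below, not a spec):
    ///
    /// ```text
    /// gap = (finalized block number, if downloaded or buffered; else target_block_number)
    ///       - canonical_tip_num
    /// if the gap exceeds the backfill threshold and the finalized block is not on disk yet:
    ///     return Some(finalized_block_hash), or Some(head_block_hash) when the CL sent a
    ///     zero finalized hash (optimistic sync)
    /// else:
    ///     return None
    /// ```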
2249    fn backfill_sync_target(
2250        &self,
2251        canonical_tip_num: u64,
2252        target_block_number: u64,
2253        downloaded_block: Option<BlockNumHash>,
2254    ) -> Option<B256> {
2255        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2256
2257        // check if the downloaded block is the tracked finalized block
2258        let exceeds_backfill_threshold =
2259            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2260                // if we downloaded the finalized block we can now check how far we're off
2261                (Some(downloaded_block), Some(state))
2262                    if downloaded_block.hash == state.finalized_block_hash =>
2263                {
2264                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2265                }
2266                _ => match sync_target_state
2267                    .as_ref()
2268                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2269                {
2270                    Some(buffered_finalized) => {
2271                        // if we have buffered the finalized block, we should check how far we're
2272                        // off
2273                        self.exceeds_backfill_run_threshold(
2274                            canonical_tip_num,
2275                            buffered_finalized.number(),
2276                        )
2277                    }
2278                    None => {
2279                        // check if the distance exceeds the threshold for backfill sync
2280                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2281                    }
2282                },
2283            };
2284
2285        // if the number of missing blocks is greater than the max, trigger backfill
2286        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2287            // if we have already canonicalized the finalized block, we should skip backfill
2288            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2289                Err(err) => {
2290                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2291                }
2292                Ok(None) => {
2293                    // ensure the finalized block is known (not the zero hash)
2294                    if !state.finalized_block_hash.is_zero() {
2295                        // we don't have the block yet and the distance exceeds the allowed
2296                        // threshold
2297                        return Some(state.finalized_block_hash)
2298                    }
2299
2300                    // OPTIMISTIC SYNCING
2301                    //
2302                    // It can happen when the node is doing an
2303                    // optimistic sync, where the CL has no knowledge of the finalized hash,
2304                    // but is expecting the EL to sync as high
2305                    // as possible before finalizing.
2306                    //
2307                    // This usually doesn't happen on ETH mainnet since CLs use the more
2308                    // secure checkpoint syncing.
2309                    //
2310                    // However, optimism chains will do this. The risk of a reorg is however
2311                    // However, optimism chains will do this, and the risk of a reorg there
2312                    // is low.
2313                    return Some(state.head_block_hash)
2314                }
2315                Ok(Some(_)) => {
2316                    // we're fully synced to the finalized block
2317                }
2318            }
2319        }
2320
2321        None
2322    }
2323
2324    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
2325    /// happen if a reorg is happening while we are persisting a block.
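    ///
    /// A minimal sketch with hypothetical block numbers (primes denote differing in-memory
    /// hashes):
    ///
    /// ```text
    /// in-memory canonical: ... - 98 - 99' - 100'
    /// persisted on disk:   ... - 98 - 99  - 100
    ///
    /// heights match but hashes differ at 100, so both chains are walked back:
    /// 99' != 99, then 98 == 98  =>  returns Some(98), i.e. remove persisted blocks above 98
    /// ```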
2326    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2327        let mut canonical = self.state.tree_state.current_canonical_head;
2328        let mut persisted = self.persistence_state.last_persisted_block;
2329
2330        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2331            Ok(self
2332                .sealed_header_by_hash(num_hash.hash)?
2333                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2334                .parent_num_hash())
2335        };
2336
2337        // Happy path, canonical chain is ahead or equal to persisted chain.
2338        // Walk canonical chain back to make sure that it connects to persisted chain.
2339        while canonical.number > persisted.number {
2340            canonical = parent_num_hash(canonical)?;
2341        }
2342
2343        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2344        if canonical == persisted {
2345            return Ok(None);
2346        }
2347
2348        // At this point, we know that the `persisted` block can't be reached by walking the canonical
2349        // chain back. In this case we need to truncate it to the first canonical block it connects
2350        // to.
2351
2352        // Firstly, walk back until we reach the same height as `canonical`.
2353        while persisted.number > canonical.number {
2354            persisted = parent_num_hash(persisted)?;
2355        }
2356
2357        debug_assert_eq!(persisted.number, canonical.number);
2358
2359        // Now walk both chains back until we find a common ancestor.
2360        while persisted.hash != canonical.hash {
2361            canonical = parent_num_hash(canonical)?;
2362            persisted = parent_num_hash(persisted)?;
2363        }
2364
2365        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2366
2367        Ok(Some(persisted.number))
2368    }
2369
2370    /// Invoked when the canonical chain has been updated.
2371    ///
2372    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2373    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2374        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2375        let start = Instant::now();
2376
2377        // update the tracked canonical head
2378        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2379
2380        let tip = chain_update.tip().clone_sealed_header();
2381        let notification = chain_update.to_chain_notification();
2382
2383        // reinsert any missing reorged blocks
2384        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2385            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2386            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2387            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2388
2389            self.update_reorg_metrics(old.len(), old_first);
2390            self.reinsert_reorged_blocks(new.clone());
2391
2392            // When use_hashed_state is enabled, skip reinserting the old chain — the
2393            // bundle state references plain state reverts which don't exist.
2394            if !self.use_hashed_state {
2395                self.reinsert_reorged_blocks(old.clone());
2396            }
2397        }
2398
2399        // update the tracked in-memory state with the new chain
2400        self.canonical_in_memory_state.update_chain(chain_update);
2401        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2402
2403        // Update metrics based on new tip
2404        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2405
2406        // sends an event to all active listeners about the new canonical chain
2407        self.canonical_in_memory_state.notify_canon_state(notification);
2408
2409        // emit event
2410        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2411            Box::new(tip),
2412            start.elapsed(),
2413        ));
2414    }
2415
2416    /// This updates metrics based on the given reorg length and first reorged block number.
2417    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
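            // Classify the reorg by the deepest checkpoint it crosses: a reorg that replaces blocks at or
            // below the finalized height is the most severe, then one reaching the safe block, otherwise
            // only unconfirmed head blocks were replaced.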
2418        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2419            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2420                first_reorged_block <= finalized.number
2421            {
2422                self.metrics.tree.reorgs.finalized.increment(1);
2423            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2424                first_reorged_block <= safe.number
2425            {
2426                self.metrics.tree.reorgs.safe.increment(1);
2427            } else {
2428                self.metrics.tree.reorgs.head.increment(1);
2429            }
2430        } else {
2431            debug_unreachable!("Reorged chain doesn't have any blocks");
2432        }
2433        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2434    }
2435
2436    /// This reinserts any blocks from the given chain that do not already exist in the tree state.
2437    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2438        for block in new_chain {
2439            if self
2440                .state
2441                .tree_state
2442                .executed_block_by_hash(block.recovered_block().hash())
2443                .is_none()
2444            {
2445                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2446                self.state.tree_state.insert_executed(block);
2447            }
2448        }
2449    }
2450
2451    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2452    ///
2453    /// This mainly compares the missing parent of the downloaded block with the current canonical
2454    /// tip, and decides whether or not backfill sync should be triggered.
2455    fn on_disconnected_downloaded_block(
2456        &self,
2457        downloaded_block: BlockNumHash,
2458        missing_parent: BlockNumHash,
2459        head: BlockNumHash,
2460    ) -> Option<TreeEvent> {
2461        // compare the missing parent with the canonical tip
2462        if let Some(target) =
2463            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2464        {
2465            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2466            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2467        }
2468
2469        // continue downloading the missing parent
2470        //
2471        // this happens if either:
2472        //  * the missing parent block num < canonical tip num
2473        //    * this case represents a missing block on a fork that is shorter than the canonical
2474        //      chain
2475        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2476        //    less than the backfill threshold
2477        //    * this case represents a potentially long range of blocks to download and execute
2478        let request = if let Some(distance) =
2479            self.distance_from_local_tip(head.number, missing_parent.number)
2480        {
2481            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2482            DownloadRequest::BlockRange(missing_parent.hash, distance)
2483        } else {
2484            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2485            // This happens when the missing parent is on an outdated
2486            // sidechain and we can only download the missing block itself
2487            DownloadRequest::single_block(missing_parent.hash)
2488        };
2489
2490        Some(TreeEvent::Download(request))
2491    }
2492
2493    /// Invoked with a block downloaded from the network
2494    ///
2495    /// Returns an event with the appropriate action to take, such as:
2496    ///  - download more missing blocks
2497    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2498    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2499    fn on_downloaded_block(
2500        &mut self,
2501        block: SealedBlock<N::Block>,
2502    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2503        let block_num_hash = block.num_hash();
2504        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2505        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2506            return Ok(None)
2507        }
2508
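            // while a backfill sync is running we don't insert downloaded blocks into the tree, so the
            // block is ignored for now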
2509        if !self.backfill_sync_state.is_idle() {
2510            return Ok(None)
2511        }
2512
2513        // try to append the block
2514        match self.insert_block(block) {
2515            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2516                // check if we just inserted a block that's part of sync targets,
2517                // i.e. head, safe, or finalized
2518                if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2519                    sync_target.contains(block_num_hash.hash)
2520                {
2521                    debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2522
2523                    // we just inserted a block that we know is part of the canonical chain, so we
2524                    // can make it canonical
2525                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2526                        sync_target_head: block_num_hash.hash,
2527                    })))
2528                }
2529                trace!(target: "engine::tree", "appended downloaded block");
2530                self.try_connect_buffered_blocks(block_num_hash)?;
2531            }
2532            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2533                // block is not connected to the canonical head, we need to download
2534                // its missing branch first
2535                return Ok(self.on_disconnected_downloaded_block(
2536                    block_num_hash,
2537                    missing_ancestor,
2538                    head,
2539                ))
2540            }
2541            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2542                trace!(target: "engine::tree", "downloaded block already executed");
2543            }
2544            Err(err) => {
2545                if let InsertPayloadError::Block(err) = err {
2546                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2547                    if let Err(fatal) = self.on_insert_block_error(err) {
2548                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2549                        return Err(fatal)
2550                    }
2551                }
2552            }
2553        }
2554        Ok(None)
2555    }
2556
2557    /// Inserts a payload into the tree and executes it.
2558    ///
2559    /// This function validates the payload's basic structure, then executes it using the
2560    /// payload validator. The execution includes running all transactions in the payload
2561    /// and validating the resulting state transitions.
2562    ///
2563    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2564    /// or `InsertPayloadError` if validation or execution failed.
2565    fn insert_payload(
2566        &mut self,
2567        payload: T::ExecutionData,
2568    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2569        self.insert_block_or_payload(
2570            payload.block_with_parent(),
2571            payload,
2572            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2573            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2574        )
2575    }
2576
2577    fn insert_block(
2578        &mut self,
2579        block: SealedBlock<N::Block>,
2580    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2581        self.insert_block_or_payload(
2582            block.block_with_parent(),
2583            block,
2584            |validator, block, ctx| validator.validate_block(block, ctx),
2585            |_, block| Ok(block),
2586        )
2587    }
2588
2589    /// Inserts a block or payload into the blockchain tree with full execution.
2590    ///
2591    /// This is a generic function that handles both blocks and payloads by accepting
2592    /// a block identifier, input data, and execution/validation functions. It performs
2593    /// comprehensive checks and execution:
2594    ///
2595    /// - Validates that the block doesn't already exist in the tree
2596    /// - Ensures parent state is available, buffering if necessary
2597    /// - Executes the block/payload using the provided execute function
2598    /// - Handles both canonical and fork chain insertions
2599    /// - Updates pending block state when appropriate
2600    /// - Emits consensus engine events and records metrics
2601    ///
2602    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2603    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2604    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2605    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2606    fn insert_block_or_payload<Input, Err>(
2607        &mut self,
2608        block_id: BlockWithParent,
2609        input: Input,
2610        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2611        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2612    ) -> Result<InsertPayloadOk, Err>
2613    where
2614        Err: From<InsertBlockError<N::Block>>,
2615    {
2616        let block_insert_start = Instant::now();
2617        let block_num_hash = block_id.block;
2618        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2619
2620        // Check if block already exists - first in memory, then DB only if it could be persisted
2621        if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
2622            convert_to_block(self, input)?;
2623            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2624        }
2625
2626        // Only query DB if block could be persisted (number <= last persisted block).
2627        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
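            // e.g. with the last persisted block at height 100, a block at height 99 may already be on
            // disk and needs a DB lookup, while a block at height 101 can only exist in memory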
2628        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
2629            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
2630                Err(err) => {
2631                    let block = convert_to_block(self, input)?;
2632                    return Err(InsertBlockError::new(block, err.into()).into());
2633                }
2634                Ok(Some(_)) => {
2635                    convert_to_block(self, input)?;
2636                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2637                }
2638                Ok(None) => {}
2639            }
2640        }
2641
2642        // Ensure that the parent state is available.
2643        match self.state_provider_builder(block_id.parent) {
2644            Err(err) => {
2645                let block = convert_to_block(self, input)?;
2646                return Err(InsertBlockError::new(block, err.into()).into());
2647            }
2648            Ok(None) => {
2649                let block = convert_to_block(self, input)?;
2650
2651                // we don't have the state required to execute this block, so we buffer it and find
2652                // the missing parent block
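                    // e.g. if blocks 98 and 99 of this branch are already buffered and block 100 arrives,
                    // the lowest buffered ancestor is 98 and the missing ancestor is its parent 97; with
                    // nothing buffered the missing ancestor is simply this block's parent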
2653                let missing_ancestor = self
2654                    .state
2655                    .buffer
2656                    .lowest_ancestor(&block.parent_hash())
2657                    .map(|block| block.parent_num_hash())
2658                    .unwrap_or_else(|| block.parent_num_hash());
2659
2660                self.state.buffer.insert_block(block);
2661
2662                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2663                    head: self.state.tree_state.current_canonical_head,
2664                    missing_ancestor,
2665                }))
2666            }
2667            Ok(Some(_)) => {}
2668        }
2669
2670        // determine whether we are on a fork chain by comparing the block number with the
2671        // canonical head. This is a simple check that is sufficient for the event emission below.
2672        // A block is considered a fork if its number is less than or equal to the canonical head,
2673        // as this indicates there's already a canonical block at that height.
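            // e.g. with the canonical head at height 100, a new block at height 100 or below competes
            // with an existing canonical block and is treated as a fork, while a block at height 101
            // extends the chain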
2674        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2675
2676        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2677
2678        let start = Instant::now();
2679
2680        let executed = execute(&mut self.payload_validator, input, ctx)?;
2681
2682        // if the parent is the canonical head, we can insert the block as the pending block
2683        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2684        {
2685            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2686            self.canonical_in_memory_state.set_pending_block(executed.clone());
2687        }
2688
2689        self.state.tree_state.insert_executed(executed.clone());
2690        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2691
2692        // emit insert event
2693        let elapsed = start.elapsed();
2694        let engine_event = if is_fork {
2695            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2696        } else {
2697            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2698        };
2699        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2700
2701        self.metrics
2702            .engine
2703            .block_insert_total_duration
2704            .record(block_insert_start.elapsed().as_secs_f64());
2705        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2706        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2707    }
2708
2709    /// Handles an error that occurred while inserting a block.
2710    ///
2711    /// If this is a validation error, this will mark the block as invalid.
2712    ///
2713    /// Returns the proper payload status response if the block is invalid.
2714    fn on_insert_block_error(
2715        &mut self,
2716        error: InsertBlockError<N::Block>,
2717    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2718        let (block, error) = error.split();
2719
2720        // if invalid block, we check the validation error. Otherwise return the fatal
2721        // error.
2722        let validation_err = error.ensure_validation_error()?;
2723
2724        // If the error was due to an invalid payload, the payload is added to the
2725        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2726        // returned.
2727        warn!(
2728            target: "engine::tree",
2729            invalid_hash=%block.hash(),
2730            invalid_number=block.number(),
2731            %validation_err,
2732            "Invalid block error on new payload",
2733        );
2734        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2735
2736        // keep track of the invalid header
2737        self.state.invalid_headers.insert(block.block_with_parent());
2738        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2739            Box::new(block),
2740        )));
2741
2742        Ok(PayloadStatus::new(
2743            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2744            latest_valid_hash,
2745        ))
2746    }
2747
2748    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2749    fn on_new_payload_error(
2750        &mut self,
2751        error: NewPayloadError,
2752        payload_num_hash: NumHash,
2753        parent_hash: B256,
2754    ) -> ProviderResult<PayloadStatus> {
2755        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2756        // we need to convert the error to a payload status (response to the CL)
2757
2758        let latest_valid_hash =
2759            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2760                // Engine-API rules:
2761                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2762                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2763                None
2764            } else {
2765                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2766            };
2767
2768        let status = PayloadStatusEnum::from(error);
2769        Ok(PayloadStatus::new(status, latest_valid_hash))
2770    }
2771
2772    /// Attempts to find the header for the given block hash if it is canonical.
2773    pub fn find_canonical_header(
2774        &self,
2775        hash: B256,
2776    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
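            // prefer the in-memory canonical state; fall back to the database for headers that have
            // already been persisted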
2777        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2778
2779        if canonical.is_none() {
2780            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2781        }
2782
2783        Ok(canonical)
2784    }
2785
2786    /// Updates the tracked finalized block if we have it.
2787    fn update_finalized_block(
2788        &self,
2789        finalized_block_hash: B256,
2790    ) -> Result<(), OnForkChoiceUpdated> {
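            // a zero hash means the CL has not determined a finalized block yet, so there is nothing to
            // track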
2791        if finalized_block_hash.is_zero() {
2792            return Ok(())
2793        }
2794
2795        match self.find_canonical_header(finalized_block_hash) {
2796            Ok(None) => {
2797                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2798                // if the finalized block is not known, we can't update the finalized block
2799                return Err(OnForkChoiceUpdated::invalid_state())
2800            }
2801            Ok(Some(finalized)) => {
2802                if Some(finalized.num_hash()) !=
2803                    self.canonical_in_memory_state.get_finalized_num_hash()
2804                {
2806                    // we're also persisting the finalized block on disk so we can reload it on
2807                    // restart; this is required by optimism, which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2807                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2808                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2809                    // Update finalized block height metric
2810                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2811                }
2812            }
2813            Err(err) => {
2814                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2815            }
2816        }
2817
2818        Ok(())
2819    }
2820
2821    /// Updates the tracked safe block if we have it.
2822    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2823        if safe_block_hash.is_zero() {
2824            return Ok(())
2825        }
2826
2827        match self.find_canonical_header(safe_block_hash) {
2828            Ok(None) => {
2829                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2830                // if the safe block is not known, we can't update the safe block
2831                return Err(OnForkChoiceUpdated::invalid_state())
2832            }
2833            Ok(Some(safe)) => {
2834                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2836                    // we're also persisting the safe block on disk so we can reload it on
2837                    // restart; this is required by optimism, which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2837                    let _ = self.persistence.save_safe_block_number(safe.number());
2838                    self.canonical_in_memory_state.set_safe(safe.clone());
2839                    // Update safe block height metric
2840                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
2841                }
2842            }
2843            Err(err) => {
2844                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2845            }
2846        }
2847
2848        Ok(())
2849    }
2850
2851    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2852    /// made canonical.
2853    ///
2854    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2855    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2856    ///
2857    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2858    /// are consistent with the head block.
2859    fn ensure_consistent_forkchoice_state(
2860        &self,
2861        state: ForkchoiceState,
2862    ) -> Result<(), OnForkChoiceUpdated> {
2863        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2864        // after the head block is canonicalized.
2865        //
2866        // This ensures that the finalized block is consistent with the head block, i.e. the
2867        // finalized block is an ancestor of the head block.
2868        self.update_finalized_block(state.finalized_block_hash)?;
2869
2870        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2871        // after the head block is canonicalized.
2872        //
2873        // This ensures that the safe block is consistent with the head block, i.e. the safe
2874        // block is an ancestor of the head block.
2875        self.update_safe_block(state.safe_block_hash)
2876    }
2877
2878    /// Validates the payload attributes with respect to the header and fork choice state.
2879    ///
2880    /// Note: At this point, the fork choice update is considered to be VALID; however, we can still
2881    /// return an error if the payload attributes are invalid.
2882    fn process_payload_attributes(
2883        &self,
2884        attrs: T::PayloadAttributes,
2885        head: &N::BlockHeader,
2886        state: ForkchoiceState,
2887        version: EngineApiMessageVersion,
2888    ) -> OnForkChoiceUpdated {
2889        if let Err(err) =
2890            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2891        {
2892            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2893            return OnForkChoiceUpdated::invalid_payload_attributes()
2894        }
2895
2896        // 8. Client software MUST begin a payload build process building on top of
2897        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2898        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2899        //    The build process is specified in the Payload building section.
2900        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2901            state.head_block_hash,
2902            attrs,
2903            version as u8,
2904        ) {
2905            Ok(attributes) => {
2906                // send the payload to the builder and return the receiver for the pending payload
2907                // id; initiating the payload job is handled asynchronously
2908                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2909
2910                // Client software MUST respond to this method call in the following way:
2911                // {
2912                //      payloadStatus: {
2913                //          status: VALID,
2914                //          latestValidHash: forkchoiceState.headBlockHash,
2915                //          validationError: null
2916                //      },
2917                //      payloadId: buildProcessId
2918                // }
2919                //
2920                // if the payload is deemed VALID and the build process has begun.
2921                OnForkChoiceUpdated::updated_with_pending_payload_id(
2922                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2923                    pending_payload_id,
2924                )
2925            }
2926            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2927        }
2928    }
2929
2930    /// Remove all blocks up to __and including__ the given block number.
2931    ///
2932    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2933    /// those which have a fork point at or below the finalized hash.
2934    ///
2935    /// Canonical blocks below the upper bound will still be removed.
2936    pub(crate) fn remove_before(
2937        &mut self,
2938        upper_bound: BlockNumHash,
2939        finalized_hash: Option<B256>,
2940    ) -> ProviderResult<()> {
2941        // first fetch the finalized block number and then call the remove_until method on
2942        // tree_state
2943        let num = if let Some(hash) = finalized_hash {
2944            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2945        } else {
2946            None
2947        };
2948
2949        self.state.tree_state.remove_until(
2950            upper_bound,
2951            self.persistence_state.last_persisted_block.hash,
2952            num,
2953        );
2954        Ok(())
2955    }
2956
2957    /// Returns a builder for creating state providers for the given hash.
2958    ///
2959    /// This is an optimization for parallel execution contexts where we want to avoid
2960    /// creating state providers in the critical path.
2961    pub fn state_provider_builder(
2962        &self,
2963        hash: B256,
2964    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2965    where
2966        P: BlockReader + StateProviderFactory + StateReader + Clone,
2967    {
2968        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2969            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2970            // the block leads back to the canonical chain
2971            return Ok(Some(StateProviderBuilder::new(
2972                self.provider.clone(),
2973                historical,
2974                Some(blocks),
2975            )))
2976        }
2977
2978        // Check if the block is persisted
2979        if let Some(header) = self.provider.header(hash)? {
2980            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2981            // For persisted blocks, we create a builder that will fetch state directly from the
2982            // database
2983            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2984        }
2985
2986        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2987        Ok(None)
2988    }
2989}
2990
2991/// Events received in the main engine loop.
2992#[derive(Debug)]
2993enum LoopEvent<T, N>
2994where
2995    N: NodePrimitives,
2996    T: PayloadTypes,
2997{
2998    /// An engine API message was received.
2999    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
3000    /// A persistence task completed.
3001    PersistenceComplete {
3002        /// The result of the persistence operation.
3003        result: Option<BlockNumHash>,
3004        /// When the persistence operation started.
3005        start_time: Instant,
3006    },
3007    /// A channel was disconnected.
3008    Disconnected,
3009}
3010
3011/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
3012/// variant.
3013///
3014/// If we don't know the block's parent, we return `Disconnected`, as we can't determine whether the
3015/// block is valid or not.
3016#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3017pub enum BlockStatus {
3018    /// The block is valid and extends the canonical chain.
3019    Valid,
3020    /// The block may be valid and has an unknown missing ancestor.
3021    Disconnected {
3022        /// Current canonical head.
3023        head: BlockNumHash,
3024        /// The lowest ancestor block that is not connected to the canonical chain.
3025        missing_ancestor: BlockNumHash,
3026    },
3027}
3028
3029/// How a payload was inserted if it was valid.
3030///
3031/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
3032/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
3033#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3034pub enum InsertPayloadOk {
3035    /// The payload was valid, but we have already seen it.
3036    AlreadySeen(BlockStatus),
3037    /// The payload was valid and inserted into the tree.
3038    Inserted(BlockStatus),
3039}
3040
3041/// Target for block persistence.
3042#[derive(Debug, Clone, Copy)]
3043enum PersistTarget {
3044    /// Persist up to `canonical_head - memory_block_buffer_target`.
3045    Threshold,
3046    /// Persist all blocks up to and including the canonical head.
3047    Head,
3048}