Skip to main content

reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
17    NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{
28    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
29};
30use reth_primitives_traits::{
31    FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
32};
33use reth_provider::{
34    BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
35    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
36    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
37    StorageSettingsCache, TransactionVariant,
38};
39use reth_revm::database::StateProviderDatabase;
40use reth_stages_api::ControlFlow;
41use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
42use reth_trie_db::ChangesetCache;
43use revm::interpreter::debug_unreachable;
44use state::TreeState;
45use std::{fmt::Debug, ops, sync::Arc, time::Duration};
46
47use crossbeam_channel::{Receiver, Sender};
48use tokio::sync::{
49    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
50    oneshot,
51};
52use tracing::*;
53
54mod block_buffer;
55mod cached_state;
56pub mod error;
57pub mod instrumented_state;
58mod invalid_headers;
59mod metrics;
60pub mod payload_processor;
61pub mod payload_validator;
62mod persistence_state;
63pub mod precompile_cache;
64#[cfg(test)]
65mod tests;
66mod trie_updates;
67
68use crate::tree::error::AdvancePersistenceError;
69pub use block_buffer::BlockBuffer;
70pub use cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache};
71pub use invalid_headers::InvalidHeaderCache;
72pub use metrics::EngineApiMetrics;
73pub use payload_processor::*;
74pub use payload_validator::{BasicEngineValidator, EngineValidator};
75pub use persistence_state::PersistenceState;
76pub use reth_engine_primitives::TreeConfig;
77
78pub mod state;
79
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: Local head `block.number` is 100 and the forkchoice head `block.number` is 133 — a gap of
/// more than `EPOCH_SLOTS` blocks — so the threshold is exceeded and the pipeline should be used
/// to backfill this gap.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
90
/// The minimum number of blocks whose entries are retained in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism), where the usual
/// finalized-based eviction bound is unavailable.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
96
/// A builder for creating state providers that can be used across threads.
///
/// Captures everything required to construct a [`StateProviderBox`] later (see `build`):
/// the provider factory, the historical anchor block hash, and optional in-memory overlay blocks.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    /// When present, these are layered on top of the historical state at build time.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
107
108impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
109    /// Creates a new state provider from the provider factory, historical block hash and optional
110    /// overlaid blocks.
111    pub const fn new(
112        provider_factory: P,
113        historical: B256,
114        overlay: Option<Vec<ExecutedBlock<N>>>,
115    ) -> Self {
116        Self { provider_factory, historical, overlay }
117    }
118}
119
120impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
121where
122    P: BlockReader + StateProviderFactory + StateReader + Clone,
123{
124    /// Creates a new state provider from this builder.
125    pub fn build(&self) -> ProviderResult<StateProviderBox> {
126        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
127        if let Some(overlay) = self.overlay.clone() {
128            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
129        }
130        Ok(provider)
131    }
132}
133
/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks, i.e. blocks that cannot yet be connected to the tree
    /// (e.g. received while their ancestors are still being synced).
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}
149
150impl<N: NodePrimitives> EngineApiTreeState<N> {
151    fn new(
152        block_buffer_limit: u32,
153        max_invalid_header_cache_length: u32,
154        canonical_block: BlockNumHash,
155        engine_kind: EngineApiKind,
156    ) -> Self {
157        Self {
158            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
159            buffer: BlockBuffer::new(block_buffer_limit),
160            tree_state: TreeState::new(canonical_block, engine_kind),
161            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
162        }
163    }
164
165    /// Returns a reference to the tree state.
166    pub const fn tree_state(&self) -> &TreeState<N> {
167        &self.tree_state
168    }
169
170    /// Returns true if the block has been marked as invalid.
171    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
172        self.invalid_headers.get(hash).is_some()
173    }
174}
175
/// The outcome of a tree operation.
///
/// Couples the operation's result with an optional follow-up [`TreeEvent`] for the caller
/// to act on.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
}
184
185impl<T> TreeOutcome<T> {
186    /// Create new tree outcome.
187    pub const fn new(outcome: T) -> Self {
188        Self { outcome, event: None }
189    }
190
191    /// Set event on the outcome.
192    pub fn with_event(mut self, event: TreeEvent) -> Self {
193        self.event = Some(event);
194        self
195    }
196}
197
/// Events that are triggered by the tree chain and handed back to the caller.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}
208
209impl TreeEvent {
210    /// Returns true if the event is a backfill action.
211    const fn is_backfill_action(&self) -> bool {
212        matches!(self, Self::BackfillAction(_))
213    }
214}
215
/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash, i.e. the block that should become the canonical head.
        sync_target_head: B256,
    },
}
225
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider used for database, block and state lookups.
    provider: P,
    /// Consensus rules implementation used for block validation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator for incoming engine payloads.
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Whether the node uses hashed state as canonical storage (v2 mode).
    /// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
    use_hashed_state: bool,
    /// Task runtime for spawning blocking work on named, reusable threads.
    runtime: reth_tasks::Runtime,
}
282
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    // Manual impl rather than derive: intentionally omits the `incoming` receiver and the
    // `outgoing` sender channel halves, which carry no useful debug information.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("engine_kind", &self.engine_kind)
            .field("evm_config", &self.evm_config)
            .field("changeset_cache", &self.changeset_cache)
            .field("use_hashed_state", &self.use_hashed_state)
            .field("runtime", &self.runtime)
            .finish()
    }
}
311
312impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
313where
314    N: NodePrimitives,
315    P: DatabaseProviderFactory
316        + BlockReader<Block = N::Block, Header = N::BlockHeader>
317        + StateProviderFactory
318        + StateReader<Receipt = N::Receipt>
319        + HashedPostStateProvider
320        + Clone
321        + 'static,
322    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
323        + StageCheckpointReader
324        + ChangeSetReader
325        + StorageChangeSetReader
326        + StorageSettingsCache,
327    C: ConfigureEvm<Primitives = N> + 'static,
328    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
329    V: EngineValidator<T> + WaitForCaches,
330{
331    /// Creates a new [`EngineApiTreeHandler`].
332    #[expect(clippy::too_many_arguments)]
333    pub fn new(
334        provider: P,
335        consensus: Arc<dyn FullConsensus<N>>,
336        payload_validator: V,
337        outgoing: UnboundedSender<EngineApiEvent<N>>,
338        state: EngineApiTreeState<N>,
339        canonical_in_memory_state: CanonicalInMemoryState<N>,
340        persistence: PersistenceHandle<N>,
341        persistence_state: PersistenceState,
342        payload_builder: PayloadBuilderHandle<T>,
343        config: TreeConfig,
344        engine_kind: EngineApiKind,
345        evm_config: C,
346        changeset_cache: ChangesetCache,
347        use_hashed_state: bool,
348        runtime: reth_tasks::Runtime,
349    ) -> Self {
350        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
351
352        Self {
353            provider,
354            consensus,
355            payload_validator,
356            incoming,
357            outgoing,
358            persistence,
359            persistence_state,
360            backfill_sync_state: BackfillSyncState::Idle,
361            state,
362            canonical_in_memory_state,
363            payload_builder,
364            config,
365            metrics: Default::default(),
366            incoming_tx,
367            engine_kind,
368            evm_config,
369            changeset_cache,
370            use_hashed_state,
371            runtime,
372        }
373    }
374
    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        use_hashed_state: bool,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        // Best-effort lookup of the current chain tip: falls back to block 0 and a default
        // header if the provider lookup fails.
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        // The best persisted block is taken as the starting point for persistence tracking.
        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            use_hashed_state,
            runtime,
        );
        // Clone the task's self-channel sender before moving the task into its thread.
        let incoming = task.incoming_tx.clone();
        // Run the handler on a dedicated OS thread with raised priority.
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
436
437    /// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
438    fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
439        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
440            PayloadStatusEnum::Valid,
441            Some(state.head_block_hash),
442        )))
443    }
444
445    /// Returns a new [`Sender`] to send messages to this type.
446    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
447        self.incoming_tx.clone()
448    }
449
450    /// Run the engine API handler.
451    ///
452    /// This will block the current thread and process incoming messages.
453    pub fn run(mut self) {
454        loop {
455            match self.wait_for_event() {
456                LoopEvent::EngineMessage(msg) => {
457                    debug!(target: "engine::tree", %msg, "received new engine message");
458                    match self.on_engine_message(msg) {
459                        Ok(ops::ControlFlow::Break(())) => return,
460                        Ok(ops::ControlFlow::Continue(())) => {}
461                        Err(fatal) => {
462                            error!(target: "engine::tree", %fatal, "insert block fatal error");
463                            return
464                        }
465                    }
466                }
467                LoopEvent::PersistenceComplete { result, start_time } => {
468                    if let Err(err) = self.on_persistence_complete(result, start_time) {
469                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
470                        return
471                    }
472                }
473                LoopEvent::Disconnected => {
474                    error!(target: "engine::tree", "Channel disconnected");
475                    return
476                }
477            }
478
479            // Always check if we need to trigger new persistence after any event:
480            // - After engine messages: new blocks may have been inserted that exceed the
481            //   persistence threshold
482            // - After persistence completion: we can now persist more blocks if needed
483            if let Err(err) = self.advance_persistence() {
484                error!(target: "engine::tree", %err, "Advancing persistence failed");
485                return
486            }
487        }
488    }
489
    /// Blocks until the next event is ready: either an incoming engine message or a persistence
    /// completion (if one is in progress).
    ///
    /// Uses biased selection to prioritize persistence completion to update in-memory state and
    /// unblock further writes.
    ///
    /// The persistence receiver is temporarily taken out of `self.persistence_state`; it is
    /// only restored when the select resolves to an engine message instead.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Take ownership of persistence rx if present
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            // Biased select prioritizes persistence completion to update in memory state and
            // unblock further writes
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    // Don't put it back - consumed (oneshot-like behavior)
                    match result {
                        Ok(value) => LoopEvent::PersistenceComplete {
                            result: value,
                            start_time,
                        },
                        // Sender dropped without sending: treat as disconnect.
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Put the persistence rx back - we didn't consume it
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in progress - just wait on incoming
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
530
531    /// Invoked when previously requested blocks were downloaded.
532    ///
533    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
534    /// will execute the first batch and send the remaining blocks back through the channel so that
535    /// block request processing isn't blocked for a long time.
536    fn on_downloaded(
537        &mut self,
538        mut blocks: Vec<SealedBlock<N::Block>>,
539    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
540        if blocks.is_empty() {
541            // nothing to execute
542            return Ok(None)
543        }
544
545        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
546        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
547        for block in blocks.drain(..batch) {
548            if let Some(event) = self.on_downloaded_block(block)? {
549                let needs_backfill = event.is_backfill_action();
550                self.on_tree_event(event)?;
551                if needs_backfill {
552                    // can exit early if backfill is needed
553                    return Ok(None)
554                }
555            }
556        }
557
558        // if we still have blocks to execute, send them as a followup request
559        if !blocks.is_empty() {
560            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
561        }
562
563        Ok(None)
564    }
565
    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`], for example
    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
    /// Execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to Consensus layer, that adds the block to the head of
    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
    /// network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
    /// returns an error if an internal error occurred.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.

        // Notify listeners that a payload was received before any validation happens.
        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Check for invalid ancestors: a payload that descends from a known-invalid block is
        // answered immediately, without execution.
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        // During backfill we lack the state to execute, so the payload is buffered instead.
        let status = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?
        } else {
            self.try_buffer_payload(payload)?
        };

        let mut outcome = TreeOutcome::new(status);
        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // Only create the canonical event if this block isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
658
659    /// Processes a payload during normal sync operation.
660    ///
661    /// Returns:
662    /// - `Valid`: Payload successfully validated and inserted
663    /// - `Syncing`: Parent missing, payload buffered for later
664    /// - Error status: Payload is invalid
665    #[instrument(level = "debug", target = "engine::tree", skip_all)]
666    fn try_insert_payload(
667        &mut self,
668        payload: T::ExecutionData,
669    ) -> Result<PayloadStatus, InsertBlockFatalError> {
670        let block_hash = payload.block_hash();
671        let num_hash = payload.num_hash();
672        let parent_hash = payload.parent_hash();
673        let mut latest_valid_hash = None;
674
675        match self.insert_payload(payload) {
676            Ok(status) => {
677                let status = match status {
678                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
679                        latest_valid_hash = Some(block_hash);
680                        self.try_connect_buffered_blocks(num_hash)?;
681                        PayloadStatusEnum::Valid
682                    }
683                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
684                        latest_valid_hash = Some(block_hash);
685                        PayloadStatusEnum::Valid
686                    }
687                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
688                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
689                        // not known to be invalid, but we don't know anything else
690                        PayloadStatusEnum::Syncing
691                    }
692                };
693
694                Ok(PayloadStatus::new(status, latest_valid_hash))
695            }
696            Err(error) => match error {
697                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
698                InsertPayloadError::Payload(error) => {
699                    Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
700                }
701            },
702        }
703    }
704
705    /// Stores a payload for later processing during backfill sync.
706    ///
707    /// During backfill, the node lacks the state needed to validate payloads,
708    /// so they are buffered (stored in memory) until their parent blocks are synced.
709    ///
710    /// Returns:
711    /// - `Syncing`: Payload successfully buffered
712    /// - Error status: Payload is malformed or invalid
713    fn try_buffer_payload(
714        &mut self,
715        payload: T::ExecutionData,
716    ) -> Result<PayloadStatus, InsertBlockFatalError> {
717        let parent_hash = payload.parent_hash();
718        let num_hash = payload.num_hash();
719
720        match self.payload_validator.convert_payload_to_block(payload) {
721            // if the block is well-formed, buffer it for later
722            Ok(block) => {
723                if let Err(error) = self.buffer_block(block) {
724                    Ok(self.on_insert_block_error(error)?)
725                } else {
726                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
727                }
728            }
729            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
730        }
731    }
732
    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Walks back from `new_head` through in-memory blocks until the canonical chain is
    /// reached. If the walk ends exactly at the current canonical head, the result is a
    /// simple extension (`NewCanonicalChain::Commit`); otherwise both the old and the new
    /// chain are walked back to their common ancestor and a `NewCanonicalChain::Reorg` is
    /// returned.
    ///
    /// Returns `Ok(None)` when the new head block — or any block needed for the walk — is
    /// not available, in which case no chain update can be constructed.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block; without it we cannot build a chain update
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        // The new chain is collected tip-first and reversed before returning.
        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None)
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain,
        // gather all blocks until new head number.
        while current_canonical_number > current_number {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
                current_canonical_number -= 1;
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Both chains were collected tip-first; reverse so callers receive them oldest-first.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
829
830    /// Updates the latest block state to the specified canonical ancestor.
831    ///
832    /// This method ensures that the latest block tracks the given canonical header by resetting
833    ///
834    /// # Arguments
835    /// * `canonical_header` - The canonical header to set as the new head
836    ///
837    /// # Returns
838    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
839    ///
840    /// Caution: This unwinds the canonical chain
841    fn update_latest_block_to_canonical_ancestor(
842        &mut self,
843        canonical_header: &SealedHeader<N::BlockHeader>,
844    ) -> ProviderResult<()> {
845        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
846        let current_head_number = self.state.tree_state.canonical_block_number();
847        let new_head_number = canonical_header.number();
848        let new_head_hash = canonical_header.hash();
849
850        // Update tree state with the new canonical head
851        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
852
853        // Handle the state update based on whether this is an unwind scenario
854        if new_head_number < current_head_number {
855            debug!(
856                target: "engine::tree",
857                current_head = current_head_number,
858                new_head = new_head_number,
859                new_head_hash = ?new_head_hash,
860                "FCU unwind detected: reverting to canonical ancestor"
861            );
862
863            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
864        } else {
865            debug!(
866                target: "engine::tree",
867                previous_head = current_head_number,
868                new_head = new_head_number,
869                new_head_hash = ?new_head_hash,
870                "Advancing latest block to canonical ancestor"
871            );
872            self.handle_chain_advance_or_same_height(canonical_header)
873        }
874    }
875
876    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
877    /// to the canonical header
878    fn handle_canonical_chain_unwind(
879        &self,
880        current_head_number: u64,
881        canonical_header: &SealedHeader<N::BlockHeader>,
882    ) -> ProviderResult<()> {
883        let new_head_number = canonical_header.number();
884        debug!(
885            target: "engine::tree",
886            from = current_head_number,
887            to = new_head_number,
888            "Handling unwind: collecting blocks to remove from in-memory state"
889        );
890
891        // Collect blocks that need to be removed from memory
892        let old_blocks =
893            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
894
895        // Load and apply the canonical ancestor block
896        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
897    }
898
899    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
900    fn collect_blocks_for_canonical_unwind(
901        &self,
902        new_head_number: u64,
903        current_head_number: u64,
904    ) -> Vec<ExecutedBlock<N>> {
905        let mut old_blocks =
906            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
907
908        for block_num in (new_head_number + 1)..=current_head_number {
909            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
910                let executed_block = block_state.block_ref().clone();
911                old_blocks.push(executed_block);
912                debug!(
913                    target: "engine::tree",
914                    block_number = block_num,
915                    "Collected block for removal from in-memory state"
916                );
917            }
918        }
919
920        if old_blocks.is_empty() {
921            debug!(
922                target: "engine::tree",
923                "No blocks found in memory to remove, will clear and reset state"
924            );
925        }
926
927        old_blocks
928    }
929
930    /// Applies the canonical ancestor block via a reorg operation.
931    fn apply_canonical_ancestor_via_reorg(
932        &self,
933        canonical_header: &SealedHeader<N::BlockHeader>,
934        old_blocks: Vec<ExecutedBlock<N>>,
935    ) -> ProviderResult<()> {
936        let new_head_hash = canonical_header.hash();
937        let new_head_number = canonical_header.number();
938
939        // Try to load the canonical ancestor's block
940        match self.canonical_block_by_hash(new_head_hash)? {
941            Some(executed_block) => {
942                // Perform the reorg to properly handle the unwind
943                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
944                    new: vec![executed_block],
945                    old: old_blocks,
946                });
947
948                // CRITICAL: Update the canonical head after the reorg
949                // This ensures get_canonical_head() returns the correct block
950                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
951
952                debug!(
953                    target: "engine::tree",
954                    block_number = new_head_number,
955                    block_hash = ?new_head_hash,
956                    "Successfully loaded canonical ancestor into memory via reorg"
957                );
958            }
959            None => {
960                // Fallback: update header only if block cannot be found
961                warn!(
962                    target: "engine::tree",
963                    block_hash = ?new_head_hash,
964                    "Could not find canonical ancestor block, updating header only"
965                );
966                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
967            }
968        }
969
970        Ok(())
971    }
972
973    /// Handles chain advance or same height scenarios.
974    fn handle_chain_advance_or_same_height(
975        &self,
976        canonical_header: &SealedHeader<N::BlockHeader>,
977    ) -> ProviderResult<()> {
978        // Load the block into memory if it's not already present
979        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
980
981        // Update the canonical head header
982        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
983
984        Ok(())
985    }
986
987    /// Ensures a block is loaded into memory if not already present.
988    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
989        // Check if block is already in memory
990        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
991            return Ok(());
992        }
993
994        // Try to load the block from storage
995        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
996            self.canonical_in_memory_state
997                .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
998
999            debug!(
1000                target: "engine::tree",
1001                block_number,
1002                block_hash = ?block_hash,
1003                "Added canonical block to in-memory state"
1004            );
1005        }
1006
1007        Ok(())
1008    }
1009
    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
    /// chain.
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
    ///
    /// Processing is a pipeline of early-returning stages: pre-validation, the
    /// already-canonical fast path, a chain update (extension or reorg), and finally the
    /// missing-block fallback that triggers a download.
    ///
    /// Returns an error if an internal error occurred like a database error.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        // Record metrics
        self.record_forkchoice_metrics();

        // Pre-validation of forkchoice state (zero head, invalid ancestor, active backfill)
        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        // Return early if we are on the correct fork
        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
            return Ok(result);
        }

        // Attempt to apply a chain update when the head differs from our canonical chain.
        // This handles reorgs and chain extensions by making the specified head canonical.
        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
            return Ok(result);
        }

        // Fallback that ensures to catch up to the network's state.
        self.handle_missing_block(state)
    }
1049
    /// Records metrics for forkchoice updated calls by notifying the in-memory canonical
    /// state that a forkchoice update was received.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1054
1055    /// Pre-validates the forkchoice state and returns early if validation fails.
1056    ///
1057    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1058    /// returned. Returns `None` if validation passes and processing should continue.
1059    fn validate_forkchoice_state(
1060        &mut self,
1061        state: ForkchoiceState,
1062    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1063        if state.head_block_hash.is_zero() {
1064            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1065        }
1066
1067        // Check if the new head hash is connected to any ancestor that we previously marked as
1068        // invalid
1069        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1070        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1071            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1072        }
1073
1074        if !self.backfill_sync_state.is_idle() {
1075            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1076            // exclusive access to the database
1077            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1078            return Ok(Some(OnForkChoiceUpdated::syncing()));
1079        }
1080
1081        Ok(None)
1082    }
1083
    /// Handles the case where the forkchoice head is already canonical.
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
    /// processing is complete. Returns `None` if the head is not canonical and processing
    /// should continue.
    fn handle_canonical_head(
        &self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Process the forkchoice update by trying to make the head block canonical
        //
        // We can only process this forkchoice update if:
        // - we have the `head` block
        // - the head block is part of a chain that is connected to the canonical chain. This
        //   includes reorgs.
        //
        // Performing a FCU involves:
        // - marking the FCU's head block as canonical
        // - updating in memory state to reflect the new canonical chain
        // - updating canonical state trackers
        // - emitting a canonicalization event for the new chain (including reorg)
        // - if we have payload attributes, delegate them to the payload service

        // Only applicable when the FCU head is exactly our current canonical head.
        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
            return Ok(None);
        }

        trace!(target: "engine::tree", "fcu head hash is already canonical");

        // Update the safe and finalized blocks and ensure their values are valid
        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
            // safe or finalized hashes are invalid
            return Ok(Some(TreeOutcome::new(outcome)));
        }

        // Process payload attributes if the head is already canonical
        if let Some(attr) = attrs {
            let tip = self
                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
                .ok_or_else(|| {
                    // If we can't find the canonical block, then something is wrong and we need
                    // to return an error
                    ProviderError::HeaderNotFound(state.head_block_hash.into())
                })?;
            // Clone only when we actually need to process the attributes
            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
            return Ok(Some(TreeOutcome::new(updated)));
        }

        // The head block is already canonical
        Ok(Some(Self::valid_outcome(state)))
    }
1138
    /// Applies chain update for the new head block and processes payload attributes.
    ///
    /// This method handles the case where the forkchoice head differs from our current canonical
    /// head. It attempts to make the specified head block canonical by:
    /// - Checking if the head is already part of the canonical chain
    /// - Applying chain reorganizations (reorgs) if necessary
    /// - Processing payload attributes if provided
    /// - Returning the appropriate forkchoice update response
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
    /// Returns `None` if no chain update was needed or possible.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Check if the head is already part of the canonical chain (i.e. a `VALID` ancestor
        // of the current canonical head).
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
            // own chain at will, so we need to always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
                // part of the canonical chain. We need to update the latest block state to reflect
                // the canonical ancestor. This ensures that state providers and the transaction
                // pool operate with the correct chain state after forkchoice update processing, and
                // new payloads built on the reorg'd head will be added to the tree immediately.
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    // Clone only when we actually need to process the attributes
                    let updated = self.process_payload_attributes(
                        attr.clone(),
                        &canonical_header,
                        state,
                        version,
                    );
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            // According to the Engine API specification, client software MAY skip an update of the
            // forkchoice state and MUST NOT begin a payload build process if
            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
            // of canonical chain, i.e. the ancestor passed payload validation process
            // and deemed `VALID`. In the case of such an event, client software MUST
            // return `{payloadStatus: {status: VALID, latestValidHash:
            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // The head block is already canonical and we're not processing payload attributes,
            // so we're not triggering a payload job and can return right away
            return Ok(Some(Self::valid_outcome(state)));
        }

        // Ensure we can apply a new chain update for the head block (extension or reorg).
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            // Update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                // Clone only when we actually need to process the attributes
                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        // No chain update possible; the caller falls through to the missing-block path.
        Ok(None)
    }
1222
1223    /// Handles the case where the head block is missing and needs to be downloaded.
1224    ///
1225    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1226    /// Returns a `TreeOutcome` with syncing status and download event.
1227    fn handle_missing_block(
1228        &self,
1229        state: ForkchoiceState,
1230    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1231        // We don't have the block to perform the forkchoice update
1232        // We assume the FCU is valid and at least the head is missing,
1233        // so we need to start syncing to it
1234        //
1235        // find the appropriate target to sync to, if we don't have the safe block hash then we
1236        // start syncing to the safe block via backfill first
1237        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1238        // check that safe block is valid and missing
1239        !state.safe_block_hash.is_zero() &&
1240        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1241        {
1242            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1243            state.safe_block_hash
1244        } else {
1245            state.head_block_hash
1246        };
1247
1248        let target = self.lowest_buffered_ancestor_or(target);
1249        trace!(target: "engine::tree", %target, "downloading missing block");
1250
1251        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1252            PayloadStatusEnum::Syncing,
1253        )))
1254        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1255    }
1256
1257    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1258    /// the current persistence action while we're removing blocks.
1259    fn remove_blocks(&mut self, new_tip_num: u64) {
1260        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1261        if new_tip_num < self.persistence_state.last_persisted_block.number {
1262            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1263            let (tx, rx) = crossbeam_channel::bounded(1);
1264            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1265            self.persistence_state.start_remove(new_tip_num, rx);
1266        }
1267    }
1268
1269    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1270    /// the current persistence action while we're saving blocks.
1271    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1272        if blocks_to_persist.is_empty() {
1273            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1274            return
1275        }
1276
1277        // NOTE: checked non-empty above
1278        let highest_num_hash = blocks_to_persist
1279            .iter()
1280            .max_by_key(|block| block.recovered_block().number())
1281            .map(|b| b.recovered_block().num_hash())
1282            .expect("Checked non-empty persisting blocks");
1283
1284        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1285        let (tx, rx) = crossbeam_channel::bounded(1);
1286        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1287
1288        self.persistence_state.start_save(highest_num_hash, rx);
1289    }
1290
1291    /// Triggers new persistence actions if no persistence task is currently in progress.
1292    ///
1293    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1294    /// Persistence completion is handled separately via the `wait_for_event` method.
1295    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1296        if !self.persistence_state.in_progress() {
1297            if let Some(new_tip_num) = self.find_disk_reorg()? {
1298                self.remove_blocks(new_tip_num)
1299            } else if self.should_persist() {
1300                let blocks_to_persist =
1301                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1302                self.persist_blocks(blocks_to_persist);
1303            }
1304        }
1305
1306        Ok(())
1307    }
1308
1309    /// Finishes termination by persisting all remaining blocks and signaling completion.
1310    ///
1311    /// This blocks until all persistence is complete. Always signals completion,
1312    /// even if an error occurs.
1313    fn finish_termination(
1314        &mut self,
1315        pending_termination: oneshot::Sender<()>,
1316    ) -> Result<(), AdvancePersistenceError> {
1317        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1318        let result = self.persist_until_complete();
1319        let _ = pending_termination.send(());
1320        result
1321    }
1322
1323    /// Persists all remaining blocks until none are left.
1324    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1325        loop {
1326            // Wait for any in-progress persistence to complete (blocking)
1327            if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
1328                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1329                self.on_persistence_complete(result, start_time)?;
1330            }
1331
1332            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1333
1334            if blocks_to_persist.is_empty() {
1335                debug!(target: "engine::tree", "persistence complete, signaling termination");
1336                return Ok(())
1337            }
1338
1339            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1340            self.persist_blocks(blocks_to_persist);
1341        }
1342    }
1343
1344    /// Tries to poll for a completed persistence task (non-blocking).
1345    ///
1346    /// Returns `true` if a persistence task was completed, `false` otherwise.
1347    #[cfg(test)]
1348    pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1349        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1350            return Ok(false);
1351        };
1352
1353        match rx.try_recv() {
1354            Ok(result) => {
1355                self.on_persistence_complete(result, start_time)?;
1356                Ok(true)
1357            }
1358            Err(crossbeam_channel::TryRecvError::Empty) => {
1359                // Not ready yet, put it back
1360                self.persistence_state.rx = Some((rx, start_time, action));
1361                Ok(false)
1362            }
1363            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1364                Err(AdvancePersistenceError::ChannelClosed)
1365            }
1366        }
1367    }
1368
    /// Handles a completed persistence task.
    ///
    /// Records the persistence duration metric, updates the persistence state with the new
    /// persisted tip, evicts stale trie changesets, invalidates and re-prepares the cached
    /// canonical overlay, and notifies the tree of the newly persisted block.
    fn on_persistence_complete(
        &mut self,
        last_persisted_hash_num: Option<BlockNumHash>,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = last_persisted_hash_num
        else {
            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Evict trie changesets for blocks below the eviction threshold.
        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
        // the finalized block if set.
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                // Use the minimum of finalized block and retention threshold to be conservative
                finalized.number.min(min_threshold)
            } else {
                // When finalized is not set (e.g., on L2s), use the retention threshold
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // Invalidate cached overlay since the anchor has changed
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Re-prepare overlay for the current canonical head with the new anchor.
        // Spawn a background task to trigger computation so it's ready when the next payload
        // arrives.
        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                // Force the lazy overlay computation; the result itself is discarded here.
                let _ = overlay.get();
            });
        }

        Ok(())
    }
1428
    /// Handles a message from the engine.
    ///
    /// Dispatches on the message kind:
    /// - orchestrator events: backfill sync started/finished and terminate requests,
    /// - engine API requests: inserting an already-executed block, `forkchoiceUpdated`,
    ///   `newPayload`, and `reth_newPayload`,
    /// - batches of downloaded blocks.
    ///
    /// Returns `ControlFlow::Break(())` if the engine should terminate.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // best-effort termination: log the error but break out either way
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            // outdated block that can be skipped
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // if the parent is the canonical head, we can insert the block as the
                        // pending block
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.payload_validator.on_inserted_executed_block(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated {
                                state,
                                payload_attrs,
                                tx,
                                version,
                            } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output =
                                    self.on_forkchoice_updated(state, payload_attrs, version);

                                if let Ok(res) = &mut output {
                                    // track last received forkchoice state
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    // emit an event about the handled FCU
                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // handle the event if any
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // reply to the engine; a dropped receiver means the request was
                                // cancelled, which is not fatal
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // take the follow-up tree event (if any) before the output is
                                // consumed by the response send below
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // emit response
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // handle the event if any
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload { payload, tx } => {
                                // Before processing the new payload, we wait for persistence and
                                // cache updates to complete. We do it in parallel, spawning
                                // persistence and cache update wait tasks with Tokio, so that we
                                // can get an unbiased breakdown on how long did every step take.
                                //
                                // If we first wait for persistence, and only then for cache
                                // updates, we will offset the cache update waits by the duration of
                                // persistence, which is incorrect.
                                debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload");

                                let pending_persistence = self.persistence_state.rx.take();
                                let persistence_rx = if let Some((rx, start_time, _action)) =
                                    pending_persistence
                                {
                                    let (persistence_tx, persistence_rx) =
                                        std::sync::mpsc::channel();
                                    self.runtime.spawn_blocking_named("wait-persist", move || {
                                        let start = Instant::now();
                                        // NOTE(review): panics if the persistence task dropped its
                                        // sender — TODO confirm this is an upheld invariant
                                        let result =
                                            rx.recv().expect("persistence state channel closed");
                                        let _ = persistence_tx.send((
                                            result,
                                            start_time,
                                            start.elapsed(),
                                        ));
                                    });
                                    Some(persistence_rx)
                                } else {
                                    None
                                };

                                let cache_wait = self.payload_validator.wait_for_caches();

                                let persistence_wait = if let Some(persistence_rx) = persistence_rx
                                {
                                    let (result, start_time, wait_duration) = persistence_rx
                                        .recv()
                                        .expect("persistence result channel closed");
                                    // NOTE(review): persistence-completion errors are swallowed
                                    // here — confirm this best-effort handling is intentional
                                    let _ = self.on_persistence_complete(result, start_time);
                                    Some(wait_duration)
                                } else {
                                    None
                                };

                                debug!(
                                    target: "engine::tree",
                                    ?persistence_wait,
                                    execution_cache_wait = ?cache_wait.execution_cache,
                                    sparse_trie_wait = ?cache_wait.sparse_trie,
                                    "Persistence finished and caches updated for reth_newPayload"
                                );

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // reply with the outcome plus the timing breakdown gathered above
                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait,
                                    execution_cache_wait: cache_wait.execution_cache,
                                    sparse_trie_wait: cache_wait.sparse_trie,
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1663
    /// Invoked if the backfill sync has finished to target.
    ///
    /// At this point we consider the block synced to the backfill target.
    ///
    /// Checks the tracked finalized block against the block on disk and requests another backfill
    /// run if the distance to the tip exceeds the threshold for another backfill run.
    ///
    /// This will also do the necessary housekeeping of the tree state, this includes:
    ///  - removing all blocks below the backfill height
    ///  - resetting the canonical in-memory state
    ///
    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
    /// target block.
    ///
    /// If there is still a gap to the forkchoice head after backfill, this emits either another
    /// backfill request or a block download request; otherwise it tries to connect any buffered
    /// child blocks of the new head.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // update the `invalid_headers` cache with the new invalid header
            self.state.invalid_headers.insert(**bad_block);

            // if this was an unwind then the target is the new height
            Some(*target)
        } else {
            // backfill height is the block number that the backfill finished at
            ctrl.block_number()
        };

        // backfill height is the block number that the backfill finished at
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // state house keeping after backfill sync
        // remove all executed blocks below the backfill height
        //
        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
        // before that
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // the node reset so we need to clear everything above that height so that backfill
            // height is the new canonical block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        // refresh gauges after pruning the in-memory tree state
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // remove all buffered blocks below the backfill height
        self.state.buffer.remove_old_blocks(backfill_height);
        // we remove all entries because now we're synced to the backfill target and consider this
        // the canonical chain
        self.canonical_in_memory_state.clear_state();

        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            // update the tracked chain height, after backfill sync both the canonical height and
            // persisted height are the same
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            // update the tracked canonical head
            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // check if we need to run backfill again by comparing the most recent finalized height to
        // the backfill height
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            // no finalized block, can't check distance
            return Ok(())
        }
        // get the block number of the finalized block, if we have it
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // The block number that the backfill finished at - if the progress or newest
        // finalized is None then we can't check the distance anyways.
        //
        // If both are Some, we perform another distance check and return the desired
        // backfill target
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                // Determines whether or not we should run backfill again, in case
                // the new gap is still large enough and requires running backfill again
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            // request another backfill run
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Check if there are more blocks to sync between current head and FCU target
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                // There are blocks between current head and FCU target, download them
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // We don't have the head block or any of its ancestors buffered. Request
            // a download for the head block which will then trigger further sync.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // try to close the gap by executing buffered blocks that are child blocks of the new head
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1822
1823    /// Attempts to make the given target canonical.
1824    ///
1825    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1826    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1827        if let Some(chain_update) = self.on_new_head(target)? {
1828            self.on_canonical_chain_update(chain_update);
1829        }
1830
1831        Ok(())
1832    }
1833
1834    /// Convenience function to handle an optional tree event.
1835    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1836        if let Some(event) = event {
1837            self.on_tree_event(event)?;
1838        }
1839
1840        Ok(())
1841    }
1842
1843    /// Handles a tree event.
1844    ///
1845    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1846    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1847        match event {
1848            TreeEvent::TreeAction(action) => match action {
1849                TreeAction::MakeCanonical { sync_target_head } => {
1850                    self.make_canonical(sync_target_head)?;
1851                }
1852            },
1853            TreeEvent::BackfillAction(action) => {
1854                self.emit_event(EngineApiEvent::BackfillAction(action));
1855            }
1856            TreeEvent::Download(action) => {
1857                self.emit_event(EngineApiEvent::Download(action));
1858            }
1859        }
1860
1861        Ok(())
1862    }
1863
1864    /// Emits an outgoing event to the engine.
1865    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1866        let event = event.into();
1867
1868        if event.is_backfill_action() {
1869            debug_assert_eq!(
1870                self.backfill_sync_state,
1871                BackfillSyncState::Idle,
1872                "backfill action should only be emitted when backfill is idle"
1873            );
1874
1875            if self.persistence_state.in_progress() {
1876                // backfill sync and persisting data are mutually exclusive, so we can't start
1877                // backfill while we're still persisting
1878                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1879                return
1880            }
1881
1882            self.backfill_sync_state = BackfillSyncState::Pending;
1883            self.metrics.engine.pipeline_runs.increment(1);
1884            debug!(target: "engine::tree", "emitting backfill action event");
1885        }
1886
1887        let _ = self.outgoing.send(event).inspect_err(
1888            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1889        );
1890    }
1891
1892    /// Returns true if the canonical chain length minus the last persisted
1893    /// block is greater than or equal to the persistence threshold and
1894    /// backfill is not running.
1895    pub const fn should_persist(&self) -> bool {
1896        if !self.backfill_sync_state.is_idle() {
1897            // can't persist if backfill is running
1898            return false
1899        }
1900
1901        let min_block = self.persistence_state.last_persisted_block.number;
1902        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1903            self.config.persistence_threshold()
1904    }
1905
1906    /// Returns a batch of consecutive canonical blocks to persist in the range
1907    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
1908    fn get_canonical_blocks_to_persist(
1909        &self,
1910        target: PersistTarget,
1911    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1912        // We will calculate the state root using the database, so we need to be sure there are no
1913        // changes
1914        debug_assert!(!self.persistence_state.in_progress());
1915
1916        let mut blocks_to_persist = Vec::new();
1917        let mut current_hash = self.state.tree_state.canonical_block_hash();
1918        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1919        let canonical_head_number = self.state.tree_state.canonical_block_number();
1920
1921        let target_number = match target {
1922            PersistTarget::Head => canonical_head_number,
1923            PersistTarget::Threshold => {
1924                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1925            }
1926        };
1927
1928        debug!(
1929            target: "engine::tree",
1930            ?current_hash,
1931            ?last_persisted_number,
1932            ?canonical_head_number,
1933            ?target_number,
1934            "Returning canonical blocks to persist"
1935        );
1936        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1937            if block.recovered_block().number() <= last_persisted_number {
1938                break;
1939            }
1940
1941            if block.recovered_block().number() <= target_number {
1942                blocks_to_persist.push(block.clone());
1943            }
1944
1945            current_hash = block.recovered_block().parent_hash();
1946        }
1947
1948        // Reverse the order so that the oldest block comes first
1949        blocks_to_persist.reverse();
1950
1951        Ok(blocks_to_persist)
1952    }
1953
1954    /// This clears the blocks from the in-memory tree state that have been persisted to the
1955    /// database.
1956    ///
1957    /// This also updates the canonical in-memory state to reflect the newest persisted block
1958    /// height.
1959    ///
1960    /// Assumes that `finish` has been called on the `persistence_state` at least once
1961    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1962        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1963        // state.
1964        if let Some(remove_above) = self.find_disk_reorg()? {
1965            self.remove_blocks(remove_above);
1966            return Ok(())
1967        }
1968
1969        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1970        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1971        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1972            number: self.persistence_state.last_persisted_block.number,
1973            hash: self.persistence_state.last_persisted_block.hash,
1974        });
1975        Ok(())
1976    }
1977
    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
    ///
    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
    /// or the database. If the required historical data (such as trie change sets) has been
    /// pruned for a given block, this operation will return an error. On archive nodes, it
    /// can retrieve any block.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // check memory first
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(Some(block.clone()))
        }

        // Not in memory: reconstruct the executed block from database data — the sealed block
        // with senders, its post-execution state, and recomputed trie updates.
        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        // Skip building trie input and anchor for DB-loaded blocks.
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // NOTE(review): `pop()` assumes `get_state` for a single block number yields at most one
        // receipts/requests entry — TODO confirm against the provider's contract.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(Some(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        )))
    }
2037
2038    /// Return sealed block header from in-memory state or database by hash.
2039    fn sealed_header_by_hash(
2040        &self,
2041        hash: B256,
2042    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2043        // check memory first
2044        let header = self.state.tree_state.sealed_header_by_hash(&hash);
2045
2046        if header.is_some() {
2047            Ok(header)
2048        } else {
2049            self.provider.sealed_header_by_hash(hash)
2050        }
2051    }
2052
2053    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
2054    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
2055    /// not exist in the buffer, this returns the hash that is passed in.
2056    ///
2057    /// Returns the parent hash of the block itself if the block is buffered and has no other
2058    /// buffered ancestors.
2059    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2060        self.state
2061            .buffer
2062            .lowest_ancestor(&hash)
2063            .map(|block| block.parent_hash())
2064            .unwrap_or_else(|| hash)
2065    }
2066
2067    /// If validation fails, the response MUST contain the latest valid hash:
2068    ///
2069    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
2070    ///     conditions:
2071    ///     - It is fully validated and deemed VALID
2072    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
2073    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
2074    ///     conditions are satisfied by a `PoW` block.
2075    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
2076    ///     the above conditions.
2077    fn latest_valid_hash_for_invalid_payload(
2078        &mut self,
2079        parent_hash: B256,
2080    ) -> ProviderResult<Option<B256>> {
2081        // Check if parent exists in side chain or in canonical chain.
2082        if self.sealed_header_by_hash(parent_hash)?.is_some() {
2083            return Ok(Some(parent_hash))
2084        }
2085
2086        // iterate over ancestors in the invalid cache
2087        // until we encounter the first valid ancestor
2088        let mut current_hash = parent_hash;
2089        let mut current_block = self.state.invalid_headers.get(&current_hash);
2090        while let Some(block_with_parent) = current_block {
2091            current_hash = block_with_parent.parent;
2092            current_block = self.state.invalid_headers.get(&current_hash);
2093
2094            // If current_header is None, then the current_hash does not have an invalid
2095            // ancestor in the cache, check its presence in blockchain tree
2096            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
2097                return Ok(Some(current_hash))
2098            }
2099        }
2100        Ok(None)
2101    }
2102
2103    /// Prepares the invalid payload response for the given hash, checking the
2104    /// database for the parent hash and populating the payload status with the latest valid hash
2105    /// according to the engine api spec.
2106    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2107        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2108            // Edge case: the `latestValid` field is the zero hash if the parent block is the
2109            // terminal PoW block, which we need to identify by looking at the parent's block
2110            // difficulty
2111            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2112            Some(_) => Some(parent_hash),
2113            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2114        };
2115
2116        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2117            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2118        })
2119        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2120    }
2121
2122    /// Returns true if the given hash is the last received sync target block.
2123    ///
2124    /// See [`ForkchoiceStateTracker::sync_target_state`]
2125    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2126        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2127            return target.head_block_hash == block_hash
2128        }
2129        false
2130    }
2131
2132    /// Returns true if the given hash is part of the last received sync target fork choice update.
2133    ///
2134    /// See [`ForkchoiceStateTracker::sync_target_state`]
2135    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2136        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2137            return target.contains(block_hash)
2138        }
2139        false
2140    }
2141
2142    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2143    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2144    ///
2145    /// Returns a payload status response according to the engine API spec if the block is known to
2146    /// be invalid.
2147    fn check_invalid_ancestor_with_head(
2148        &mut self,
2149        check: B256,
2150        head: &SealedBlock<N::Block>,
2151    ) -> ProviderResult<Option<PayloadStatus>> {
2152        // check if the check hash was previously marked as invalid
2153        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2154
2155        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2156    }
2157
2158    /// Invoked when a new payload received is invalid.
2159    fn on_invalid_new_payload(
2160        &mut self,
2161        head: SealedBlock<N::Block>,
2162        invalid: BlockWithParent,
2163    ) -> ProviderResult<PayloadStatus> {
2164        // populate the latest valid hash field
2165        let status = self.prepare_invalid_response(invalid.parent)?;
2166
2167        // insert the head block into the invalid header cache
2168        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2169        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2170
2171        Ok(status)
2172    }
2173
2174    /// Finds any invalid ancestor for the given payload.
2175    ///
2176    /// This function walks up the chain of buffered ancestors from the payload's block
2177    /// hash and checks if any ancestor is marked as invalid in the tree state.
2178    ///
2179    /// The check works by:
2180    /// 1. Finding the lowest buffered ancestor for the given block hash
2181    /// 2. If the ancestor is the same as the block hash itself, using the parent hash instead
2182    /// 3. Checking if this ancestor is in the `invalid_headers` map
2183    ///
2184    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2185    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2186        let parent_hash = payload.parent_hash();
2187        let block_hash = payload.block_hash();
2188        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2189        if lowest_buffered_ancestor == block_hash {
2190            lowest_buffered_ancestor = parent_hash;
2191        }
2192
2193        // Check if the block has an invalid ancestor
2194        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2195    }
2196
2197    /// Handles a payload that has an invalid ancestor.
2198    ///
2199    /// This function validates the payload and processes it according to whether it's
2200    /// well-formed or malformed:
2201    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2202    ///    known-bad block, which violates consensus rules
2203    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2204    ///    validated due to its own structural issues
2205    fn handle_invalid_ancestor_payload(
2206        &mut self,
2207        payload: T::ExecutionData,
2208        invalid: BlockWithParent,
2209    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2210        let parent_hash = payload.parent_hash();
2211        let num_hash = payload.num_hash();
2212
2213        // Here we might have 2 cases
2214        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2215        //    remember it as invalid
2216        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2217        //    an error and forget it
2218        let block = match self.payload_validator.convert_payload_to_block(payload) {
2219            Ok(block) => block,
2220            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2221        };
2222
2223        Ok(self.on_invalid_new_payload(block, invalid)?)
2224    }
2225
2226    /// Checks if the given `head` points to an invalid header, which requires a specific response
2227    /// to a forkchoice update.
2228    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2229        // check if the head was previously marked as invalid
2230        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2231
2232        // Try to prepare invalid response, but handle errors gracefully
2233        match self.prepare_invalid_response(header.parent) {
2234            Ok(status) => Ok(Some(status)),
2235            Err(err) => {
2236                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2237                // Return a basic invalid status without latest valid hash
2238                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2239                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2240                })))
2241            }
2242        }
2243    }
2244
2245    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2246    /// and block body itself.
2247    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2248        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2249            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2250            return Err(e)
2251        }
2252
2253        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2254            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2255            return Err(e)
2256        }
2257
2258        Ok(())
2259    }
2260
    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        // Take the given block and all of its buffered descendants out of the buffer.
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            // nothing to append
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    // A freshly inserted valid block that is part of the sync target (head,
                    // safe or finalized) can immediately be made canonical.
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        // we just inserted a block that we know is part of the canonical chain, so
                        // we can make it canonical
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // NOTE(review): only `Block` errors are handled; other `InsertPayloadError`
                    // variants are silently dropped here — confirm they cannot occur for
                    // buffered (already-converted) blocks.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2305
2306    /// Pre-validates the block and inserts it into the buffer.
2307    fn buffer_block(
2308        &mut self,
2309        block: SealedBlock<N::Block>,
2310    ) -> Result<(), InsertBlockError<N::Block>> {
2311        if let Err(err) = self.validate_block(&block) {
2312            return Err(InsertBlockError::consensus_error(err, block))
2313        }
2314        self.state.buffer.insert_block(block);
2315        Ok(())
2316    }
2317
2318    /// Returns true if the distance from the local tip to the block is greater than the configured
2319    /// threshold.
2320    ///
2321    /// If the `local_tip` is greater than the `block`, then this will return false.
2322    #[inline]
2323    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2324        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2325    }
2326
2327    /// Returns how far the local tip is from the given block. If the local tip is at the same
2328    /// height or its block number is greater than the given block, this returns None.
2329    #[inline]
2330    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2331        if block > local_tip {
2332            Some(block - local_tip)
2333        } else {
2334            None
2335        }
2336    }
2337
    /// Returns the target hash to sync to if the distance from the local tip to the block is
    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
    /// that block already).
    ///
    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
    /// (missing) finalized block.
    ///
    /// Returns `None` when backfill is not required.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        // check if the downloaded block is the tracked finalized block
        // Pick the most precise height available to measure the distance against:
        // downloaded finalized block > buffered finalized block > the requested target number.
        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // if we downloaded the finalized block we can now check how far we're off
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    Some(buffered_finalized) => {
                        // if we have buffered the finalized block, we should check how far we're
                        // off
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    None => {
                        // check if the distance exceeds the threshold for backfill sync
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        // if the number of missing blocks is greater than the max, trigger backfill
        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            // if we have already canonicalized the finalized block, we should skip backfill
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // ensure the finalized block is known (not the zero hash)
                    if !state.finalized_block_hash.is_zero() {
                        // we don't have the block yet and the distance exceeds the allowed
                        // threshold
                        return Some(state.finalized_block_hash)
                    }

                    // OPTIMISTIC SYNCING
                    //
                    // It can happen when the node is doing an
                    // optimistic sync, where the CL has no knowledge of the finalized hash,
                    // but is expecting the EL to sync as high
                    // as possible before finalizing.
                    //
                    // This usually doesn't happen on ETH mainnet since CLs use the more
                    // secure checkpoint syncing.
                    //
                    // However, optimism chains will do this. The risk of a reorg is however
                    // low.
                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // we're fully synced to the finalized block
                }
            }
        }

        None
    }
2418
    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
    /// happen if a reorg is happening while we are persisting a block.
    ///
    /// Returns `Ok(Some(n))` where `n` is the common ancestor height above which the persisted
    /// chain must be truncated, or `Ok(None)` if the persisted tip is an ancestor of (or equal
    /// to) the canonical tip.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Resolve a block's parent (number, hash) via in-memory state or the database.
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Happy path, canonical chain is ahead or equal to persisted chain.
        // Walk canonical chain back to make sure that it connects to persisted chain.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
        if canonical == persisted {
            return Ok(None);
        }

        // At this point, we know that `persisted` block can't be reached by walking the canonical
        // chain back. In this case we need to truncate it to the first canonical block it connects
        // to.

        // Firstly, walk back until we reach the same height as `canonical`.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Now walk both chains back until we find a common ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2464
    /// Invoked when the canonical chain has been updated.
    ///
    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // update the tracked canonical head
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        // Capture the tip and the notification before `chain_update` is consumed below.
        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        // reinsert any missing reorged blocks
        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            self.reinsert_reorged_blocks(new.clone());

            // When use_hashed_state is enabled, skip reinserting the old chain — the
            // bundle state references plain state reverts which don't exist.
            if !self.use_hashed_state {
                self.reinsert_reorged_blocks(old.clone());
            }
        }

        // update the tracked in-memory state with the new chain
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        // Update metrics based on new tip
        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // sends an event to all active listeners about the new canonical chain
        self.canonical_in_memory_state.notify_canon_state(notification);

        // emit event
        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2510
2511    /// This updates metrics based on the given reorg length and first reorged block number.
2512    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2513        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2514            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2515                first_reorged_block <= finalized.number
2516            {
2517                self.metrics.tree.reorgs.finalized.increment(1);
2518            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2519                first_reorged_block <= safe.number
2520            {
2521                self.metrics.tree.reorgs.safe.increment(1);
2522            } else {
2523                self.metrics.tree.reorgs.head.increment(1);
2524            }
2525        } else {
2526            debug_unreachable!("Reorged chain doesn't have any blocks");
2527        }
2528        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2529    }
2530
2531    /// This reinserts any blocks in the new chain that do not already exist in the tree
2532    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2533        for block in new_chain {
2534            if self
2535                .state
2536                .tree_state
2537                .executed_block_by_hash(block.recovered_block().hash())
2538                .is_none()
2539            {
2540                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2541                self.state.tree_state.insert_executed(block);
2542            }
2543        }
2544    }
2545
2546    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2547    ///
2548    /// This mainly compares the missing parent of the downloaded block with the current canonical
2549    /// tip, and decides whether or not backfill sync should be triggered.
2550    fn on_disconnected_downloaded_block(
2551        &self,
2552        downloaded_block: BlockNumHash,
2553        missing_parent: BlockNumHash,
2554        head: BlockNumHash,
2555    ) -> Option<TreeEvent> {
2556        // compare the missing parent with the canonical tip
2557        if let Some(target) =
2558            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2559        {
2560            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2561            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2562        }
2563
2564        // continue downloading the missing parent
2565        //
2566        // this happens if either:
2567        //  * the missing parent block num < canonical tip num
2568        //    * this case represents a missing block on a fork that is shorter than the canonical
2569        //      chain
2570        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2571        //    less than the backfill threshold
2572        //    * this case represents a potentially long range of blocks to download and execute
2573        let request = if let Some(distance) =
2574            self.distance_from_local_tip(head.number, missing_parent.number)
2575        {
2576            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2577            DownloadRequest::BlockRange(missing_parent.hash, distance)
2578        } else {
2579            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2580            // This happens when the missing parent is on an outdated
2581            // sidechain and we can only download the missing block itself
2582            DownloadRequest::single_block(missing_parent.hash)
2583        };
2584
2585        Some(TreeEvent::Download(request))
2586    }
2587
    /// Handles a downloaded block that was successfully inserted as valid.
    ///
    /// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
    /// If it matches a non-head sync target (safe or finalized), makes it canonical inline
    /// and triggers a download for the remaining blocks towards the actual head.
    /// Otherwise, tries to connect buffered blocks.
    fn on_valid_downloaded_block(
        &mut self,
        block_num_hash: BlockNumHash,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        // check if we just inserted a block that's part of sync targets,
        // i.e. head, safe, or finalized
        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
            sync_target.contains(block_num_hash.hash)
        {
            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");

            if sync_target.head_block_hash == block_num_hash.hash {
                // we just inserted the sync target head block, make it canonical
                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_num_hash.hash,
                })))
            }

            // This block is part of the sync target (safe or finalized) but not the
            // head. Make it canonical and try to connect any buffered children, then
            // continue downloading towards the actual head if needed.
            self.make_canonical(block_num_hash.hash)?;
            self.try_connect_buffered_blocks(block_num_hash)?;

            // Check if we've reached the sync target head after connecting buffered
            // blocks (e.g. the head block may have already been buffered).
            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
            }

            // Fully caught up to the sync target head: nothing more to do.
            return Ok(None)
        }
        // Not a sync target block: just try to attach any buffered descendants.
        trace!(target: "engine::tree", "appended downloaded block");
        self.try_connect_buffered_blocks(block_num_hash)?;
        Ok(None)
    }
2632
    /// Invoked with a block downloaded from the network
    ///
    /// Returns an event with the appropriate action to take, such as:
    ///  - download more missing blocks
    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        // Drop the block early if it descends from a known-invalid ancestor; this also marks
        // the block itself as invalid.
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // While backfill sync is running the tree must not be mutated.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        // try to append the block
        match self.insert_block(block) {
            Ok(
                InsertPayloadOk::Inserted(BlockStatus::Valid) |
                InsertPayloadOk::AlreadySeen(BlockStatus::Valid),
            ) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                // block is not connected to the canonical head, we need to download
                // its missing branch first
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // NOTE(review): only `Block` errors are handled; other `InsertPayloadError`
                // variants are silently dropped — confirm they cannot occur for downloaded
                // (already-converted) blocks.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2685
2686    /// Inserts a payload into the tree and executes it.
2687    ///
2688    /// This function validates the payload's basic structure, then executes it using the
2689    /// payload validator. The execution includes running all transactions in the payload
2690    /// and validating the resulting state transitions.
2691    ///
2692    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2693    /// or `InsertPayloadError` if validation or execution failed.
2694    fn insert_payload(
2695        &mut self,
2696        payload: T::ExecutionData,
2697    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2698        self.insert_block_or_payload(
2699            payload.block_with_parent(),
2700            payload,
2701            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2702            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2703        )
2704    }
2705
2706    fn insert_block(
2707        &mut self,
2708        block: SealedBlock<N::Block>,
2709    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2710        self.insert_block_or_payload(
2711            block.block_with_parent(),
2712            block,
2713            |validator, block, ctx| validator.validate_block(block, ctx),
2714            |_, block| Ok(block),
2715        )
2716    }
2717
    /// Inserts a block or payload into the blockchain tree with full execution.
    ///
    /// This is a generic function that handles both blocks and payloads by accepting
    /// a block identifier, input data, and execution/validation functions. It performs
    /// comprehensive checks and execution:
    ///
    /// - Validates that the block doesn't already exist in the tree
    /// - Ensures parent state is available, buffering if necessary
    /// - Executes the block/payload using the provided execute function
    /// - Handles both canonical and fork chain insertions
    /// - Updates pending block state when appropriate
    /// - Emits consensus engine events and records metrics
    ///
    /// `convert_to_block` turns the raw `input` into a [`SealedBlock`] without executing it;
    /// it is invoked whenever the block is needed for an error value or for buffering. For
    /// inputs that already are blocks it is the identity conversion.
    ///
    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Check if block already exists - first in memory, then DB only if it could be persisted
        if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
            // Still run the conversion so that conversion-level errors surface via `?`
            // even for already-known blocks.
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // Only query DB if block could be persisted (number <= last persisted block).
        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    // Attach the (converted) block to the provider error for the caller.
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure that the parent state is available.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // we don't have the state required to execute this block, buffering it and find the
                // missing parent block
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            // Parent state is available; proceed with execution.
            Ok(Some(_)) => {}
        }

        // determine whether we are on a fork chain by comparing the block number with the
        // canonical head. This is a simple check that is sufficient for the event emission below.
        // A block is considered a fork if its number is less than or equal to the canonical head,
        // as this indicates there's already a canonical block at that height.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // Run the provided validation/execution function (payload- or block-specific).
        let executed = execute(&mut self.payload_validator, input, ctx)?;

        // if the parent is the canonical head, we can insert the block as the pending block
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // emit insert event
        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        // Record end-to-end insertion time (includes existence checks and execution).
        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2837
    /// Handles an error that occurred while inserting a block.
    ///
    /// If this is a validation error this will mark the block as invalid.
    ///
    /// Returns the proper payload status response if the block is invalid.
    fn on_insert_block_error(
        &mut self,
        error: InsertBlockError<N::Block>,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        // Split the error into the offending block and the underlying cause.
        let (block, error) = error.split();

        // if invalid block, we check the validation error. Otherwise return the fatal
        // error.
        let validation_err = error.ensure_validation_error()?;

        // If the error was due to an invalid payload, the payload is added to the
        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
        // returned.
        warn!(
            target: "engine::tree",
            invalid_hash=%block.hash(),
            invalid_number=block.number(),
            %validation_err,
            "Invalid block error on new payload",
        );
        // Per the Engine API, the response carries the latest valid ancestor hash.
        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;

        // keep track of the invalid header
        self.state.invalid_headers.insert(block.block_with_parent());
        // Notify listeners so the invalid block can be observed (e.g. for diagnostics).
        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
            Box::new(block),
        )));

        Ok(PayloadStatus::new(
            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
            latest_valid_hash,
        ))
    }
2876
2877    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2878    fn on_new_payload_error(
2879        &mut self,
2880        error: NewPayloadError,
2881        payload_num_hash: NumHash,
2882        parent_hash: B256,
2883    ) -> ProviderResult<PayloadStatus> {
2884        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2885        // we need to convert the error to a payload status (response to the CL)
2886
2887        let latest_valid_hash =
2888            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2889                // Engine-API rules:
2890                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2891                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2892                None
2893            } else {
2894                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2895            };
2896
2897        let status = PayloadStatusEnum::from(error);
2898        Ok(PayloadStatus::new(status, latest_valid_hash))
2899    }
2900
2901    /// Attempts to find the header for the given block hash if it is canonical.
2902    pub fn find_canonical_header(
2903        &self,
2904        hash: B256,
2905    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2906        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2907
2908        if canonical.is_none() {
2909            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2910        }
2911
2912        Ok(canonical)
2913    }
2914
2915    /// Updates the tracked finalized block if we have it.
2916    fn update_finalized_block(
2917        &self,
2918        finalized_block_hash: B256,
2919    ) -> Result<(), OnForkChoiceUpdated> {
2920        if finalized_block_hash.is_zero() {
2921            return Ok(())
2922        }
2923
2924        match self.find_canonical_header(finalized_block_hash) {
2925            Ok(None) => {
2926                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2927                // if the finalized block is not known, we can't update the finalized block
2928                return Err(OnForkChoiceUpdated::invalid_state())
2929            }
2930            Ok(Some(finalized)) => {
2931                if Some(finalized.num_hash()) !=
2932                    self.canonical_in_memory_state.get_finalized_num_hash()
2933                {
2934                    // we're also persisting the finalized block on disk so we can reload it on
2935                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2936                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2937                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2938                    // Update finalized block height metric
2939                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2940                }
2941            }
2942            Err(err) => {
2943                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2944            }
2945        }
2946
2947        Ok(())
2948    }
2949
2950    /// Updates the tracked safe block if we have it
2951    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2952        if safe_block_hash.is_zero() {
2953            return Ok(())
2954        }
2955
2956        match self.find_canonical_header(safe_block_hash) {
2957            Ok(None) => {
2958                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2959                // if the safe block is not known, we can't update the safe block
2960                return Err(OnForkChoiceUpdated::invalid_state())
2961            }
2962            Ok(Some(safe)) => {
2963                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2964                    // we're also persisting the safe block on disk so we can reload it on
2965                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2966                    let _ = self.persistence.save_safe_block_number(safe.number());
2967                    self.canonical_in_memory_state.set_safe(safe.clone());
2968                    // Update safe block height metric
2969                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
2970                }
2971            }
2972            Err(err) => {
2973                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2974            }
2975        }
2976
2977        Ok(())
2978    }
2979
2980    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2981    /// made canonical.
2982    ///
2983    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2984    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2985    ///
2986    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2987    /// are consistent with the head block.
2988    fn ensure_consistent_forkchoice_state(
2989        &self,
2990        state: ForkchoiceState,
2991    ) -> Result<(), OnForkChoiceUpdated> {
2992        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2993        // after the head block is canonicalized.
2994        //
2995        // This ensures that the finalized block is consistent with the head block, i.e. the
2996        // finalized block is an ancestor of the head block.
2997        self.update_finalized_block(state.finalized_block_hash)?;
2998
2999        // Also ensure that the safe block, if not zero, is known and in the canonical chain
3000        // after the head block is canonicalized.
3001        //
3002        // This ensures that the safe block is consistent with the head block, i.e. the safe
3003        // block is an ancestor of the head block.
3004        self.update_safe_block(state.safe_block_hash)
3005    }
3006
3007    /// Validates the payload attributes with respect to the header and fork choice state.
3008    ///
3009    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
3010    /// return an error if the payload attributes are invalid.
3011    fn process_payload_attributes(
3012        &self,
3013        attrs: T::PayloadAttributes,
3014        head: &N::BlockHeader,
3015        state: ForkchoiceState,
3016        version: EngineApiMessageVersion,
3017    ) -> OnForkChoiceUpdated {
3018        if let Err(err) =
3019            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
3020        {
3021            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3022            return OnForkChoiceUpdated::invalid_payload_attributes()
3023        }
3024
3025        // 8. Client software MUST begin a payload build process building on top of
3026        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
3027        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
3028        //    The build process is specified in the Payload building section.
3029        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
3030            state.head_block_hash,
3031            attrs,
3032            version as u8,
3033        ) {
3034            Ok(attributes) => {
3035                // send the payload to the builder and return the receiver for the pending payload
3036                // id, initiating payload job is handled asynchronously
3037                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
3038
3039                // Client software MUST respond to this method call in the following way:
3040                // {
3041                //      payloadStatus: {
3042                //          status: VALID,
3043                //          latestValidHash: forkchoiceState.headBlockHash,
3044                //          validationError: null
3045                //      },
3046                //      payloadId: buildProcessId
3047                // }
3048                //
3049                // if the payload is deemed VALID and the build process has begun.
3050                OnForkChoiceUpdated::updated_with_pending_payload_id(
3051                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3052                    pending_payload_id,
3053                )
3054            }
3055            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
3056        }
3057    }
3058
3059    /// Remove all blocks up to __and including__ the given block number.
3060    ///
3061    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
3062    /// those which have a fork point at or below the finalized hash.
3063    ///
3064    /// Canonical blocks below the upper bound will still be removed.
3065    pub(crate) fn remove_before(
3066        &mut self,
3067        upper_bound: BlockNumHash,
3068        finalized_hash: Option<B256>,
3069    ) -> ProviderResult<()> {
3070        // first fetch the finalized block number and then call the remove_before method on
3071        // tree_state
3072        let num = if let Some(hash) = finalized_hash {
3073            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3074        } else {
3075            None
3076        };
3077
3078        self.state.tree_state.remove_until(
3079            upper_bound,
3080            self.persistence_state.last_persisted_block.hash,
3081            num,
3082        );
3083        Ok(())
3084    }
3085
3086    /// Returns a builder for creating state providers for the given hash.
3087    ///
3088    /// This is an optimization for parallel execution contexts where we want to avoid
3089    /// creating state providers in the critical path.
3090    pub fn state_provider_builder(
3091        &self,
3092        hash: B256,
3093    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3094    where
3095        P: BlockReader + StateProviderFactory + StateReader + Clone,
3096    {
3097        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3098            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3099            // the block leads back to the canonical chain
3100            return Ok(Some(StateProviderBuilder::new(
3101                self.provider.clone(),
3102                historical,
3103                Some(blocks),
3104            )))
3105        }
3106
3107        // Check if the block is persisted
3108        if let Some(header) = self.provider.header(hash)? {
3109            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3110            // For persisted blocks, we create a builder that will fetch state directly from the
3111            // database
3112            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3113        }
3114
3115        debug!(target: "engine::tree", %hash, "no canonical state found for block");
3116        Ok(None)
3117    }
3118}
3119
/// Events received in the main engine loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// An engine API message was received.
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A persistence task completed.
    PersistenceComplete {
        /// The result of the persistence operation.
        result: Option<BlockNumHash>,
        /// When the persistence operation started; used to measure its duration.
        start_time: Instant,
    },
    /// A channel was disconnected (its sender side was dropped).
    Disconnected,
}
3139
/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
/// variant.
///
/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
/// is valid or not.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block is valid and extends the canonical chain.
    Valid,
    /// The block may be valid and has an unknown missing ancestor.
    Disconnected {
        /// Current canonical head.
        head: BlockNumHash,
        /// The lowest ancestor block that is not connected to the canonical chain.
        missing_ancestor: BlockNumHash,
    },
}
3157
/// How a payload was inserted if it was valid.
///
/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The payload was valid, but we have already seen it.
    AlreadySeen(BlockStatus),
    /// The payload was valid and inserted into the tree.
    Inserted(BlockStatus),
}
3169
/// Target for block persistence, selecting how far the persistence task should advance.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to `canonical_head - memory_block_buffer_target`, keeping a buffer of recent
    /// blocks in memory.
    Threshold,
    /// Persist all blocks up to and including the canonical head.
    Head,
}
3178
/// Result of waiting for caches to become available.
///
/// Each field records how long the caller blocked on the corresponding lock.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the execution cache lock.
    pub execution_cache: Duration,
    /// Time spent waiting for the sparse trie lock.
    pub sparse_trie: Duration,
}
3187
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
    /// Waits for cache updates to complete.
    ///
    /// Returns the time spent waiting for each cache separately (see [`CacheWaitDurations`]).
    fn wait_for_caches(&self) -> CacheWaitDurations;
}