// reth_engine_tree/tree/mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17    MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29    FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32    BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35    StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48    oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64
65use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
66pub use block_buffer::BlockBuffer;
67pub use invalid_headers::InvalidHeaderCache;
68pub use metrics::EngineApiMetrics;
69pub use payload_processor::*;
70pub use payload_validator::{BasicEngineValidator, EngineValidator};
71pub use persistence_state::PersistenceState;
72pub use reth_engine_primitives::TreeConfig;
73pub use reth_execution_cache::{
74    CachedStateMetrics, CachedStateProvider, ExecutionCache, PayloadExecutionCache, SavedCache,
75};
76
77pub mod state;
78
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: if the local head `block.number` is 100 and the forkchoice head `block.number` is 133
/// (a gap of more than an epoch's worth of slots), the threshold is exceeded and the pipeline
/// should be used to backfill the gap.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
89
/// The minimum number of blocks to retain in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism), where eviction
/// cannot be anchored to finality.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
95
/// A builder for creating state providers that can be used across threads.
///
/// Captures everything needed to lazily construct a state provider: the provider
/// factory, a historical anchor block hash, and an optional chain of in-memory
/// executed blocks layered on top of the historical state.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    ///
    /// `None` means the historical state is used as-is, with no in-memory overlay.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
106
107impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
108    /// Creates a new state provider from the provider factory, historical block hash and optional
109    /// overlaid blocks.
110    pub const fn new(
111        provider_factory: P,
112        historical: B256,
113        overlay: Option<Vec<ExecutedBlock<N>>>,
114    ) -> Self {
115        Self { provider_factory, historical, overlay }
116    }
117}
118
119impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
120where
121    P: BlockReader + StateProviderFactory + StateReader + Clone,
122{
123    /// Creates a new state provider from this builder.
124    pub fn build(&self) -> ProviderResult<StateProviderBox> {
125        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
126        if let Some(overlay) = self.overlay.clone() {
127            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
128        }
129        Ok(provider)
130    }
131}
132
/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks, i.e. blocks whose parent is not yet known and which
    /// are held until they can be connected to the tree.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}
148
149impl<N: NodePrimitives> EngineApiTreeState<N> {
150    fn new(
151        block_buffer_limit: u32,
152        max_invalid_header_cache_length: u32,
153        canonical_block: BlockNumHash,
154        engine_kind: EngineApiKind,
155    ) -> Self {
156        Self {
157            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
158            buffer: BlockBuffer::new(block_buffer_limit),
159            tree_state: TreeState::new(canonical_block, engine_kind),
160            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
161        }
162    }
163
164    /// Returns a reference to the tree state.
165    pub const fn tree_state(&self) -> &TreeState<N> {
166        &self.tree_state
167    }
168
169    /// Returns true if the block has been marked as invalid.
170    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
171        self.invalid_headers.get(hash).is_some()
172    }
173}
174
/// The outcome of a tree operation.
///
/// Pairs the operation's result with an optional follow-up [`TreeEvent`] that tells the
/// caller what to do next.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
    /// Whether the block was already seen, meaning no real execution happened during this
    /// `newPayload` call.
    pub already_seen: bool,
}
186
187impl<T> TreeOutcome<T> {
188    /// Create new tree outcome.
189    pub const fn new(outcome: T) -> Self {
190        Self { outcome, event: None, already_seen: false }
191    }
192
193    /// Set event on the outcome.
194    pub fn with_event(mut self, event: TreeEvent) -> Self {
195        self.event = Some(event);
196        self
197    }
198
199    /// Set the `already_seen` flag on the outcome.
200    pub const fn with_already_seen(mut self, value: bool) -> Self {
201        self.already_seen = value;
202        self
203    }
204}
205
/// Result of trying to insert a new payload in [`EngineApiTreeHandler`].
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// The engine API status of the payload:
    /// - `Valid`: Payload successfully validated and inserted
    /// - `Syncing`: Parent missing, payload buffered for later
    /// - Error status: Payload is invalid
    pub status: PayloadStatus,
    /// Whether the block was already seen
    pub already_seen: bool,
}
216
217impl TryInsertPayloadResult {
218    /// Convert the result into a [`TreeOutcome`].
219    #[inline]
220    pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
221        TreeOutcome::new(self.status).with_already_seen(self.already_seen)
222    }
223}
224
/// Events that are triggered by Tree Chain
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}
235
236impl TreeEvent {
237    /// Returns true if the event is a backfill action.
238    const fn is_backfill_action(&self) -> bool {
239        matches!(self, Self::BackfillAction(_))
240    }
241}
242
/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}
252
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider used to read blocks and state.
    provider: P,
    /// Consensus implementation used to validate blocks.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator for engine API payloads.
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Timing statistics for executed blocks, keyed by block hash.
    /// Stored here (not in `ExecutedBlock`) to avoid leaking observability concerns into the block
    /// type. Entries are removed when blocks are persisted or invalidated.
    execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
    /// Task runtime for spawning blocking work on named, reusable threads.
    runtime: reth_tasks::Runtime,
}
310
311impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
312    for EngineApiTreeHandler<N, P, T, V, C>
313where
314    N: NodePrimitives,
315    C: Debug + ConfigureEvm<Primitives = N>,
316{
317    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318        f.debug_struct("EngineApiTreeHandler")
319            .field("provider", &self.provider)
320            .field("consensus", &self.consensus)
321            .field("payload_validator", &self.payload_validator)
322            .field("state", &self.state)
323            .field("incoming_tx", &self.incoming_tx)
324            .field("persistence", &self.persistence)
325            .field("persistence_state", &self.persistence_state)
326            .field("backfill_sync_state", &self.backfill_sync_state)
327            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
328            .field("payload_builder", &self.payload_builder)
329            .field("config", &self.config)
330            .field("metrics", &self.metrics)
331            .field("engine_kind", &self.engine_kind)
332            .field("evm_config", &self.evm_config)
333            .field("changeset_cache", &self.changeset_cache)
334            .field("execution_timing_stats", &self.execution_timing_stats.len())
335            .field("runtime", &self.runtime)
336            .finish()
337    }
338}
339
340impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
341where
342    N: NodePrimitives,
343    P: DatabaseProviderFactory
344        + BlockReader<Block = N::Block, Header = N::BlockHeader>
345        + StateProviderFactory
346        + StateReader<Receipt = N::Receipt>
347        + HashedPostStateProvider
348        + Clone
349        + 'static,
350    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
351        + StageCheckpointReader
352        + ChangeSetReader
353        + StorageChangeSetReader
354        + StorageSettingsCache,
355    C: ConfigureEvm<Primitives = N> + 'static,
356    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
357    V: EngineValidator<T> + WaitForCaches,
358{
359    /// Creates a new [`EngineApiTreeHandler`].
360    #[expect(clippy::too_many_arguments)]
361    pub fn new(
362        provider: P,
363        consensus: Arc<dyn FullConsensus<N>>,
364        payload_validator: V,
365        outgoing: UnboundedSender<EngineApiEvent<N>>,
366        state: EngineApiTreeState<N>,
367        canonical_in_memory_state: CanonicalInMemoryState<N>,
368        persistence: PersistenceHandle<N>,
369        persistence_state: PersistenceState,
370        payload_builder: PayloadBuilderHandle<T>,
371        config: TreeConfig,
372        engine_kind: EngineApiKind,
373        evm_config: C,
374        changeset_cache: ChangesetCache,
375        runtime: reth_tasks::Runtime,
376    ) -> Self {
377        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
378
379        Self {
380            provider,
381            consensus,
382            payload_validator,
383            incoming,
384            outgoing,
385            persistence,
386            persistence_state,
387            backfill_sync_state: BackfillSyncState::Idle,
388            state,
389            canonical_in_memory_state,
390            payload_builder,
391            config,
392            metrics: Default::default(),
393            incoming_tx,
394            engine_kind,
395            evm_config,
396            changeset_cache,
397            execution_timing_stats: HashMap::new(),
398            runtime,
399        }
400    }
401
    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        // Seed the initial persistence state from the best block known to the provider.
        // NOTE(review): if the header read fails, a default header is used while keeping
        // `best_block_number`, so `last_persisted_block` may pair a real number with a
        // default hash — confirm this fallback is only reachable on a pristine database.
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        // Channel over which the handler emits `EngineApiEvent`s to the caller.
        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            runtime,
        );
        // Grab the request sender before `task` is moved into the thread.
        let incoming = task.incoming_tx.clone();
        // `run` blocks, so the handler gets its own dedicated OS thread with raised priority.
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
461
462    /// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
463    fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
464        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
465            PayloadStatusEnum::Valid,
466            Some(state.head_block_hash),
467        )))
468    }
469
470    /// Returns a new [`Sender`] to send messages to this type.
471    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
472        self.incoming_tx.clone()
473    }
474
    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages.
    ///
    /// The loop exits when the message handler signals a break, when a fatal error occurs,
    /// or when a channel disconnects.
    pub fn run(mut self) {
        loop {
            match self.wait_for_event() {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        // graceful shutdown requested by the message handler
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // Always check if we need to trigger new persistence after any event:
            // - After engine messages: new blocks may have been inserted that exceed the
            //   persistence threshold
            // - After persistence completion: we can now persist more blocks if needed
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
514
    /// Blocks until the next event is ready: either an incoming engine message or a persistence
    /// completion (if one is in progress).
    ///
    /// Uses biased selection to prioritize persistence completion to update in-memory state and
    /// unblock further writes.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Take ownership of persistence rx if present
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            // Biased select prioritizes persistence completion to update in memory state and
            // unblock further writes
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    // Don't put it back - consumed (oneshot-like behavior)
                    match result {
                        Ok(result) => LoopEvent::PersistenceComplete {
                            result,
                            start_time,
                        },
                        // sender dropped before sending a result
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Put the persistence rx back - we didn't consume it
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in progress - just wait on incoming
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
555
556    /// Invoked when previously requested blocks were downloaded.
557    ///
558    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
559    /// will execute the first batch and send the remaining blocks back through the channel so that
560    /// block request processing isn't blocked for a long time.
561    fn on_downloaded(
562        &mut self,
563        mut blocks: Vec<SealedBlock<N::Block>>,
564    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
565        if blocks.is_empty() {
566            // nothing to execute
567            return Ok(None)
568        }
569
570        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
571        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
572        for block in blocks.drain(..batch) {
573            if let Some(event) = self.on_downloaded_block(block)? {
574                let needs_backfill = event.is_backfill_action();
575                self.on_tree_event(event)?;
576                if needs_backfill {
577                    // can exit early if backfill is needed
578                    return Ok(None)
579                }
580            }
581        }
582
583        // if we still have blocks to execute, send them as a followup request
584        if !blocks.is_empty() {
585            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
586        }
587
588        Ok(None)
589    }
590
    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`], for example
    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
    /// Execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to Consensus layer, that adds the block to the head of
    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
    /// network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
    /// returns an error if an internal error occurred.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.

        let num_hash = payload.num_hash();
        // notify listeners that the payload was received, before any validation work happens
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Check for invalid ancestors; if one is found, the payload is answered with the
        // corresponding status instead of being inserted.
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        // While backfill sync is running we cannot insert; buffer the payload instead.
        let mut outcome = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?.into_outcome()
        } else {
            TreeOutcome::new(self.try_buffer_payload(payload)?)
        };

        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // Only create the canonical event if this block isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
682
    /// Processes a payload during normal sync operation.
    ///
    /// Inserts the payload into the tree and maps the insertion result onto an engine API
    /// payload status:
    /// - inserted/already-seen and valid => `Valid`, with the block hash as latest valid hash
    /// - disconnected (parent unknown) => `Syncing`
    /// - insertion errors => the error status produced by the corresponding error handler
    #[instrument(level = "debug", target = "engine::tree", skip_all)]
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
        // capture identifiers before `payload` is moved into `insert_payload`
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let (status, already_seen) = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        // a newly valid block may allow buffered children to connect
                        self.try_connect_buffered_blocks(num_hash)?;
                        (PayloadStatusEnum::Valid, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        (PayloadStatusEnum::Valid, true)
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        // not known to be invalid, but we don't know anything else
                        (PayloadStatusEnum::Syncing, true)
                    }
                };

                Ok(TryInsertPayloadResult {
                    status: PayloadStatus::new(status, latest_valid_hash),
                    already_seen,
                })
            }
            Err(error) => {
                // translate insertion errors into a payload status; fatal errors propagate via `?`
                let status = match error {
                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
                    InsertPayloadError::Payload(error) => {
                        self.on_new_payload_error(error, num_hash, parent_hash)?
                    }
                };

                Ok(TryInsertPayloadResult { status, already_seen: false })
            }
        }
    }
732
733    /// Stores a payload for later processing during backfill sync.
734    ///
735    /// During backfill, the node lacks the state needed to validate payloads,
736    /// so they are buffered (stored in memory) until their parent blocks are synced.
737    ///
738    /// Returns:
739    /// - `Syncing`: Payload successfully buffered
740    /// - Error status: Payload is malformed or invalid
741    fn try_buffer_payload(
742        &mut self,
743        payload: T::ExecutionData,
744    ) -> Result<PayloadStatus, InsertBlockFatalError> {
745        let parent_hash = payload.parent_hash();
746        let num_hash = payload.num_hash();
747
748        match self.payload_validator.convert_payload_to_block(payload) {
749            // if the block is well-formed, buffer it for later
750            Ok(block) => {
751                if let Err(error) = self.buffer_block(block) {
752                    Ok(self.on_insert_block_error(error)?)
753                } else {
754                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
755                }
756            }
757            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
758        }
759    }
760
    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
    ///
    /// Returns `Ok(None)` when the head — or any block needed to connect it back to the
    /// canonical chain — is missing from the in-memory tree state.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        // The new chain is built tip-first (highest block first) and reversed before returning.
        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        // NOTE(review): underflows (panics in debug builds) if the new head is at height 0;
        // presumably callers never pass the genesis block here — confirm.
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None)
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain,
        // gather all blocks until new head number.
        while current_canonical_number > current_number {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);
            current_canonical_number -= 1;
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Both chains were accumulated tip-first; callers expect oldest-first ordering.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
847
848    /// Updates the latest block state to the specified canonical ancestor.
849    ///
850    /// This method ensures that the latest block tracks the given canonical header by resetting
851    ///
852    /// # Arguments
853    /// * `canonical_header` - The canonical header to set as the new head
854    ///
855    /// # Returns
856    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
857    ///
858    /// Caution: This unwinds the canonical chain
859    fn update_latest_block_to_canonical_ancestor(
860        &mut self,
861        canonical_header: &SealedHeader<N::BlockHeader>,
862    ) -> ProviderResult<()> {
863        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
864        let current_head_number = self.state.tree_state.canonical_block_number();
865        let new_head_number = canonical_header.number();
866        let new_head_hash = canonical_header.hash();
867
868        // Update tree state with the new canonical head
869        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
870
871        // Handle the state update based on whether this is an unwind scenario
872        if new_head_number < current_head_number {
873            debug!(
874                target: "engine::tree",
875                current_head = current_head_number,
876                new_head = new_head_number,
877                new_head_hash = ?new_head_hash,
878                "FCU unwind detected: reverting to canonical ancestor"
879            );
880
881            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
882        } else {
883            debug!(
884                target: "engine::tree",
885                previous_head = current_head_number,
886                new_head = new_head_number,
887                new_head_hash = ?new_head_hash,
888                "Advancing latest block to canonical ancestor"
889            );
890            self.handle_chain_advance_or_same_height(canonical_header)
891        }
892    }
893
894    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
895    /// to the canonical header
896    fn handle_canonical_chain_unwind(
897        &self,
898        current_head_number: u64,
899        canonical_header: &SealedHeader<N::BlockHeader>,
900    ) -> ProviderResult<()> {
901        let new_head_number = canonical_header.number();
902        debug!(
903            target: "engine::tree",
904            from = current_head_number,
905            to = new_head_number,
906            "Handling unwind: collecting blocks to remove from in-memory state"
907        );
908
909        // Collect blocks that need to be removed from memory
910        let old_blocks =
911            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
912
913        // Load and apply the canonical ancestor block
914        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
915    }
916
917    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
918    fn collect_blocks_for_canonical_unwind(
919        &self,
920        new_head_number: u64,
921        current_head_number: u64,
922    ) -> Vec<ExecutedBlock<N>> {
923        let mut old_blocks =
924            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
925
926        for block_num in (new_head_number + 1)..=current_head_number {
927            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
928                let executed_block = block_state.block_ref().clone();
929                old_blocks.push(executed_block);
930                debug!(
931                    target: "engine::tree",
932                    block_number = block_num,
933                    "Collected block for removal from in-memory state"
934                );
935            }
936        }
937
938        if old_blocks.is_empty() {
939            debug!(
940                target: "engine::tree",
941                "No blocks found in memory to remove, will clear and reset state"
942            );
943        }
944
945        old_blocks
946    }
947
948    /// Applies the canonical ancestor block via a reorg operation.
949    fn apply_canonical_ancestor_via_reorg(
950        &self,
951        canonical_header: &SealedHeader<N::BlockHeader>,
952        old_blocks: Vec<ExecutedBlock<N>>,
953    ) -> ProviderResult<()> {
954        let new_head_hash = canonical_header.hash();
955        let new_head_number = canonical_header.number();
956
957        // Load the canonical ancestor's block
958        let executed_block = self.canonical_block_by_hash(new_head_hash)?;
959        // Perform the reorg to properly handle the unwind
960        self.canonical_in_memory_state
961            .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
962
963        // CRITICAL: Update the canonical head after the reorg
964        // This ensures get_canonical_head() returns the correct block
965        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
966
967        debug!(
968            target: "engine::tree",
969            block_number = new_head_number,
970            block_hash = ?new_head_hash,
971            "Successfully loaded canonical ancestor into memory via reorg"
972        );
973
974        Ok(())
975    }
976
977    /// Handles chain advance or same height scenarios.
978    fn handle_chain_advance_or_same_height(
979        &self,
980        canonical_header: &SealedHeader<N::BlockHeader>,
981    ) -> ProviderResult<()> {
982        // Load the block into memory if it's not already present
983        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
984
985        // Update the canonical head header
986        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
987
988        Ok(())
989    }
990
991    /// Ensures a block is loaded into memory if not already present.
992    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
993        // Check if block is already in memory
994        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
995            return Ok(());
996        }
997
998        // Load the block from storage
999        let executed_block = self.canonical_block_by_hash(block_hash)?;
1000        self.canonical_in_memory_state
1001            .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1002
1003        debug!(
1004            target: "engine::tree",
1005            block_number,
1006            block_hash = ?block_hash,
1007            "Added canonical block to in-memory state"
1008        );
1009
1010        Ok(())
1011    }
1012
    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
    /// chain.
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
    ///
    /// Processing runs through four stages, each of which may answer the update early:
    /// 1. pre-validation of the forkchoice state ([`Self::validate_forkchoice_state`])
    /// 2. the head is already our canonical head ([`Self::handle_canonical_head`])
    /// 3. the head requires a chain update / reorg ([`Self::apply_chain_update`])
    /// 4. the head is unknown and must be downloaded ([`Self::handle_missing_block`])
    ///
    /// Returns an error if an internal error occurred like a database error.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        // Record metrics
        self.record_forkchoice_metrics();

        // Pre-validation of forkchoice state
        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        // Return early if we are on the correct fork
        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
            return Ok(result);
        }

        // Attempt to apply a chain update when the head differs from our canonical chain.
        // This handles reorgs and chain extensions by making the specified head canonical.
        if let Some(result) = self.apply_chain_update(state, &attrs)? {
            return Ok(result);
        }

        // Fallback that ensures to catch up to the network's state.
        self.handle_missing_block(state)
    }
1051
1052    /// Records metrics for forkchoice updated calls
1053    fn record_forkchoice_metrics(&self) {
1054        self.canonical_in_memory_state.on_forkchoice_update_received();
1055    }
1056
1057    /// Pre-validates the forkchoice state and returns early if validation fails.
1058    ///
1059    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1060    /// returned. Returns `None` if validation passes and processing should continue.
1061    fn validate_forkchoice_state(
1062        &mut self,
1063        state: ForkchoiceState,
1064    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1065        if state.head_block_hash.is_zero() {
1066            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1067        }
1068
1069        // Check if the new head hash is connected to any ancestor that we previously marked as
1070        // invalid
1071        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1072        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1073            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1074        }
1075
1076        if !self.backfill_sync_state.is_idle() {
1077            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1078            // exclusive access to the database
1079            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1080            return Ok(Some(OnForkChoiceUpdated::syncing()));
1081        }
1082
1083        Ok(None)
1084    }
1085
1086    /// Handles the case where the forkchoice head is already canonical.
1087    ///
1088    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1089    /// processing is complete. Returns `None` if the head is not canonical and processing
1090    /// should continue.
1091    fn handle_canonical_head(
1092        &self,
1093        state: ForkchoiceState,
1094        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1095    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1096        // Process the forkchoice update by trying to make the head block canonical
1097        //
1098        // We can only process this forkchoice update if:
1099        // - we have the `head` block
1100        // - the head block is part of a chain that is connected to the canonical chain. This
1101        //   includes reorgs.
1102        //
1103        // Performing a FCU involves:
1104        // - marking the FCU's head block as canonical
1105        // - updating in memory state to reflect the new canonical chain
1106        // - updating canonical state trackers
1107        // - emitting a canonicalization event for the new chain (including reorg)
1108        // - if we have payload attributes, delegate them to the payload service
1109
1110        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1111            return Ok(None);
1112        }
1113
1114        trace!(target: "engine::tree", "fcu head hash is already canonical");
1115
1116        // Update the safe and finalized blocks and ensure their values are valid
1117        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1118            // safe or finalized hashes are invalid
1119            return Ok(Some(TreeOutcome::new(outcome)));
1120        }
1121
1122        // Process payload attributes if the head is already canonical
1123        if let Some(attr) = attrs {
1124            let tip = self
1125                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1126                .ok_or_else(|| {
1127                    // If we can't find the canonical block, then something is wrong and we need
1128                    // to return an error
1129                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1130                })?;
1131            // Clone only when we actually need to process the attributes
1132            let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1133            return Ok(Some(TreeOutcome::new(updated)));
1134        }
1135
1136        // The head block is already canonical
1137        Ok(Some(Self::valid_outcome(state)))
1138    }
1139
    /// Applies chain update for the new head block and processes payload attributes.
    ///
    /// This method handles the case where the forkchoice head differs from our current canonical
    /// head. It attempts to make the specified head block canonical by:
    /// - Checking if the head is already part of the canonical chain
    /// - Applying chain reorganizations (reorgs) if necessary
    /// - Processing payload attributes if provided
    /// - Returning the appropriate forkchoice update response
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
    /// Returns `None` if no chain update was needed or possible.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Check if the head is already part of the canonical chain
        //
        // NOTE(review): a provider error from `find_canonical_header` is swallowed here and
        // falls through to the `on_new_head` path below — confirm this best-effort lookup is
        // intentional.
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
            // own chain at will, so we need to always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
                // part of the canonical chain. We need to update the latest block state to reflect
                // the canonical ancestor. This ensures that state providers and the transaction
                // pool operate with the correct chain state after forkchoice update processing, and
                // new payloads built on the reorg'd head will be added to the tree immediately.
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    // Clone only when we actually need to process the attributes
                    let updated =
                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            // According to the Engine API specification, client software MAY skip an update of the
            // forkchoice state and MUST NOT begin a payload build process if
            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
            // of canonical chain, i.e. the ancestor passed payload validation process
            // and deemed `VALID`. In the case of such an event, client software MUST
            // return `{payloadStatus: {status: VALID, latestValidHash:
            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // The head block is already canonical and we're not processing payload attributes,
            // so we're not triggering a payload job and can return right away
            return Ok(Some(Self::valid_outcome(state)));
        }

        // Ensure we can apply a new chain update for the head block
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            // Make the new head canonical (commit or reorg) before answering.
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            // Update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                // Clone only when we actually need to process the attributes
                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        // Head is neither canonical nor connectable: let the caller fall through to the
        // missing-block / download path.
        Ok(None)
    }
1218
1219    /// Handles the case where the head block is missing and needs to be downloaded.
1220    ///
1221    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1222    /// Returns a `TreeOutcome` with syncing status and download event.
1223    fn handle_missing_block(
1224        &self,
1225        state: ForkchoiceState,
1226    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1227        // We don't have the block to perform the forkchoice update
1228        // We assume the FCU is valid and at least the head is missing,
1229        // so we need to start syncing to it
1230        //
1231        // find the appropriate target to sync to, if we don't have the safe block hash then we
1232        // start syncing to the safe block via backfill first
1233        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1234        // check that safe block is valid and missing
1235        !state.safe_block_hash.is_zero() &&
1236        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1237        {
1238            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1239            state.safe_block_hash
1240        } else {
1241            state.head_block_hash
1242        };
1243
1244        let target = self.lowest_buffered_ancestor_or(target);
1245        trace!(target: "engine::tree", %target, "downloading missing block");
1246
1247        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1248            PayloadStatusEnum::Syncing,
1249        )))
1250        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1251    }
1252
1253    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1254    /// the current persistence action while we're removing blocks.
1255    fn remove_blocks(&mut self, new_tip_num: u64) {
1256        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1257        if new_tip_num < self.persistence_state.last_persisted_block.number {
1258            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1259            let (tx, rx) = crossbeam_channel::bounded(1);
1260            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1261            self.persistence_state.start_remove(new_tip_num, rx);
1262        }
1263    }
1264
1265    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1266    /// the current persistence action while we're saving blocks.
1267    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1268        if blocks_to_persist.is_empty() {
1269            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1270            return
1271        }
1272
1273        // NOTE: checked non-empty above
1274        let highest_num_hash = blocks_to_persist
1275            .iter()
1276            .max_by_key(|block| block.recovered_block().number())
1277            .map(|b| b.recovered_block().num_hash())
1278            .expect("Checked non-empty persisting blocks");
1279
1280        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1281        let (tx, rx) = crossbeam_channel::bounded(1);
1282        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1283
1284        self.persistence_state.start_save(highest_num_hash, rx);
1285    }
1286
1287    /// Triggers new persistence actions if no persistence task is currently in progress.
1288    ///
1289    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1290    /// Persistence completion is handled separately via the `wait_for_event` method.
1291    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1292        if !self.persistence_state.in_progress() {
1293            if let Some(new_tip_num) = self.find_disk_reorg()? {
1294                self.remove_blocks(new_tip_num)
1295            } else if self.should_persist() {
1296                let blocks_to_persist =
1297                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1298                self.persist_blocks(blocks_to_persist);
1299            }
1300        }
1301
1302        Ok(())
1303    }
1304
1305    /// Finishes termination by persisting all remaining blocks and signaling completion.
1306    ///
1307    /// This blocks until all persistence is complete. Always signals completion,
1308    /// even if an error occurs.
1309    fn finish_termination(
1310        &mut self,
1311        pending_termination: oneshot::Sender<()>,
1312    ) -> Result<(), AdvancePersistenceError> {
1313        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1314        let result = self.persist_until_complete();
1315        let _ = pending_termination.send(());
1316        result
1317    }
1318
1319    /// Persists all remaining blocks until none are left.
1320    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1321        loop {
1322            // Wait for any in-progress persistence to complete (blocking)
1323            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1324                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1325                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1326                self.on_persistence_complete(result, start_time)?;
1327            }
1328
1329            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1330
1331            if blocks_to_persist.is_empty() {
1332                debug!(target: "engine::tree", "persistence complete, signaling termination");
1333                return Ok(())
1334            }
1335
1336            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1337            self.persist_blocks(blocks_to_persist);
1338        }
1339    }
1340
1341    /// Tries to poll for a completed persistence task (non-blocking).
1342    ///
1343    /// Returns `true` if a persistence task was completed, `false` otherwise.
1344    #[cfg(test)]
1345    pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1346        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1347            return Ok(false);
1348        };
1349
1350        match rx.try_recv() {
1351            Ok(result) => {
1352                self.on_persistence_complete(result, start_time)?;
1353                Ok(true)
1354            }
1355            Err(crossbeam_channel::TryRecvError::Empty) => {
1356                // Not ready yet, put it back
1357                self.persistence_state.rx = Some((rx, start_time, action));
1358                Ok(false)
1359            }
1360            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1361                Err(AdvancePersistenceError::ChannelClosed)
1362            }
1363        }
1364    }
1365
    /// Handles a completed persistence task.
    ///
    /// Records timing metrics, advances the persistence state to the last persisted block,
    /// evicts stale trie changesets, invalidates and re-prepares the canonical overlay, and
    /// purges timing stats for persisted blocks.
    fn on_persistence_complete(
        &mut self,
        result: PersistenceResult,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        let commit_duration = result.commit_duration;
        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = result.last_block
        else {
            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Evict trie changesets for blocks below the eviction threshold.
        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
        // the finalized block if set.
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                // Use the minimum of finalized block and retention threshold to be conservative
                finalized.number.min(min_threshold)
            } else {
                // When finalized is not set (e.g., on L2s), use the retention threshold
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // Invalidate cached overlay since the anchor has changed
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Re-prepare overlay for the current canonical head with the new anchor.
        // Spawn a background task to trigger computation so it's ready when the next payload
        // arrives.
        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                // Result intentionally ignored: this only warms the lazily-computed overlay.
                let _ = overlay.get();
            });
        }

        // Drop timing stats for blocks that are now safely on disk.
        self.purge_timing_stats(last_persisted_block_number, commit_duration);

        Ok(())
    }
1428
    /// Handles a message from the engine.
    ///
    /// Messages arrive from three sources: orchestrator events (backfill lifecycle,
    /// termination), engine API requests (forkchoice updates, new payloads, pre-executed
    /// block insertion) and batches of downloaded blocks.
    ///
    /// Returns `ControlFlow::Break(())` if the engine should terminate.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Best-effort shutdown: a failed termination is logged but we still break
                    // out of the message loop.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            // outdated block that can be skipped
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // if the parent is the canonical head, we can insert the block as the
                        // pending block
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        // Insert into the tree state and notify the payload validator so both
                        // views stay in sync.
                        self.state.tree_state.insert_executed(block.clone());
                        self.payload_validator.on_inserted_executed_block(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // track last received forkchoice state
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    // emit an event about the handled FCU
                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // handle the event if any
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                // Record FCU response metrics; also feeds the newPayload metric's
                                // latest-finish timestamp for cross-request timing.
                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // A send failure only means the requester dropped the receiver
                                // (request cancelled) — not fatal, just counted and logged.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                // Capture payload identifiers before `payload` is consumed below.
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Take the tree event out before the output is moved into the
                                // response channel.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // emit response
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // handle the event if any
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Optionally block until any in-flight persistence task finishes
                                // before processing the payload. The wait happens through a helper
                                // channel so the wait duration can be measured; note that the
                                // `recv` below blocks this engine loop until persistence is done.
                                let persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        let _ = self.on_persistence_complete(result, start_time);
                                        Some(wait_duration)
                                    } else {
                                        // Waiting was requested but nothing was in flight.
                                        Some(Duration::ZERO)
                                    }
                                } else {
                                    None
                                };

                                // Optionally wait for execution/trie caches to be warm.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // Unlike plain newPayload, this variant also reports timing
                                // breakdowns back to the caller.
                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1652
    /// Invoked if the backfill sync has finished to target.
    ///
    /// At this point we consider the block synced to the backfill target.
    ///
    /// Checks the tracked finalized block against the block on disk and requests another backfill
    /// run if the distance to the tip exceeds the threshold for another backfill run.
    ///
    /// This will also do the necessary housekeeping of the tree state, this includes:
    ///  - removing all blocks below the backfill height
    ///  - resetting the canonical in-memory state
    ///
    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
    /// target block.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // update the `invalid_headers` cache with the new invalid header
            self.state.invalid_headers.insert(**bad_block);

            // if this was an unwind then the target is the new height
            Some(*target)
        } else {
            // backfill height is the block number that the backfill finished at
            ctrl.block_number()
        };

        // without a reported backfill height there is nothing to clean up or re-check
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // state house keeping after backfill sync
        // remove all executed blocks below the backfill height
        //
        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
        // before that
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // the node reset so we need to clear everything above that height so that backfill
            // height is the new canonical block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // remove all buffered blocks below the backfill height
        self.state.buffer.remove_old_blocks(backfill_height);
        // drop per-block timing stats for everything at or below the backfill height
        self.purge_timing_stats(backfill_height, None);
        // we remove all entries because now we're synced to the backfill target and consider this
        // the canonical chain
        self.canonical_in_memory_state.clear_state();

        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            // update the tracked chain height, after backfill sync both the canonical height and
            // persisted height are the same
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            // update the tracked canonical head
            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // check if we need to run backfill again by comparing the most recent finalized height to
        // the backfill height
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            // no finalized block, can't check distance
            return Ok(())
        }
        // get the block number of the finalized block, if we have it
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // The block number that the backfill finished at - if the progress or newest
        // finalized is None then we can't check the distance anyways.
        //
        // If both are Some, we perform another distance check and return the desired
        // backfill target
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                // Determines whether or not we should run backfill again, in case
                // the new gap is still large enough and requires running backfill again
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            // request another backfill run
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Check if there are more blocks to sync between current head and FCU target
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                // There are blocks between current head and FCU target, download them
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // We don't have the head block or any of its ancestors buffered. Request
            // a download for the head block which will then trigger further sync.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // try to close the gap by executing buffered blocks that are child blocks of the new head
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1812
1813    /// Attempts to make the given target canonical.
1814    ///
1815    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1816    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1817        if let Some(chain_update) = self.on_new_head(target)? {
1818            self.on_canonical_chain_update(chain_update);
1819        }
1820
1821        Ok(())
1822    }
1823
1824    /// Convenience function to handle an optional tree event.
1825    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1826        if let Some(event) = event {
1827            self.on_tree_event(event)?;
1828        }
1829
1830        Ok(())
1831    }
1832
1833    /// Handles a tree event.
1834    ///
1835    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1836    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1837        match event {
1838            TreeEvent::TreeAction(action) => match action {
1839                TreeAction::MakeCanonical { sync_target_head } => {
1840                    self.make_canonical(sync_target_head)?;
1841                }
1842            },
1843            TreeEvent::BackfillAction(action) => {
1844                self.emit_event(EngineApiEvent::BackfillAction(action));
1845            }
1846            TreeEvent::Download(action) => {
1847                self.emit_event(EngineApiEvent::Download(action));
1848            }
1849        }
1850
1851        Ok(())
1852    }
1853
1854    /// Removes timing stats for blocks at or below `below_number`.
1855    ///
1856    /// No-op when detailed block logging is disabled (no stats are recorded in that case).
1857    /// When `commit_duration` is provided and a slow block threshold is configured, checks
1858    /// each removed block against the threshold and emits a [`ConsensusEngineEvent::SlowBlock`]
1859    /// event for blocks that exceed it.
1860    fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
1861        let threshold = self.config.slow_block_threshold();
1862        let check_slow = commit_duration.is_some() && threshold.is_some();
1863
1864        // Two-pass: collect keys first because emit_event borrows &mut self.
1865        let keys_to_remove: Vec<B256> = self
1866            .execution_timing_stats
1867            .iter()
1868            .filter(|(_, stats)| stats.block_number <= below_number)
1869            .map(|(k, _)| *k)
1870            .collect();
1871
1872        for key in keys_to_remove {
1873            let stats = self.execution_timing_stats.remove(&key).expect("key just found");
1874            if check_slow {
1875                let commit_dur = commit_duration.expect("checked above");
1876                // state_read_duration is already included in execution_duration
1877                let total_duration =
1878                    stats.execution_duration + stats.state_hash_duration + commit_dur;
1879
1880                if total_duration > threshold.expect("checked above") {
1881                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
1882                        stats,
1883                        commit_duration: Some(commit_dur),
1884                        total_duration,
1885                    }));
1886                }
1887            }
1888        }
1889    }
1890
1891    /// Emits an outgoing event to the engine.
1892    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1893        let event = event.into();
1894
1895        if event.is_backfill_action() {
1896            debug_assert_eq!(
1897                self.backfill_sync_state,
1898                BackfillSyncState::Idle,
1899                "backfill action should only be emitted when backfill is idle"
1900            );
1901
1902            if self.persistence_state.in_progress() {
1903                // backfill sync and persisting data are mutually exclusive, so we can't start
1904                // backfill while we're still persisting
1905                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1906                return
1907            }
1908
1909            self.backfill_sync_state = BackfillSyncState::Pending;
1910            self.metrics.engine.pipeline_runs.increment(1);
1911            debug!(target: "engine::tree", "emitting backfill action event");
1912        }
1913
1914        let _ = self.outgoing.send(event).inspect_err(
1915            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1916        );
1917    }
1918
1919    /// Returns true if the canonical chain length minus the last persisted
1920    /// block is greater than or equal to the persistence threshold and
1921    /// backfill is not running.
1922    pub const fn should_persist(&self) -> bool {
1923        if !self.backfill_sync_state.is_idle() {
1924            // can't persist if backfill is running
1925            return false
1926        }
1927
1928        let min_block = self.persistence_state.last_persisted_block.number;
1929        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1930            self.config.persistence_threshold()
1931    }
1932
1933    /// Returns a batch of consecutive canonical blocks to persist in the range
1934    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
1935    fn get_canonical_blocks_to_persist(
1936        &self,
1937        target: PersistTarget,
1938    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1939        // We will calculate the state root using the database, so we need to be sure there are no
1940        // changes
1941        debug_assert!(!self.persistence_state.in_progress());
1942
1943        let mut blocks_to_persist = Vec::new();
1944        let mut current_hash = self.state.tree_state.canonical_block_hash();
1945        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1946        let canonical_head_number = self.state.tree_state.canonical_block_number();
1947
1948        let target_number = match target {
1949            PersistTarget::Head => canonical_head_number,
1950            PersistTarget::Threshold => {
1951                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1952            }
1953        };
1954
1955        debug!(
1956            target: "engine::tree",
1957            ?current_hash,
1958            ?last_persisted_number,
1959            ?canonical_head_number,
1960            ?target_number,
1961            "Returning canonical blocks to persist"
1962        );
1963        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1964            if block.recovered_block().number() <= last_persisted_number {
1965                break;
1966            }
1967
1968            if block.recovered_block().number() <= target_number {
1969                blocks_to_persist.push(block.clone());
1970            }
1971
1972            current_hash = block.recovered_block().parent_hash();
1973        }
1974
1975        // Reverse the order so that the oldest block comes first
1976        blocks_to_persist.reverse();
1977
1978        Ok(blocks_to_persist)
1979    }
1980
1981    /// This clears the blocks from the in-memory tree state that have been persisted to the
1982    /// database.
1983    ///
1984    /// This also updates the canonical in-memory state to reflect the newest persisted block
1985    /// height.
1986    ///
1987    /// Assumes that `finish` has been called on the `persistence_state` at least once
1988    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1989        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1990        // state.
1991        if let Some(remove_above) = self.find_disk_reorg()? {
1992            self.remove_blocks(remove_above);
1993            return Ok(())
1994        }
1995
1996        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1997        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1998        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1999            number: self.persistence_state.last_persisted_block.number,
2000            hash: self.persistence_state.last_persisted_block.hash,
2001        });
2002        Ok(())
2003    }
2004
    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
    ///
    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
    /// or the database. If the required historical data (such as trie change sets) has been
    /// pruned for a given block, this operation will return an error. On archive nodes, it
    /// can retrieve any block.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // check memory first
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(block.clone())
        }

        // Not in memory: reconstruct the executed block from the database. Errors if the
        // block or its execution state has been pruned.
        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        // `mut` because `bundle`/`receipts`/`requests` are moved out of it below.
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        // Recompute the trie updates for this block from the changeset cache + database.
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        // Skip building trie input and anchor for DB-loaded blocks.
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // Shadow `execution_output` with the per-block result shape expected by
        // `ExecutedBlock`. NOTE(review): `pop()` presumably yields this single block's
        // receipts/requests — confirm `get_state(number)` returns state for exactly one block.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        ))
    }
2064
2065    /// Returns `true` if a block with the given hash is known, either in memory or in the
2066    /// database. This is a lightweight existence check that avoids constructing a full
2067    /// [`SealedHeader`].
2068    fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2069        if self.state.tree_state.contains_hash(&hash) {
2070            Ok(true)
2071        } else {
2072            self.provider.is_known(hash)
2073        }
2074    }
2075
2076    /// Return sealed block header from in-memory state or database by hash.
2077    fn sealed_header_by_hash(
2078        &self,
2079        hash: B256,
2080    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2081        // check memory first
2082        let header = self.state.tree_state.sealed_header_by_hash(&hash);
2083
2084        if header.is_some() {
2085            Ok(header)
2086        } else {
2087            self.provider.sealed_header_by_hash(hash)
2088        }
2089    }
2090
2091    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
2092    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
2093    /// not exist in the buffer, this returns the hash that is passed in.
2094    ///
2095    /// Returns the parent hash of the block itself if the block is buffered and has no other
2096    /// buffered ancestors.
2097    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2098        self.state
2099            .buffer
2100            .lowest_ancestor(&hash)
2101            .map(|block| block.parent_hash())
2102            .unwrap_or_else(|| hash)
2103    }
2104
    /// If validation fails, the response MUST contain the latest valid hash:
    ///
    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
    ///     conditions:
    ///     - It is fully validated and deemed VALID
    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
    ///     conditions are satisfied by a `PoW` block.
    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
    ///     the above conditions.
    fn latest_valid_hash_for_invalid_payload(
        &mut self,
        parent_hash: B256,
    ) -> ProviderResult<Option<B256>> {
        // Check if parent exists in side chain or in canonical chain.
        if self.has_block_by_hash(parent_hash)? {
            return Ok(Some(parent_hash))
        }

        // iterate over ancestors in the invalid cache
        // until we encounter the first valid ancestor
        // (each lookup result is cached in `current_block` so the invalid-header cache is
        // queried only once per hash)
        let mut current_hash = parent_hash;
        let mut current_block = self.state.invalid_headers.get(&current_hash);
        while let Some(block_with_parent) = current_block {
            current_hash = block_with_parent.parent;
            current_block = self.state.invalid_headers.get(&current_hash);

            // If current_header is None, then the current_hash does not have an invalid
            // ancestor in the cache, check its presence in blockchain tree
            if current_block.is_none() && self.has_block_by_hash(current_hash)? {
                return Ok(Some(current_hash))
            }
        }
        // no known valid ancestor could be determined
        Ok(None)
    }
2140
2141    /// Prepares the invalid payload response for the given hash, checking the
2142    /// database for the parent hash and populating the payload status with the latest valid hash
2143    /// according to the engine api spec.
2144    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2145        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2146            // Edge case: the `latestValid` field is the zero hash if the parent block is the
2147            // terminal PoW block, which we need to identify by looking at the parent's block
2148            // difficulty
2149            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2150            Some(_) => Some(parent_hash),
2151            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2152        };
2153
2154        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2155            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2156        })
2157        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2158    }
2159
2160    /// Returns true if the given hash is the last received sync target block.
2161    ///
2162    /// See [`ForkchoiceStateTracker::sync_target_state`]
2163    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2164        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2165            return target.head_block_hash == block_hash
2166        }
2167        false
2168    }
2169
2170    /// Returns true if the given hash is part of the last received sync target fork choice update.
2171    ///
2172    /// See [`ForkchoiceStateTracker::sync_target_state`]
2173    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2174        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2175            return target.contains(block_hash)
2176        }
2177        false
2178    }
2179
2180    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2181    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2182    ///
2183    /// Returns a payload status response according to the engine API spec if the block is known to
2184    /// be invalid.
2185    fn check_invalid_ancestor_with_head(
2186        &mut self,
2187        check: B256,
2188        head: &SealedBlock<N::Block>,
2189    ) -> ProviderResult<Option<PayloadStatus>> {
2190        // check if the check hash was previously marked as invalid
2191        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2192
2193        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2194    }
2195
2196    /// Invoked when a new payload received is invalid.
2197    fn on_invalid_new_payload(
2198        &mut self,
2199        head: SealedBlock<N::Block>,
2200        invalid: BlockWithParent,
2201    ) -> ProviderResult<PayloadStatus> {
2202        // populate the latest valid hash field
2203        let status = self.prepare_invalid_response(invalid.parent)?;
2204
2205        // insert the head block into the invalid header cache
2206        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2207        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2208
2209        Ok(status)
2210    }
2211
2212    /// Finds any invalid ancestor for the given payload.
2213    ///
2214    /// This function first checks if the block itself is in the invalid headers cache (to
2215    /// avoid re-executing a known-invalid block). Then it walks up the chain of buffered
2216    /// ancestors and checks if any ancestor is marked as invalid.
2217    ///
2218    /// The check works by:
2219    /// 1. Checking if the block hash itself is in the `invalid_headers` map
2220    /// 2. Finding the lowest buffered ancestor for the given block hash
2221    /// 3. If the ancestor is the same as the block hash itself, using the parent hash instead
2222    /// 4. Checking if this ancestor is in the `invalid_headers` map
2223    ///
2224    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2225    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2226        let parent_hash = payload.parent_hash();
2227        let block_hash = payload.block_hash();
2228
2229        // Check if the block itself is already known to be invalid, avoiding re-execution
2230        if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2231            return Some(entry);
2232        }
2233
2234        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2235        if lowest_buffered_ancestor == block_hash {
2236            lowest_buffered_ancestor = parent_hash;
2237        }
2238
2239        // Check if the block has an invalid ancestor
2240        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2241    }
2242
2243    /// Handles a payload that has an invalid ancestor.
2244    ///
2245    /// This function validates the payload and processes it according to whether it's
2246    /// well-formed or malformed:
2247    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2248    ///    known-bad block, which violates consensus rules
2249    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2250    ///    validated due to its own structural issues
2251    fn handle_invalid_ancestor_payload(
2252        &mut self,
2253        payload: T::ExecutionData,
2254        invalid: BlockWithParent,
2255    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2256        let parent_hash = payload.parent_hash();
2257        let num_hash = payload.num_hash();
2258
2259        // Here we might have 2 cases
2260        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2261        //    remember it as invalid
2262        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2263        //    an error and forget it
2264        let block = match self.payload_validator.convert_payload_to_block(payload) {
2265            Ok(block) => block,
2266            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2267        };
2268
2269        Ok(self.on_invalid_new_payload(block, invalid)?)
2270    }
2271
2272    /// Checks if the given `head` points to an invalid header, which requires a specific response
2273    /// to a forkchoice update.
2274    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2275        // check if the head was previously marked as invalid
2276        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2277
2278        // Try to prepare invalid response, but handle errors gracefully
2279        match self.prepare_invalid_response(header.parent) {
2280            Ok(status) => Ok(Some(status)),
2281            Err(err) => {
2282                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2283                // Return a basic invalid status without latest valid hash
2284                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2285                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2286                })))
2287            }
2288        }
2289    }
2290
2291    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2292    /// and block body itself.
2293    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2294        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2295            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2296            return Err(e)
2297        }
2298
2299        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2300            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2301            return Err(e)
2302        }
2303
2304        Ok(())
2305    }
2306
    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
    ///
    /// Removes the block with the given parent hash together with all of its buffered
    /// descendants from the buffer and tries to insert each of them into the tree. A block that
    /// inserts as valid and is part of the current sync target is made canonical immediately.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            // nothing to append
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        // we just inserted a block that we know is part of the canonical chain, so
                        // we can make it canonical
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // NOTE(review): only `InsertPayloadError::Block` errors are handled here;
                    // other variants are silently dropped. Presumably acceptable for buffered
                    // (already pre-validated) blocks — confirm this is intentional.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2351
2352    /// Pre-validates the block and inserts it into the buffer.
2353    fn buffer_block(
2354        &mut self,
2355        block: SealedBlock<N::Block>,
2356    ) -> Result<(), InsertBlockError<N::Block>> {
2357        if let Err(err) = self.validate_block(&block) {
2358            return Err(InsertBlockError::consensus_error(err, block))
2359        }
2360        self.state.buffer.insert_block(block);
2361        Ok(())
2362    }
2363
2364    /// Returns true if the distance from the local tip to the block is greater than the configured
2365    /// threshold.
2366    ///
2367    /// If the `local_tip` is greater than the `block`, then this will return false.
2368    #[inline]
2369    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2370        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2371    }
2372
2373    /// Returns how far the local tip is from the given block. If the local tip is at the same
2374    /// height or its block number is greater than the given block, this returns None.
2375    #[inline]
2376    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2377        if block > local_tip {
2378            Some(block - local_tip)
2379        } else {
2380            None
2381        }
2382    }
2383
    /// Returns the target hash to sync to if the distance from the local tip to the block is
    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
    /// that block already).
    ///
    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
    /// (missing) finalized block.
    ///
    /// # Parameters
    /// - `canonical_tip_num`: block number of the current local canonical tip
    /// - `target_block_number`: block number of the sync target
    /// - `downloaded_block`: the block that was just downloaded, if any
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        // check if the downloaded block is the tracked finalized block
        // (preferring the most precise height available to measure the gap)
        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // if we downloaded the finalized block we can now check how far we're off
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    Some(buffered_finalized) => {
                        // if we have buffered the finalized block, we should check how far we're
                        // off
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    None => {
                        // check if the distance exceeds the threshold for backfill sync
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        // if the number of missing blocks is greater than the max, trigger backfill
        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            // if we have already canonicalized the finalized block, we should skip backfill
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    // provider error: don't trigger backfill on uncertain data
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // ensure the finalized block is known (not the zero hash)
                    if !state.finalized_block_hash.is_zero() {
                        // we don't have the block yet and the distance exceeds the allowed
                        // threshold
                        return Some(state.finalized_block_hash)
                    }

                    // OPTIMISTIC SYNCING
                    //
                    // It can happen when the node is doing an
                    // optimistic sync, where the CL has no knowledge of the finalized hash,
                    // but is expecting the EL to sync as high
                    // as possible before finalizing.
                    //
                    // This usually doesn't happen on ETH mainnet since CLs use the more
                    // secure checkpoint syncing.
                    //
                    // However, optimism chains will do this. The risk of a reorg is however
                    // low.
                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // we're fully synced to the finalized block
                }
            }
        }

        None
    }
2464
    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
    /// happen if a reorg is happening while we are persisting a block.
    ///
    /// Returns `Ok(Some(n))` with the height of the first common ancestor of the persisted and
    /// canonical chains — persisted blocks above `n` must be removed — or `Ok(None)` if the
    /// persisted chain is an ancestor of (or equal to) the canonical chain.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // helper: resolve a block's parent `NumHash` via its sealed header
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Happy path, canonical chain is ahead or equal to persisted chain.
        // Walk canonical chain back to make sure that it connects to persisted chain.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
        if canonical == persisted {
            return Ok(None);
        }

        // At this point, we know that `persisted` block can't be reached by walking the canonical
        // chain back. In this case we need to truncate it to the first canonical block it connects
        // to.

        // Firstly, walk back until we reach the same height as `canonical`.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Now walk both chains back until we find a common ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2510
    /// Invoked when the canonical chain has been updated.
    ///
    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // update the tracked canonical head
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        // capture the tip header and build the notification before the update is consumed below
        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        // reinsert any missing reorged blocks
        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            self.reinsert_reorged_blocks(new.clone());
            self.reinsert_reorged_blocks(old.clone());
        }

        // update the tracked in-memory state with the new chain
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        // Update metrics based on new tip
        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // sends an event to all active listeners about the new canonical chain
        self.canonical_in_memory_state.notify_canon_state(notification);

        // emit event
        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2551
2552    /// This updates metrics based on the given reorg length and first reorged block number.
2553    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2554        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2555            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2556                first_reorged_block <= finalized.number
2557            {
2558                self.metrics.tree.reorgs.finalized.increment(1);
2559            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2560                first_reorged_block <= safe.number
2561            {
2562                self.metrics.tree.reorgs.safe.increment(1);
2563            } else {
2564                self.metrics.tree.reorgs.head.increment(1);
2565            }
2566        } else {
2567            debug_unreachable!("Reorged chain doesn't have any blocks");
2568        }
2569        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2570    }
2571
2572    /// This reinserts any blocks in the new chain that do not already exist in the tree
2573    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2574        for block in new_chain {
2575            if self
2576                .state
2577                .tree_state
2578                .executed_block_by_hash(block.recovered_block().hash())
2579                .is_none()
2580            {
2581                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2582                self.state.tree_state.insert_executed(block);
2583            }
2584        }
2585    }
2586
2587    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2588    ///
2589    /// This mainly compares the missing parent of the downloaded block with the current canonical
2590    /// tip, and decides whether or not backfill sync should be triggered.
2591    fn on_disconnected_downloaded_block(
2592        &self,
2593        downloaded_block: BlockNumHash,
2594        missing_parent: BlockNumHash,
2595        head: BlockNumHash,
2596    ) -> Option<TreeEvent> {
2597        // compare the missing parent with the canonical tip
2598        if let Some(target) =
2599            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2600        {
2601            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2602            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2603        }
2604
2605        // continue downloading the missing parent
2606        //
2607        // this happens if either:
2608        //  * the missing parent block num < canonical tip num
2609        //    * this case represents a missing block on a fork that is shorter than the canonical
2610        //      chain
2611        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2612        //    less than the backfill threshold
2613        //    * this case represents a potentially long range of blocks to download and execute
2614        let request = if let Some(distance) =
2615            self.distance_from_local_tip(head.number, missing_parent.number)
2616        {
2617            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2618            DownloadRequest::BlockRange(missing_parent.hash, distance)
2619        } else {
2620            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2621            // This happens when the missing parent is on an outdated
2622            // sidechain and we can only download the missing block itself
2623            DownloadRequest::single_block(missing_parent.hash)
2624        };
2625
2626        Some(TreeEvent::Download(request))
2627    }
2628
    /// Handles a downloaded block that was successfully inserted as valid.
    ///
    /// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
    /// If it matches a non-head sync target (safe or finalized), makes it canonical inline
    /// and triggers a download for the remaining blocks towards the actual head.
    /// Otherwise, tries to connect buffered blocks.
    ///
    /// Returns the follow-up [`TreeEvent`] to process, if any.
    fn on_valid_downloaded_block(
        &mut self,
        block_num_hash: BlockNumHash,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        // check if we just inserted a block that's part of sync targets,
        // i.e. head, safe, or finalized
        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
            sync_target.contains(block_num_hash.hash)
        {
            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");

            if sync_target.head_block_hash == block_num_hash.hash {
                // we just inserted the sync target head block, make it canonical
                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_num_hash.hash,
                })))
            }

            // This block is part of the sync target (safe or finalized) but not the
            // head. Make it canonical and try to connect any buffered children, then
            // continue downloading towards the actual head if needed.
            self.make_canonical(block_num_hash.hash)?;
            self.try_connect_buffered_blocks(block_num_hash)?;

            // Check if we've reached the sync target head after connecting buffered
            // blocks (e.g. the head block may have already been buffered).
            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
            }

            // sync target head reached — nothing further to do
            return Ok(None)
        }
        // not a sync target block — just try to connect any buffered children
        trace!(target: "engine::tree", "appended downloaded block");
        self.try_connect_buffered_blocks(block_num_hash)?;
        Ok(None)
    }
2673
    /// Invoked with a block downloaded from the network
    ///
    /// Returns an event with the appropriate action to take, such as:
    ///  - download more missing blocks
    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        // Drop the block without executing it if it (via its lowest buffered ancestor)
        // descends from a known-invalid block.
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // While backfill sync is in progress the tree must not be mutated.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        // try to append the block
        match self.insert_block(block) {
            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                // block is not connected to the canonical head, we need to download
                // its missing branch first
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // Only block-level errors are handled here (marking the block invalid);
                // a fatal error from the handler aborts processing.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2723
2724    /// Inserts a payload into the tree and executes it.
2725    ///
2726    /// This function validates the payload's basic structure, then executes it using the
2727    /// payload validator. The execution includes running all transactions in the payload
2728    /// and validating the resulting state transitions.
2729    ///
2730    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2731    /// or `InsertPayloadError` if validation or execution failed.
2732    fn insert_payload(
2733        &mut self,
2734        payload: T::ExecutionData,
2735    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2736        self.insert_block_or_payload(
2737            payload.block_with_parent(),
2738            payload,
2739            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2740            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2741        )
2742    }
2743
2744    fn insert_block(
2745        &mut self,
2746        block: SealedBlock<N::Block>,
2747    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2748        self.insert_block_or_payload(
2749            block.block_with_parent(),
2750            block,
2751            |validator, block, ctx| validator.validate_block(block, ctx),
2752            |_, block| Ok(block),
2753        )
2754    }
2755
    /// Inserts a block or payload into the blockchain tree with full execution.
    ///
    /// This is a generic function that handles both blocks and payloads by accepting
    /// a block identifier, input data, and execution/validation functions. It performs
    /// comprehensive checks and execution:
    ///
    /// - Validates that the block doesn't already exist in the tree
    /// - Ensures parent state is available, buffering if necessary
    /// - Executes the block/payload using the provided execute function
    /// - Handles both canonical and fork chain insertions
    /// - Updates pending block state when appropriate
    /// - Emits consensus engine events and records metrics
    ///
    /// `execute` validates and executes the input, returning the executed block plus
    /// optional timing stats; `convert_to_block` turns the input into a [`SealedBlock`]
    /// for the early-return and error paths that need one.
    ///
    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(
            &mut V,
            Input,
            TreeCtx<'_, N>,
        )
            -> Result<(ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>), Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Check if block already exists - first in memory, then DB only if it could be persisted
        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
            // still run the conversion so conversion errors propagate via `?` even for
            // already-known blocks
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // Only query DB if block could be persisted (number <= last persisted block).
        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    // Header found on disk: the block was already persisted.
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure that the parent state is available.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // we don't have the state required to execute this block, buffering it and find the
                // missing parent block
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // determine whether we are on a fork chain by comparing the block number with the
        // canonical head. This is a simple check that is sufficient for the event emission below.
        // A block is considered a fork if its number is less than or equal to the canonical head,
        // as this indicates there's already a canonical block at that height.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // Validate and execute the input; this consumes `input`.
        let (executed, timing_stats) = execute(&mut self.payload_validator, input, ctx)?;

        // Emit slow block event immediately after execution so it appears even when
        // persistence hasn't completed yet (e.g. blocks arriving faster than persistence).
        if let Some(stats) = timing_stats {
            if let Some(threshold) = self.config.slow_block_threshold() {
                let total_duration = stats.execution_duration + stats.state_hash_duration;
                if total_duration > threshold {
                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
                        stats: stats.clone(),
                        // commit hasn't happened yet at this point
                        commit_duration: None,
                        total_duration,
                    }));
                }
            }
            // retain the stats keyed by block hash for later consumers
            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
        }

        // if the parent is the canonical head, we can insert the block as the pending block
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // emit insert event
        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2896
    /// Handles an error that occurred while inserting a block.
    ///
    /// If this is a validation error this will mark the block as invalid.
    ///
    /// Returns the proper payload status response if the block is invalid.
    fn on_insert_block_error(
        &mut self,
        error: InsertBlockError<N::Block>,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let (block, error) = error.split();

        // if invalid block, we check the validation error. Otherwise return the fatal
        // error.
        let validation_err = error.ensure_validation_error()?;

        // If the error was due to an invalid payload, the payload is added to the
        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
        // returned.
        warn!(
            target: "engine::tree",
            invalid_hash=%block.hash(),
            invalid_number=block.number(),
            %validation_err,
            "Invalid block error on new payload",
        );
        // determine the `latestValidHash` to report for this invalid payload, based on
        // the invalid block's parent
        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;

        // keep track of the invalid header
        self.state.invalid_headers.insert(block.block_with_parent());
        // notify listeners about the invalid block
        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
            Box::new(block),
        )));

        Ok(PayloadStatus::new(
            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
            latest_valid_hash,
        ))
    }
2935
2936    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2937    fn on_new_payload_error(
2938        &mut self,
2939        error: NewPayloadError,
2940        payload_num_hash: NumHash,
2941        parent_hash: B256,
2942    ) -> ProviderResult<PayloadStatus> {
2943        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2944        // we need to convert the error to a payload status (response to the CL)
2945
2946        let latest_valid_hash =
2947            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2948                // Engine-API rules:
2949                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2950                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2951                None
2952            } else {
2953                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2954            };
2955
2956        let status = PayloadStatusEnum::from(error);
2957        Ok(PayloadStatus::new(status, latest_valid_hash))
2958    }
2959
2960    /// Attempts to find the header for the given block hash if it is canonical.
2961    pub fn find_canonical_header(
2962        &self,
2963        hash: B256,
2964    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2965        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2966
2967        if canonical.is_none() {
2968            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2969        }
2970
2971        Ok(canonical)
2972    }
2973
2974    /// Updates the tracked finalized block if we have it.
2975    fn update_finalized_block(
2976        &self,
2977        finalized_block_hash: B256,
2978    ) -> Result<(), OnForkChoiceUpdated> {
2979        if finalized_block_hash.is_zero() {
2980            return Ok(())
2981        }
2982
2983        match self.find_canonical_header(finalized_block_hash) {
2984            Ok(None) => {
2985                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2986                // if the finalized block is not known, we can't update the finalized block
2987                return Err(OnForkChoiceUpdated::invalid_state())
2988            }
2989            Ok(Some(finalized)) => {
2990                if Some(finalized.num_hash()) !=
2991                    self.canonical_in_memory_state.get_finalized_num_hash()
2992                {
2993                    // we're also persisting the finalized block on disk so we can reload it on
2994                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2995                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2996                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2997                    // Update finalized block height metric
2998                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2999                }
3000            }
3001            Err(err) => {
3002                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3003            }
3004        }
3005
3006        Ok(())
3007    }
3008
3009    /// Updates the tracked safe block if we have it
3010    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3011        if safe_block_hash.is_zero() {
3012            return Ok(())
3013        }
3014
3015        match self.find_canonical_header(safe_block_hash) {
3016            Ok(None) => {
3017                debug!(target: "engine::tree", "Safe block not found in canonical chain");
3018                // if the safe block is not known, we can't update the safe block
3019                return Err(OnForkChoiceUpdated::invalid_state())
3020            }
3021            Ok(Some(safe)) => {
3022                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3023                    // we're also persisting the safe block on disk so we can reload it on
3024                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3025                    let _ = self.persistence.save_safe_block_number(safe.number());
3026                    self.canonical_in_memory_state.set_safe(safe.clone());
3027                    // Update safe block height metric
3028                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
3029                }
3030            }
3031            Err(err) => {
3032                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3033            }
3034        }
3035
3036        Ok(())
3037    }
3038
3039    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
3040    /// made canonical.
3041    ///
3042    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
3043    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
3044    ///
3045    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
3046    /// are consistent with the head block.
3047    fn ensure_consistent_forkchoice_state(
3048        &self,
3049        state: ForkchoiceState,
3050    ) -> Result<(), OnForkChoiceUpdated> {
3051        // Ensure that the finalized block, if not zero, is known and in the canonical chain
3052        // after the head block is canonicalized.
3053        //
3054        // This ensures that the finalized block is consistent with the head block, i.e. the
3055        // finalized block is an ancestor of the head block.
3056        self.update_finalized_block(state.finalized_block_hash)?;
3057
3058        // Also ensure that the safe block, if not zero, is known and in the canonical chain
3059        // after the head block is canonicalized.
3060        //
3061        // This ensures that the safe block is consistent with the head block, i.e. the safe
3062        // block is an ancestor of the head block.
3063        self.update_safe_block(state.safe_block_hash)
3064    }
3065
    /// Validates the payload attributes with respect to the header and fork choice state.
    ///
    /// This is called during `engine_forkchoiceUpdated` when the CL provides payload attributes,
    /// indicating it wants the EL to start building a new block.
    ///
    /// Runs [`PayloadValidator::validate_payload_attributes_against_header`](reth_engine_primitives::PayloadValidator::validate_payload_attributes_against_header) to ensure
    /// `payloadAttributes.timestamp > headBlock.timestamp` per the Engine API spec.
    ///
    /// If validation passes, sends the attributes to the payload builder to start a new
    /// payload job. If it fails, returns `INVALID_PAYLOAD_ATTRIBUTES` without rolling back
    /// the forkchoice update.
    ///
    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
    /// return an error if the payload attributes are invalid.
    fn process_payload_attributes(
        &self,
        attrs: T::PayloadAttributes,
        head: &N::BlockHeader,
        state: ForkchoiceState,
    ) -> OnForkChoiceUpdated {
        // Invalid attributes only fail the build request; the head update itself stands.
        if let Err(err) =
            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
        {
            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
            return OnForkChoiceUpdated::invalid_payload_attributes()
        }

        // 8. Client software MUST begin a payload build process building on top of
        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
        //    The build process is specified in the Payload building section.

        // send the payload to the builder and return the receiver for the pending payload
        // id, initiating payload job is handled asynchronously
        let pending_payload_id =
            self.payload_builder.send_new_payload(state.head_block_hash, attrs);

        // Client software MUST respond to this method call in the following way:
        // {
        //      payloadStatus: {
        //          status: VALID,
        //          latestValidHash: forkchoiceState.headBlockHash,
        //          validationError: null
        //      },
        //      payloadId: buildProcessId
        // }
        //
        // if the payload is deemed VALID and the build process has begun.
        OnForkChoiceUpdated::updated_with_pending_payload_id(
            PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
            pending_payload_id,
        )
    }
3119
3120    /// Remove all blocks up to __and including__ the given block number.
3121    ///
3122    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
3123    /// those which have a fork point at or below the finalized hash.
3124    ///
3125    /// Canonical blocks below the upper bound will still be removed.
3126    pub(crate) fn remove_before(
3127        &mut self,
3128        upper_bound: BlockNumHash,
3129        finalized_hash: Option<B256>,
3130    ) -> ProviderResult<()> {
3131        // first fetch the finalized block number and then call the remove_before method on
3132        // tree_state
3133        let num = if let Some(hash) = finalized_hash {
3134            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3135        } else {
3136            None
3137        };
3138
3139        self.state.tree_state.remove_until(
3140            upper_bound,
3141            self.persistence_state.last_persisted_block.hash,
3142            num,
3143        );
3144        Ok(())
3145    }
3146
3147    /// Returns a builder for creating state providers for the given hash.
3148    ///
3149    /// This is an optimization for parallel execution contexts where we want to avoid
3150    /// creating state providers in the critical path.
3151    pub fn state_provider_builder(
3152        &self,
3153        hash: B256,
3154    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3155    where
3156        P: BlockReader + StateProviderFactory + StateReader + Clone,
3157    {
3158        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3159            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3160            // the block leads back to the canonical chain
3161            return Ok(Some(StateProviderBuilder::new(
3162                self.provider.clone(),
3163                historical,
3164                Some(blocks),
3165            )))
3166        }
3167
3168        // Check if the block is persisted
3169        if let Some(header) = self.provider.header(hash)? {
3170            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3171            // For persisted blocks, we create a builder that will fetch state directly from the
3172            // database
3173            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3174        }
3175
3176        debug!(target: "engine::tree", %hash, "no canonical state found for block");
3177        Ok(None)
3178    }
3179}
3180
/// Events received in the main engine loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// An engine API message was received.
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A persistence task completed.
    PersistenceComplete {
        /// The unified result of the persistence operation.
        result: PersistenceResult,
        /// When the persistence operation started.
        // Carried along so the receiver can measure elapsed time for the operation.
        start_time: Instant,
    },
    /// A channel was disconnected.
    // Signals the loop that one of its input channels closed.
    Disconnected,
}
3200
/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
/// variant.
///
/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
/// is valid or not.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block is valid and block extends canonical chain.
    Valid,
    /// The block may be valid and has an unknown missing ancestor.
    ///
    /// Callers typically react to this by downloading the missing ancestor chain before
    /// the block can be executed.
    Disconnected {
        /// Current canonical head.
        head: BlockNumHash,
        /// The lowest ancestor block that is not connected to the canonical chain.
        missing_ancestor: BlockNumHash,
    },
}
3218
/// How a payload was inserted if it was valid.
///
/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
///
/// Both variants carry a [`BlockStatus`] describing how the block relates to the
/// canonical chain.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The payload was valid, but we have already seen it.
    AlreadySeen(BlockStatus),
    /// The payload was valid and inserted into the tree.
    Inserted(BlockStatus),
}
3230
/// Target for block persistence.
///
/// Selects how far ahead the persistence operation should advance.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to `canonical_head - memory_block_buffer_target`, keeping the most
    /// recent blocks in memory.
    Threshold,
    /// Persist all blocks up to and including the canonical head.
    Head,
}
3239
/// Result of waiting for caches to become available.
///
/// Returned by [`WaitForCaches::wait_for_caches`]; each field records the wait time for
/// one cache independently.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the execution cache lock.
    pub execution_cache: Duration,
    /// Time spent waiting for the sparse trie lock.
    pub sparse_trie: Duration,
}
3248
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
    /// Waits for cache updates to complete.
    ///
    /// Blocks until the caches are available and returns the time spent waiting for
    /// each cache separately as [`CacheWaitDurations`].
    fn wait_for_caches(&self) -> CacheWaitDurations;
}