//! Engine API tree handler — `reth_engine_tree/tree/mod.rs`.
1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17    MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29    FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32    BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35    StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48    oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64
65use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
66pub use block_buffer::BlockBuffer;
67pub use invalid_headers::InvalidHeaderCache;
68pub use metrics::EngineApiMetrics;
69pub use payload_processor::*;
70pub use payload_validator::{BasicEngineValidator, EngineValidator};
71pub use persistence_state::PersistenceState;
72pub use reth_engine_primitives::TreeConfig;
73pub use reth_execution_cache::{
74    CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider, ExecutionCache,
75    PayloadExecutionCache, SavedCache,
76};
77
78pub mod state;
79
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: the local head `block.number` is 100 and the forkchoice head `block.number` is 133; the
/// gap (33 blocks) is larger than an epoch's worth of slots, so it exceeds the threshold and the
/// pipeline should be used to backfill the gap.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
90
/// The minimum number of blocks to retain in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism).
///
/// NOTE(review): 64 equals two mainnet epochs of slots; presumably sized to cover
/// reasonably deep reorgs — confirm against the eviction logic that uses this.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
96
/// A builder for creating state providers that can be used across threads.
///
/// Captures everything needed to materialize a [`StateProviderBox`] lazily: a provider factory,
/// a historical anchor block hash, and optional in-memory blocks layered on top.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    /// NOTE(review): presumably ordered oldest-to-newest as expected by the overlay provider —
    /// confirm against `MemoryOverlayStateProvider`.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
107
108impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
109    /// Creates a new state provider from the provider factory, historical block hash and optional
110    /// overlaid blocks.
111    pub const fn new(
112        provider_factory: P,
113        historical: B256,
114        overlay: Option<Vec<ExecutedBlock<N>>>,
115    ) -> Self {
116        Self { provider_factory, historical, overlay }
117    }
118}
119
120impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
121where
122    P: BlockReader + StateProviderFactory + StateReader + Clone,
123{
124    /// Creates a new state provider from this builder.
125    pub fn build(&self) -> ProviderResult<StateProviderBox> {
126        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
127        if let Some(overlay) = self.overlay.clone() {
128            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
129        }
130        Ok(provider)
131    }
132}
133
/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks, e.g. payloads received while their ancestors are still
    /// missing or being backfilled.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}
149
150impl<N: NodePrimitives> EngineApiTreeState<N> {
151    fn new(
152        block_buffer_limit: u32,
153        max_invalid_header_cache_length: u32,
154        canonical_block: BlockNumHash,
155        engine_kind: EngineApiKind,
156    ) -> Self {
157        Self {
158            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
159            buffer: BlockBuffer::new(block_buffer_limit),
160            tree_state: TreeState::new(canonical_block, engine_kind),
161            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
162        }
163    }
164
165    /// Returns a reference to the tree state.
166    pub const fn tree_state(&self) -> &TreeState<N> {
167        &self.tree_state
168    }
169
170    /// Returns true if the block has been marked as invalid.
171    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
172        self.invalid_headers.get(hash).is_some()
173    }
174}
175
/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something (e.g. make a block canonical or
    /// start a download/backfill).
    pub event: Option<TreeEvent>,
    /// Whether the block was already seen, meaning no real execution happened during this
    /// `newPayload` call.
    pub already_seen: bool,
}
187
188impl<T> TreeOutcome<T> {
189    /// Create new tree outcome.
190    pub const fn new(outcome: T) -> Self {
191        Self { outcome, event: None, already_seen: false }
192    }
193
194    /// Set event on the outcome.
195    pub fn with_event(mut self, event: TreeEvent) -> Self {
196        self.event = Some(event);
197        self
198    }
199
200    /// Set the `already_seen` flag on the outcome.
201    pub const fn with_already_seen(mut self, value: bool) -> Self {
202        self.already_seen = value;
203        self
204    }
205}
206
/// Result of trying to insert a new payload in [`EngineApiTreeHandler`].
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// The payload status to report back to the CL:
    /// - `Valid`: Payload successfully validated and inserted
    /// - `Syncing`: Parent missing, payload buffered for later
    /// - Error status: Payload is invalid
    pub status: PayloadStatus,
    /// Whether the block was already seen (no real execution happened for this call).
    pub already_seen: bool,
}
217
218impl TryInsertPayloadResult {
219    /// Convert the result into a [`TreeOutcome`].
220    #[inline]
221    pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
222        TreeOutcome::new(self.status).with_already_seen(self.already_seen)
223    }
224}
225
/// Events that are triggered by Tree Chain and returned to the caller for handling.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}
236
237impl TreeEvent {
238    /// Returns true if the event is a backfill action.
239    const fn is_backfill_action(&self) -> bool {
240        matches!(self, Self::BackfillAction(_))
241    }
242}
243
/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical, i.e. advance the canonical head to the given block.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}
253
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Access to database and state providers.
    provider: P,
    /// Consensus rules used to validate blocks.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator for engine API payloads.
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Timing statistics for executed blocks, keyed by block hash.
    /// Stored here (not in `ExecutedBlock`) to avoid leaking observability concerns into the block
    /// type. Entries are removed when blocks are persisted or invalidated.
    execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
    /// Task runtime for spawning blocking work on named, reusable threads.
    runtime: reth_tasks::Runtime,
}
311
312impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
313    for EngineApiTreeHandler<N, P, T, V, C>
314where
315    N: NodePrimitives,
316    C: Debug + ConfigureEvm<Primitives = N>,
317{
318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319        f.debug_struct("EngineApiTreeHandler")
320            .field("provider", &self.provider)
321            .field("consensus", &self.consensus)
322            .field("payload_validator", &self.payload_validator)
323            .field("state", &self.state)
324            .field("incoming_tx", &self.incoming_tx)
325            .field("persistence", &self.persistence)
326            .field("persistence_state", &self.persistence_state)
327            .field("backfill_sync_state", &self.backfill_sync_state)
328            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
329            .field("payload_builder", &self.payload_builder)
330            .field("config", &self.config)
331            .field("metrics", &self.metrics)
332            .field("engine_kind", &self.engine_kind)
333            .field("evm_config", &self.evm_config)
334            .field("changeset_cache", &self.changeset_cache)
335            .field("execution_timing_stats", &self.execution_timing_stats.len())
336            .field("runtime", &self.runtime)
337            .finish()
338    }
339}
340
341impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
342where
343    N: NodePrimitives,
344    P: DatabaseProviderFactory
345        + BlockReader<Block = N::Block, Header = N::BlockHeader>
346        + StateProviderFactory
347        + StateReader<Receipt = N::Receipt>
348        + HashedPostStateProvider
349        + Clone
350        + 'static,
351    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
352        + StageCheckpointReader
353        + ChangeSetReader
354        + StorageChangeSetReader
355        + StorageSettingsCache,
356    C: ConfigureEvm<Primitives = N> + 'static,
357    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
358    V: EngineValidator<T> + WaitForCaches,
359{
360    /// Creates a new [`EngineApiTreeHandler`].
361    #[expect(clippy::too_many_arguments)]
362    pub fn new(
363        provider: P,
364        consensus: Arc<dyn FullConsensus<N>>,
365        payload_validator: V,
366        outgoing: UnboundedSender<EngineApiEvent<N>>,
367        state: EngineApiTreeState<N>,
368        canonical_in_memory_state: CanonicalInMemoryState<N>,
369        persistence: PersistenceHandle<N>,
370        persistence_state: PersistenceState,
371        payload_builder: PayloadBuilderHandle<T>,
372        config: TreeConfig,
373        engine_kind: EngineApiKind,
374        evm_config: C,
375        changeset_cache: ChangesetCache,
376        runtime: reth_tasks::Runtime,
377    ) -> Self {
378        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
379
380        Self {
381            provider,
382            consensus,
383            payload_validator,
384            incoming,
385            outgoing,
386            persistence,
387            persistence_state,
388            backfill_sync_state: BackfillSyncState::Idle,
389            state,
390            canonical_in_memory_state,
391            payload_builder,
392            config,
393            metrics: Default::default(),
394            incoming_tx,
395            engine_kind,
396            evm_config,
397            changeset_cache,
398            execution_timing_stats: HashMap::new(),
399            runtime,
400        }
401    }
402
    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        // Best-effort lookup of the current chain tip. NOTE(review): on a provider error this
        // silently falls back to block 0 / a default header — presumably only expected on a
        // fresh datadir; confirm.
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        // Nothing has been persisted by this task yet; treat the current tip as the last
        // persisted block.
        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            runtime,
        );
        // Grab the self-addressed sender before the task is moved into its thread.
        let incoming = task.incoming_tx.clone();
        // The handler runs on a dedicated OS thread; `run` blocks that thread until shutdown.
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
462
463    /// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
464    fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
465        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
466            PayloadStatusEnum::Valid,
467            Some(state.head_block_hash),
468        )))
469    }
470
471    /// Returns a new [`Sender`] to send messages to this type.
472    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
473        self.incoming_tx.clone()
474    }
475
476    /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means
477    /// persistence is falling behind execution.
478    const fn persistence_gap(&self) -> u64 {
479        self.state
480            .tree_state
481            .canonical_block_number()
482            .saturating_sub(self.persistence_state.last_persisted_block.number)
483    }
484
485    /// Returns `true` when the main loop should stop draining the tree input channel.
486    ///
487    /// This is the case when persistence is already running and the gap between the canonical tip
488    /// and the last persisted block has reached the configured threshold.
489    const fn should_backpressure(&self) -> bool {
490        self.persistence_state.in_progress() &&
491            self.persistence_gap() >= self.config.persistence_backpressure_threshold()
492    }
493
    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages. Returns only on a fatal
    /// error or when a channel disconnects.
    pub fn run(mut self) {
        loop {
            // Each iteration has three phases:
            //
            // 1. Non-blocking poll for persistence completion. If the background flush already
            //    landed, absorb the result now so the gap calculation below is fresh.
            // 2. Decide how to wait for the next event. When the canonical-to-persisted gap exceeds
            //    the backpressure threshold we only block on the persistence receiver, leaving new
            //    engine requests sitting in the unbounded upstream channel.
            // 3. Handle the event (engine message or persistence completion) and kick off a new
            //    persistence cycle if the threshold is met again.
            //
            // The net effect: when the persistence gap exceeds the threshold, we stop
            // processing incoming messages and let them queue in the channel. This is only a
            // soft form of backpressure: it delays replies and, more importantly, prevents
            // executing further blocks that would pile up in the persistence queue - where each
            // block carries heavier state (eg. trie updates) than the raw payload sitting in the
            // engine channel.
            //
            // Standard Ethereum CLs won't truly back off - the engine API has no
            // backpressure semantics, and CLs typically timeout after ≈8s and resend - so
            // this cannot prevent the incoming channel from growing under sustained load.
            // But it shifts the bottleneck to the lighter-weight incoming queue rather than
            // the costlier persistence pipeline. Other clients that respect reply latency
            // can treat the delayed responses as a signal to chill out.
            match self.try_poll_persistence() {
                // A persistence result landed: schedule the next batch immediately and restart
                // the loop so the gap computation uses fresh data.
                Ok(true) => {
                    if let Err(err) = self.advance_persistence() {
                        error!(target: "engine::tree", %err, "Advancing persistence failed");
                        return
                    }
                    continue;
                }
                // Nothing completed yet; fall through to event waiting.
                Ok(false) => {}
                Err(err) => {
                    error!(target: "engine::tree", %err, "Polling persistence failed");
                    return
                }
            }

            let event = if self.should_backpressure() {
                // Under backpressure: block only on the persistence receiver and record how long
                // the loop stalls; engine requests stay queued upstream.
                self.metrics.engine.backpressure_active.set(1.0);
                let stall_start = Instant::now();
                let event = self.wait_for_persistence_event();
                self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
                event
            } else {
                self.metrics.engine.backpressure_active.set(0.0);
                self.wait_for_event()
            };

            match event {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        // `Break` is a clean shutdown request, not an error.
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // Always check if we need to trigger new persistence after any event:
            // - After engine messages: new blocks may have been inserted that exceed the
            //   persistence threshold
            // - After persistence completion: we can now persist more blocks if needed
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
582
583    /// Blocks until the in-flight persistence task completes, used when we are under
584    /// backpressure.
585    ///
586    /// Unlike `wait_for_event`, this deliberately does not read from the tree input channel. Any
587    /// requests sent to the tree remain queued upstream until persistence catches up.
588    fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
589        let maybe_persistence = self.persistence_state.rx.take();
590
591        if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
592            match persistence_rx.recv() {
593                Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
594                Err(_) => LoopEvent::Disconnected,
595            }
596        } else {
597            self.wait_for_event()
598        }
599    }
600
    /// Blocks until the next event is ready: either an incoming engine message or a persistence
    /// completion (if one is in progress).
    ///
    /// Uses biased selection to prioritize persistence completion to update in-memory state and
    /// unblock further writes: `select_biased!` checks its arms in source order, so a ready
    /// persistence result always wins over a pending engine message.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Take ownership of persistence rx if present
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            // Biased select prioritizes persistence completion to update in memory state and
            // unblock further writes
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    // Don't put it back - consumed (oneshot-like behavior)
                    match result {
                        Ok(result) => LoopEvent::PersistenceComplete {
                            result,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Put the persistence rx back - we didn't consume it
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in progress - just wait on incoming
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
641
642    /// Invoked when previously requested blocks were downloaded.
643    ///
644    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
645    /// will execute the first batch and send the remaining blocks back through the channel so that
646    /// block request processing isn't blocked for a long time.
647    fn on_downloaded(
648        &mut self,
649        mut blocks: Vec<SealedBlock<N::Block>>,
650    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
651        if blocks.is_empty() {
652            // nothing to execute
653            return Ok(None)
654        }
655
656        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
657        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
658        for block in blocks.drain(..batch) {
659            if let Some(event) = self.on_downloaded_block(block)? {
660                let needs_backfill = event.is_backfill_action();
661                self.on_tree_event(event)?;
662                if needs_backfill {
663                    // can exit early if backfill is needed
664                    return Ok(None)
665                }
666            }
667        }
668
669        // if we still have blocks to execute, send them as a followup request
670        if !blocks.is_empty() {
671            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
672        }
673
674        Ok(None)
675    }
676
    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`], for example
    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
    /// Execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to Consensus layer, that adds the block to the head of
    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
    /// network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
    /// returns an error if an internal error occurred.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.

        let num_hash = payload.num_hash();
        // Announce receipt of the payload to event listeners before any validation happens.
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Check for invalid ancestors
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        // While backfill sync is running the payload cannot be executed yet, so it is buffered
        // instead of inserted.
        let mut outcome = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?.into_outcome()
        } else {
            TreeOutcome::new(self.try_buffer_payload(payload)?)
        };

        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // Only create the canonical event if this block isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
768
769    /// Processes a payload during normal sync operation.
770    #[instrument(level = "debug", target = "engine::tree", skip_all)]
771    fn try_insert_payload(
772        &mut self,
773        payload: T::ExecutionData,
774    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
775        let block_hash = payload.block_hash();
776        let num_hash = payload.num_hash();
777        let parent_hash = payload.parent_hash();
778        let mut latest_valid_hash = None;
779
780        match self.insert_payload(payload) {
781            Ok(status) => {
782                let (status, already_seen) = match status {
783                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
784                        latest_valid_hash = Some(block_hash);
785                        self.try_connect_buffered_blocks(num_hash)?;
786                        (PayloadStatusEnum::Valid, false)
787                    }
788                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
789                        latest_valid_hash = Some(block_hash);
790                        (PayloadStatusEnum::Valid, true)
791                    }
792                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
793                        (PayloadStatusEnum::Syncing, false)
794                    }
795                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
796                        // not known to be invalid, but we don't know anything else
797                        (PayloadStatusEnum::Syncing, true)
798                    }
799                };
800
801                Ok(TryInsertPayloadResult {
802                    status: PayloadStatus::new(status, latest_valid_hash),
803                    already_seen,
804                })
805            }
806            Err(error) => {
807                let status = match error {
808                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
809                    InsertPayloadError::Payload(error) => {
810                        self.on_new_payload_error(error, num_hash, parent_hash)?
811                    }
812                };
813
814                Ok(TryInsertPayloadResult { status, already_seen: false })
815            }
816        }
817    }
818
819    /// Stores a payload for later processing during backfill sync.
820    ///
821    /// During backfill, the node lacks the state needed to validate payloads,
822    /// so they are buffered (stored in memory) until their parent blocks are synced.
823    ///
824    /// Returns:
825    /// - `Syncing`: Payload successfully buffered
826    /// - Error status: Payload is malformed or invalid
827    fn try_buffer_payload(
828        &mut self,
829        payload: T::ExecutionData,
830    ) -> Result<PayloadStatus, InsertBlockFatalError> {
831        let parent_hash = payload.parent_hash();
832        let num_hash = payload.num_hash();
833
834        match self.payload_validator.convert_payload_to_block(payload) {
835            // if the block is well-formed, buffer it for later
836            Ok(block) => {
837                if let Err(error) = self.buffer_block(block) {
838                    Ok(self.on_insert_block_error(error)?)
839                } else {
840                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
841                }
842            }
843            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
844        }
845    }
846
    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
    ///
    /// Returns `Ok(None)` when the chain cannot be assembled, i.e. when the new head or one of
    /// the in-memory blocks it should connect through is not found in the tree state.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        // `new_chain` is collected tip-first and reversed before returning.
        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None)
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain,
        // gather all blocks until new head number.
        while current_canonical_number > current_number {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);
            current_canonical_number -= 1;
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Both chains were collected tip-first; callers expect oldest-first ordering.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
933
934    /// Updates the latest block state to the specified canonical ancestor.
935    ///
936    /// This method ensures that the latest block tracks the given canonical header by resetting
937    ///
938    /// # Arguments
939    /// * `canonical_header` - The canonical header to set as the new head
940    ///
941    /// # Returns
942    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
943    ///
944    /// Caution: This unwinds the canonical chain
945    fn update_latest_block_to_canonical_ancestor(
946        &mut self,
947        canonical_header: &SealedHeader<N::BlockHeader>,
948    ) -> ProviderResult<()> {
949        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
950        let current_head_number = self.state.tree_state.canonical_block_number();
951        let new_head_number = canonical_header.number();
952        let new_head_hash = canonical_header.hash();
953
954        // Update tree state with the new canonical head
955        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
956
957        // Handle the state update based on whether this is an unwind scenario
958        if new_head_number < current_head_number {
959            debug!(
960                target: "engine::tree",
961                current_head = current_head_number,
962                new_head = new_head_number,
963                new_head_hash = ?new_head_hash,
964                "FCU unwind detected: reverting to canonical ancestor"
965            );
966
967            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
968        } else {
969            debug!(
970                target: "engine::tree",
971                previous_head = current_head_number,
972                new_head = new_head_number,
973                new_head_hash = ?new_head_hash,
974                "Advancing latest block to canonical ancestor"
975            );
976            self.handle_chain_advance_or_same_height(canonical_header)
977        }
978    }
979
980    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
981    /// to the canonical header
982    fn handle_canonical_chain_unwind(
983        &self,
984        current_head_number: u64,
985        canonical_header: &SealedHeader<N::BlockHeader>,
986    ) -> ProviderResult<()> {
987        let new_head_number = canonical_header.number();
988        debug!(
989            target: "engine::tree",
990            from = current_head_number,
991            to = new_head_number,
992            "Handling unwind: collecting blocks to remove from in-memory state"
993        );
994
995        // Collect blocks that need to be removed from memory
996        let old_blocks =
997            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
998
999        // Load and apply the canonical ancestor block
1000        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1001    }
1002
1003    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
1004    fn collect_blocks_for_canonical_unwind(
1005        &self,
1006        new_head_number: u64,
1007        current_head_number: u64,
1008    ) -> Vec<ExecutedBlock<N>> {
1009        let mut old_blocks =
1010            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1011
1012        for block_num in (new_head_number + 1)..=current_head_number {
1013            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1014                let executed_block = block_state.block_ref().clone();
1015                old_blocks.push(executed_block);
1016                debug!(
1017                    target: "engine::tree",
1018                    block_number = block_num,
1019                    "Collected block for removal from in-memory state"
1020                );
1021            }
1022        }
1023
1024        if old_blocks.is_empty() {
1025            debug!(
1026                target: "engine::tree",
1027                "No blocks found in memory to remove, will clear and reset state"
1028            );
1029        }
1030
1031        old_blocks
1032    }
1033
1034    /// Applies the canonical ancestor block via a reorg operation.
1035    fn apply_canonical_ancestor_via_reorg(
1036        &self,
1037        canonical_header: &SealedHeader<N::BlockHeader>,
1038        old_blocks: Vec<ExecutedBlock<N>>,
1039    ) -> ProviderResult<()> {
1040        let new_head_hash = canonical_header.hash();
1041        let new_head_number = canonical_header.number();
1042
1043        // Load the canonical ancestor's block
1044        let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1045        // Perform the reorg to properly handle the unwind
1046        self.canonical_in_memory_state
1047            .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1048
1049        // CRITICAL: Update the canonical head after the reorg
1050        // This ensures get_canonical_head() returns the correct block
1051        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1052
1053        debug!(
1054            target: "engine::tree",
1055            block_number = new_head_number,
1056            block_hash = ?new_head_hash,
1057            "Successfully loaded canonical ancestor into memory via reorg"
1058        );
1059
1060        Ok(())
1061    }
1062
1063    /// Handles chain advance or same height scenarios.
1064    fn handle_chain_advance_or_same_height(
1065        &self,
1066        canonical_header: &SealedHeader<N::BlockHeader>,
1067    ) -> ProviderResult<()> {
1068        // Load the block into memory if it's not already present
1069        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1070
1071        // Update the canonical head header
1072        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1073
1074        Ok(())
1075    }
1076
1077    /// Ensures a block is loaded into memory if not already present.
1078    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1079        // Check if block is already in memory
1080        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1081            return Ok(());
1082        }
1083
1084        // Load the block from storage
1085        let executed_block = self.canonical_block_by_hash(block_hash)?;
1086        self.canonical_in_memory_state
1087            .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1088
1089        debug!(
1090            target: "engine::tree",
1091            block_number,
1092            block_hash = ?block_hash,
1093            "Added canonical block to in-memory state"
1094        );
1095
1096        Ok(())
1097    }
1098
1099    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
1100    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
1101    /// chain.
1102    ///
1103    /// These responses should adhere to the [Engine API Spec for
1104    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
1105    ///
1106    /// Returns an error if an internal error occurred like a database error.
1107    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
1108    fn on_forkchoice_updated(
1109        &mut self,
1110        state: ForkchoiceState,
1111        attrs: Option<T::PayloadAttributes>,
1112    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1113        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1114
1115        // Record metrics
1116        self.record_forkchoice_metrics();
1117
1118        // Pre-validation of forkchoice state
1119        if let Some(early_result) = self.validate_forkchoice_state(state)? {
1120            return Ok(TreeOutcome::new(early_result));
1121        }
1122
1123        // Return early if we are on the correct fork
1124        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
1125            return Ok(result);
1126        }
1127
1128        // Attempt to apply a chain update when the head differs from our canonical chain.
1129        // This handles reorgs and chain extensions by making the specified head canonical.
1130        if let Some(result) = self.apply_chain_update(state, &attrs)? {
1131            return Ok(result);
1132        }
1133
1134        // Fallback that ensures to catch up to the network's state.
1135        self.handle_missing_block(state)
1136    }
1137
    /// Records metrics for forkchoice updated calls
    ///
    /// Forwards the notification to the in-memory canonical state's
    /// `on_forkchoice_update_received` hook.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1142
1143    /// Pre-validates the forkchoice state and returns early if validation fails.
1144    ///
1145    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1146    /// returned. Returns `None` if validation passes and processing should continue.
1147    fn validate_forkchoice_state(
1148        &mut self,
1149        state: ForkchoiceState,
1150    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1151        if state.head_block_hash.is_zero() {
1152            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1153        }
1154
1155        // Check if the new head hash is connected to any ancestor that we previously marked as
1156        // invalid
1157        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1158        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1159            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1160        }
1161
1162        if !self.backfill_sync_state.is_idle() {
1163            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1164            // exclusive access to the database
1165            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1166            return Ok(Some(OnForkChoiceUpdated::syncing()));
1167        }
1168
1169        Ok(None)
1170    }
1171
1172    /// Handles the case where the forkchoice head is already canonical.
1173    ///
1174    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1175    /// processing is complete. Returns `None` if the head is not canonical and processing
1176    /// should continue.
1177    fn handle_canonical_head(
1178        &self,
1179        state: ForkchoiceState,
1180        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1181    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1182        // Process the forkchoice update by trying to make the head block canonical
1183        //
1184        // We can only process this forkchoice update if:
1185        // - we have the `head` block
1186        // - the head block is part of a chain that is connected to the canonical chain. This
1187        //   includes reorgs.
1188        //
1189        // Performing a FCU involves:
1190        // - marking the FCU's head block as canonical
1191        // - updating in memory state to reflect the new canonical chain
1192        // - updating canonical state trackers
1193        // - emitting a canonicalization event for the new chain (including reorg)
1194        // - if we have payload attributes, delegate them to the payload service
1195
1196        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1197            return Ok(None);
1198        }
1199
1200        trace!(target: "engine::tree", "fcu head hash is already canonical");
1201
1202        // Update the safe and finalized blocks and ensure their values are valid
1203        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1204            // safe or finalized hashes are invalid
1205            return Ok(Some(TreeOutcome::new(outcome)));
1206        }
1207
1208        // Process payload attributes if the head is already canonical
1209        if let Some(attr) = attrs {
1210            let tip = self
1211                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1212                .ok_or_else(|| {
1213                    // If we can't find the canonical block, then something is wrong and we need
1214                    // to return an error
1215                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1216                })?;
1217            // Clone only when we actually need to process the attributes
1218            let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1219            return Ok(Some(TreeOutcome::new(updated)));
1220        }
1221
1222        // The head block is already canonical
1223        Ok(Some(Self::valid_outcome(state)))
1224    }
1225
    /// Applies chain update for the new head block and processes payload attributes.
    ///
    /// This method handles the case where the forkchoice head differs from our current canonical
    /// head. It attempts to make the specified head block canonical by:
    /// - Checking if the head is already part of the canonical chain
    /// - Applying chain reorganizations (reorgs) if necessary
    /// - Processing payload attributes if provided
    /// - Returning the appropriate forkchoice update response
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
    /// Returns `None` if no chain update was needed or possible.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Check if the head is already part of the canonical chain
        // NOTE: a lookup error here is treated like a missing header and falls through to
        // `on_new_head` below instead of being propagated.
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
            // own chain at will, so we need to always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
                // part of the canonical chain. We need to update the latest block state to reflect
                // the canonical ancestor. This ensures that state providers and the transaction
                // pool operate with the correct chain state after forkchoice update processing, and
                // new payloads built on the reorg'd head will be added to the tree immediately.
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    // Clone only when we actually need to process the attributes
                    let updated =
                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            // According to the Engine API specification, client software MAY skip an update of the
            // forkchoice state and MUST NOT begin a payload build process if
            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
            // of canonical chain, i.e. the ancestor passed payload validation process
            // and deemed `VALID`. In the case of such an event, client software MUST
            // return `{payloadStatus: {status: VALID, latestValidHash:
            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // The head block is already canonical and we're not processing payload attributes,
            // so we're not triggering a payload job and can return right away
            return Ok(Some(Self::valid_outcome(state)));
        }

        // Ensure we can apply a new chain update for the head block
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            // Commit (or reorg to) the new chain, making the head block canonical.
            self.on_canonical_chain_update(chain_update);

            // Update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                // Clone only when we actually need to process the attributes
                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        Ok(None)
    }
1304
1305    /// Handles the case where the head block is missing and needs to be downloaded.
1306    ///
1307    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1308    /// Returns a `TreeOutcome` with syncing status and download event.
1309    fn handle_missing_block(
1310        &self,
1311        state: ForkchoiceState,
1312    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1313        // We don't have the block to perform the forkchoice update
1314        // We assume the FCU is valid and at least the head is missing,
1315        // so we need to start syncing to it
1316        //
1317        // find the appropriate target to sync to, if we don't have the safe block hash then we
1318        // start syncing to the safe block via backfill first
1319        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1320        // check that safe block is valid and missing
1321        !state.safe_block_hash.is_zero() &&
1322        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1323        {
1324            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1325            state.safe_block_hash
1326        } else {
1327            state.head_block_hash
1328        };
1329
1330        let target = self.lowest_buffered_ancestor_or(target);
1331        trace!(target: "engine::tree", %target, "downloading missing block");
1332
1333        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1334            PayloadStatusEnum::Syncing,
1335        )))
1336        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1337    }
1338
1339    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1340    /// the current persistence action while we're removing blocks.
1341    fn remove_blocks(&mut self, new_tip_num: u64) {
1342        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1343        if new_tip_num < self.persistence_state.last_persisted_block.number {
1344            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1345            let (tx, rx) = crossbeam_channel::bounded(1);
1346            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1347            self.persistence_state.start_remove(new_tip_num, rx);
1348        }
1349    }
1350
1351    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1352    /// the current persistence action while we're saving blocks.
1353    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1354        if blocks_to_persist.is_empty() {
1355            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1356            return
1357        }
1358
1359        // NOTE: checked non-empty above
1360        let highest_num_hash = blocks_to_persist
1361            .iter()
1362            .max_by_key(|block| block.recovered_block().number())
1363            .map(|b| b.recovered_block().num_hash())
1364            .expect("Checked non-empty persisting blocks");
1365
1366        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1367        let (tx, rx) = crossbeam_channel::bounded(1);
1368        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1369
1370        self.persistence_state.start_save(highest_num_hash, rx);
1371    }
1372
1373    /// Triggers new persistence actions if no persistence task is currently in progress.
1374    ///
1375    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1376    /// Persistence completion is handled separately via the `wait_for_event` method.
1377    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1378        if !self.persistence_state.in_progress() {
1379            if let Some(new_tip_num) = self.find_disk_reorg()? {
1380                self.remove_blocks(new_tip_num)
1381            } else if self.should_persist() {
1382                let blocks_to_persist =
1383                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1384                self.persist_blocks(blocks_to_persist);
1385            }
1386        }
1387
1388        Ok(())
1389    }
1390
1391    /// Finishes termination by persisting all remaining blocks and signaling completion.
1392    ///
1393    /// This blocks until all persistence is complete. Always signals completion,
1394    /// even if an error occurs.
1395    fn finish_termination(
1396        &mut self,
1397        pending_termination: oneshot::Sender<()>,
1398    ) -> Result<(), AdvancePersistenceError> {
1399        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1400        let result = self.persist_until_complete();
1401        let _ = pending_termination.send(());
1402        result
1403    }
1404
1405    /// Persists all remaining blocks until none are left.
1406    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1407        loop {
1408            // Wait for any in-progress persistence to complete (blocking)
1409            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1410                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1411                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1412                self.on_persistence_complete(result, start_time)?;
1413            }
1414
1415            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1416
1417            if blocks_to_persist.is_empty() {
1418                debug!(target: "engine::tree", "persistence complete, signaling termination");
1419                return Ok(())
1420            }
1421
1422            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1423            self.persist_blocks(blocks_to_persist);
1424        }
1425    }
1426
1427    /// Tries to poll for a completed persistence task (non-blocking).
1428    ///
1429    /// Returns `true` if a persistence task was completed, `false` otherwise.
1430    fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1431        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1432            return Ok(false);
1433        };
1434
1435        match rx.try_recv() {
1436            Ok(result) => {
1437                self.on_persistence_complete(result, start_time)?;
1438                Ok(true)
1439            }
1440            Err(crossbeam_channel::TryRecvError::Empty) => {
1441                // Not ready yet, put it back
1442                self.persistence_state.rx = Some((rx, start_time, action));
1443                Ok(false)
1444            }
1445            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1446                Err(AdvancePersistenceError::ChannelClosed)
1447            }
1448        }
1449    }
1450
    /// Handles a completed persistence task.
    ///
    /// Records the persistence duration metric, advances the persistence state to the last
    /// persisted block, evicts stale trie changesets, invalidates the cached state overlay
    /// (its disk anchor moved), and pre-warms the overlay for the current canonical head in
    /// the background. A `result` without a last block means an empty batch was sent; this is
    /// logged and treated as a successful no-op.
    fn on_persistence_complete(
        &mut self,
        result: PersistenceResult,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        // Saved for slow-block reporting in `purge_timing_stats` below.
        let commit_duration = result.commit_duration;
        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = result.last_block
        else {
            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Evict trie changesets for blocks below the eviction threshold.
        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
        // the finalized block if set.
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                // Use the minimum of finalized block and retention threshold to be conservative
                finalized.number.min(min_threshold)
            } else {
                // When finalized is not set (e.g., on L2s), use the retention threshold
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // Invalidate cached overlay since the anchor has changed
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Re-prepare overlay for the current canonical head with the new anchor.
        // Spawn a background task to trigger computation so it's ready when the next payload
        // arrives.
        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                let _ = overlay.get();
            });
        }

        // Drop stats for now-persisted blocks; with a commit duration available, this may
        // also emit slow-block events (see `purge_timing_stats`).
        self.purge_timing_stats(last_persisted_block_number, commit_duration);

        Ok(())
    }
1513
    /// Handles a message from the engine.
    ///
    /// Dispatches:
    /// - orchestrator events (backfill lifecycle, termination),
    /// - engine API requests (already-executed block insertion, `forkchoiceUpdated`,
    ///   `newPayload`, and the `reth_newPayload` variant with explicit wait options),
    /// - batches of downloaded blocks.
    ///
    /// Returns `ControlFlow::Break(())` if the engine should terminate.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            // Orchestrator events: backfill lifecycle and shutdown.
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    // Cleared back to `Idle` in `on_backfill_sync_finished`; while active,
                    // persistence is suppressed (see `should_persist`).
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Persist everything remaining; completion is signaled on `tx` either way.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    // A block that was executed elsewhere is inserted into the tree directly,
                    // without re-execution.
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            // outdated block that can be skipped
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // if the parent is the canonical head, we can insert the block as the
                        // pending block
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.payload_validator.on_inserted_executed_block(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // track last received forkchoice state
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    // emit an event about the handled FCU
                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // handle the event if any
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // Deliver the response to the engine API caller; a dropped
                                // receiver means the request was cancelled.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Detach the follow-up tree event before `output` is consumed
                                // by the response send below.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // emit response
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // handle the event if any
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                                enqueued_at,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Time spent queued before this handler picked the message up.
                                let backpressure_wait = enqueued_at.elapsed();

                                let explicit_persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        // The engine thread still blocks here (on
                                        // `persistence_rx`); the spawned task only wraps the
                                        // receive so the wait duration can be measured.
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        let _ = self.on_persistence_complete(result, start_time);
                                        wait_duration
                                    } else {
                                        Duration::ZERO
                                    }
                                } else {
                                    Duration::ZERO
                                };

                                // Optionally wait on the payload validator's caches; the wait
                                // durations are reported via `timings` below.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Latency measures time from enqueue to completion, excluding
                                // only the explicit persistence wait. This means backpressure
                                // (time spent queued due to the engine being busy) is included,
                                // reflecting real-world engine responsiveness.
                                let latency = enqueued_at.elapsed() - explicit_persistence_wait;

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                // Downloaded blocks may produce a follow-up tree event (e.g. make-canonical).
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1745
    /// Invoked if the backfill sync has finished to target.
    ///
    /// At this point we consider the block synced to the backfill target.
    ///
    /// Checks the tracked finalized block against the block on disk and requests another backfill
    /// run if the distance to the tip exceeds the threshold for another backfill run.
    ///
    /// This will also do the necessary housekeeping of the tree state, this includes:
    ///  - removing all blocks below the backfill height
    ///  - resetting the canonical in-memory state
    ///
    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
    /// target block.
    ///
    /// If no further backfill is needed, this may instead emit download requests for the
    /// remaining gap to the FCU target, or try to connect already-buffered child blocks.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        // Backfill is no longer running; persistence may resume (see `should_persist`).
        self.backfill_sync_state = BackfillSyncState::Idle;

        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // update the `invalid_headers` cache with the new invalid header
            self.state.invalid_headers.insert(**bad_block);

            // if this was an unwind then the target is the new height
            Some(*target)
        } else {
            // backfill height is the block number that the backfill finished at
            ctrl.block_number()
        };

        // backfill height is the block number that the backfill finished at
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // state house keeping after backfill sync
        // remove all executed blocks below the backfill height
        //
        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
        // before that
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // the node reset so we need to clear everything above that height so that backfill
            // height is the new canonical block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // remove all buffered blocks below the backfill height
        self.state.buffer.remove_old_blocks(backfill_height);
        // `None` commit duration: no slow-block events are emitted here (see
        // `purge_timing_stats`), stats are only dropped.
        self.purge_timing_stats(backfill_height, None);
        // we remove all entries because now we're synced to the backfill target and consider this
        // the canonical chain
        self.canonical_in_memory_state.clear_state();

        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            // update the tracked chain height, after backfill sync both the canonical height and
            // persisted height are the same
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            // update the tracked canonical head
            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // check if we need to run backfill again by comparing the most recent finalized height to
        // the backfill height
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            // no finalized block, can't check distance
            return Ok(())
        }
        // get the block number of the finalized block, if we have it
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // The block number that the backfill finished at - if the progress or newest
        // finalized is None then we can't check the distance anyways.
        //
        // If both are Some, we perform another distance check and return the desired
        // backfill target
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                // Determines whether or not we should run backfill again, in case
                // the new gap is still large enough and requires running backfill again
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            // request another backfill run
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Check if there are more blocks to sync between current head and FCU target
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                // There are blocks between current head and FCU target, download them
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // We don't have the head block or any of its ancestors buffered. Request
            // a download for the head block which will then trigger further sync.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // try to close the gap by executing buffered blocks that are child blocks of the new head
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1905
1906    /// Attempts to make the given target canonical.
1907    ///
1908    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1909    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1910        if let Some(chain_update) = self.on_new_head(target)? {
1911            self.on_canonical_chain_update(chain_update);
1912        }
1913
1914        Ok(())
1915    }
1916
1917    /// Convenience function to handle an optional tree event.
1918    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1919        if let Some(event) = event {
1920            self.on_tree_event(event)?;
1921        }
1922
1923        Ok(())
1924    }
1925
1926    /// Handles a tree event.
1927    ///
1928    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1929    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1930        match event {
1931            TreeEvent::TreeAction(action) => match action {
1932                TreeAction::MakeCanonical { sync_target_head } => {
1933                    self.make_canonical(sync_target_head)?;
1934                }
1935            },
1936            TreeEvent::BackfillAction(action) => {
1937                self.emit_event(EngineApiEvent::BackfillAction(action));
1938            }
1939            TreeEvent::Download(action) => {
1940                self.emit_event(EngineApiEvent::Download(action));
1941            }
1942        }
1943
1944        Ok(())
1945    }
1946
1947    /// Removes timing stats for blocks at or below `below_number`.
1948    ///
1949    /// No-op when detailed block logging is disabled (no stats are recorded in that case).
1950    /// When `commit_duration` is provided and a slow block threshold is configured, checks
1951    /// each removed block against the threshold and emits a [`ConsensusEngineEvent::SlowBlock`]
1952    /// event for blocks that exceed it.
1953    fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
1954        let threshold = self.config.slow_block_threshold();
1955        let check_slow = commit_duration.is_some() && threshold.is_some();
1956
1957        // Two-pass: collect keys first because emit_event borrows &mut self.
1958        let keys_to_remove: Vec<B256> = self
1959            .execution_timing_stats
1960            .iter()
1961            .filter(|(_, stats)| stats.block_number <= below_number)
1962            .map(|(k, _)| *k)
1963            .collect();
1964
1965        for key in keys_to_remove {
1966            let stats = self.execution_timing_stats.remove(&key).expect("key just found");
1967            if check_slow {
1968                let commit_dur = commit_duration.expect("checked above");
1969                // state_read_duration is already included in execution_duration
1970                let total_duration =
1971                    stats.execution_duration + stats.state_hash_duration + commit_dur;
1972
1973                if total_duration > threshold.expect("checked above") {
1974                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
1975                        stats,
1976                        commit_duration: Some(commit_dur),
1977                        total_duration,
1978                    }));
1979                }
1980            }
1981        }
1982    }
1983
1984    /// Emits an outgoing event to the engine.
1985    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1986        let event = event.into();
1987
1988        if event.is_backfill_action() {
1989            debug_assert_eq!(
1990                self.backfill_sync_state,
1991                BackfillSyncState::Idle,
1992                "backfill action should only be emitted when backfill is idle"
1993            );
1994
1995            if self.persistence_state.in_progress() {
1996                // backfill sync and persisting data are mutually exclusive, so we can't start
1997                // backfill while we're still persisting
1998                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1999                return
2000            }
2001
2002            self.backfill_sync_state = BackfillSyncState::Pending;
2003            self.metrics.engine.pipeline_runs.increment(1);
2004            debug!(target: "engine::tree", "emitting backfill action event");
2005        }
2006
2007        let _ = self.outgoing.send(event).inspect_err(
2008            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2009        );
2010    }
2011
2012    /// Returns true if the canonical chain length minus the last persisted
2013    /// block is greater than or equal to the persistence threshold and
2014    /// backfill is not running.
2015    pub const fn should_persist(&self) -> bool {
2016        if !self.backfill_sync_state.is_idle() {
2017            // can't persist if backfill is running
2018            return false
2019        }
2020
2021        let min_block = self.persistence_state.last_persisted_block.number;
2022        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2023            self.config.persistence_threshold()
2024    }
2025
2026    /// Returns a batch of consecutive canonical blocks to persist in the range
2027    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
2028    fn get_canonical_blocks_to_persist(
2029        &self,
2030        target: PersistTarget,
2031    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2032        // We will calculate the state root using the database, so we need to be sure there are no
2033        // changes
2034        debug_assert!(!self.persistence_state.in_progress());
2035
2036        let mut blocks_to_persist = Vec::new();
2037        let mut current_hash = self.state.tree_state.canonical_block_hash();
2038        let last_persisted_number = self.persistence_state.last_persisted_block.number;
2039        let canonical_head_number = self.state.tree_state.canonical_block_number();
2040
2041        let target_number = match target {
2042            PersistTarget::Head => canonical_head_number,
2043            PersistTarget::Threshold => {
2044                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2045            }
2046        };
2047
2048        debug!(
2049            target: "engine::tree",
2050            ?current_hash,
2051            ?last_persisted_number,
2052            ?canonical_head_number,
2053            ?target_number,
2054            "Returning canonical blocks to persist"
2055        );
2056        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
2057            if block.recovered_block().number() <= last_persisted_number {
2058                break;
2059            }
2060
2061            if block.recovered_block().number() <= target_number {
2062                blocks_to_persist.push(block.clone());
2063            }
2064
2065            current_hash = block.recovered_block().parent_hash();
2066        }
2067
2068        // Reverse the order so that the oldest block comes first
2069        blocks_to_persist.reverse();
2070
2071        Ok(blocks_to_persist)
2072    }
2073
2074    /// This clears the blocks from the in-memory tree state that have been persisted to the
2075    /// database.
2076    ///
2077    /// This also updates the canonical in-memory state to reflect the newest persisted block
2078    /// height.
2079    ///
2080    /// Assumes that `finish` has been called on the `persistence_state` at least once
2081    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2082        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
2083        // state.
2084        if let Some(remove_above) = self.find_disk_reorg()? {
2085            self.remove_blocks(remove_above);
2086            return Ok(())
2087        }
2088
2089        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2090        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2091        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2092            number: self.persistence_state.last_persisted_block.number,
2093            hash: self.persistence_state.last_persisted_block.hash,
2094        });
2095        Ok(())
2096    }
2097
    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
    ///
    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
    /// or the database. If the required historical data (such as trie change sets) has been
    /// pruned for a given block, this operation will return an error. On archive nodes, it
    /// can retrieve any block.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // check memory first
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(block.clone())
        }

        // Not in memory: reconstruct the executed block from persisted data.
        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        // Load the execution output (bundle state, receipts, requests) for this block height.
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        // Recompute trie updates from on-disk changesets; this fails if they were pruned.
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        // Skip building trie input and anchor for DB-loaded blocks.
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // `pop` takes the last entry of the fetched receipts/requests — presumably `get_state`
        // for a single number yields exactly one block's worth; TODO(review) confirm.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        ))
    }
2157
2158    /// Returns `true` if a block with the given hash is known, either in memory or in the
2159    /// database. This is a lightweight existence check that avoids constructing a full
2160    /// [`SealedHeader`].
2161    fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2162        if self.state.tree_state.contains_hash(&hash) {
2163            Ok(true)
2164        } else {
2165            self.provider.is_known(hash)
2166        }
2167    }
2168
2169    /// Return sealed block header from in-memory state or database by hash.
2170    fn sealed_header_by_hash(
2171        &self,
2172        hash: B256,
2173    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2174        // check memory first
2175        let header = self.state.tree_state.sealed_header_by_hash(&hash);
2176
2177        if header.is_some() {
2178            Ok(header)
2179        } else {
2180            self.provider.sealed_header_by_hash(hash)
2181        }
2182    }
2183
2184    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
2185    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
2186    /// not exist in the buffer, this returns the hash that is passed in.
2187    ///
2188    /// Returns the parent hash of the block itself if the block is buffered and has no other
2189    /// buffered ancestors.
2190    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2191        self.state
2192            .buffer
2193            .lowest_ancestor(&hash)
2194            .map(|block| block.parent_hash())
2195            .unwrap_or_else(|| hash)
2196    }
2197
2198    /// If validation fails, the response MUST contain the latest valid hash:
2199    ///
2200    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
2201    ///     conditions:
2202    ///     - It is fully validated and deemed VALID
2203    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
2204    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
2205    ///     conditions are satisfied by a `PoW` block.
2206    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
2207    ///     the above conditions.
2208    fn latest_valid_hash_for_invalid_payload(
2209        &mut self,
2210        parent_hash: B256,
2211    ) -> ProviderResult<Option<B256>> {
2212        // Check if parent exists in side chain or in canonical chain.
2213        if self.has_block_by_hash(parent_hash)? {
2214            return Ok(Some(parent_hash))
2215        }
2216
2217        // iterate over ancestors in the invalid cache
2218        // until we encounter the first valid ancestor
2219        let mut current_hash = parent_hash;
2220        let mut current_block = self.state.invalid_headers.get(&current_hash);
2221        while let Some(block_with_parent) = current_block {
2222            current_hash = block_with_parent.parent;
2223            current_block = self.state.invalid_headers.get(&current_hash);
2224
2225            // If current_header is None, then the current_hash does not have an invalid
2226            // ancestor in the cache, check its presence in blockchain tree
2227            if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2228                return Ok(Some(current_hash))
2229            }
2230        }
2231        Ok(None)
2232    }
2233
2234    /// Prepares the invalid payload response for the given hash, checking the
2235    /// database for the parent hash and populating the payload status with the latest valid hash
2236    /// according to the engine api spec.
2237    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2238        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2239            // Edge case: the `latestValid` field is the zero hash if the parent block is the
2240            // terminal PoW block, which we need to identify by looking at the parent's block
2241            // difficulty
2242            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2243            Some(_) => Some(parent_hash),
2244            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2245        };
2246
2247        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2248            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2249        })
2250        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2251    }
2252
2253    /// Returns true if the given hash is the last received sync target block.
2254    ///
2255    /// See [`ForkchoiceStateTracker::sync_target_state`]
2256    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2257        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2258            return target.head_block_hash == block_hash
2259        }
2260        false
2261    }
2262
2263    /// Returns true if the given hash is part of the last received sync target fork choice update.
2264    ///
2265    /// See [`ForkchoiceStateTracker::sync_target_state`]
2266    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2267        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2268            return target.contains(block_hash)
2269        }
2270        false
2271    }
2272
2273    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2274    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2275    ///
2276    /// Returns a payload status response according to the engine API spec if the block is known to
2277    /// be invalid.
2278    fn check_invalid_ancestor_with_head(
2279        &mut self,
2280        check: B256,
2281        head: &SealedBlock<N::Block>,
2282    ) -> ProviderResult<Option<PayloadStatus>> {
2283        // check if the check hash was previously marked as invalid
2284        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2285
2286        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2287    }
2288
2289    /// Invoked when a new payload received is invalid.
2290    fn on_invalid_new_payload(
2291        &mut self,
2292        head: SealedBlock<N::Block>,
2293        invalid: BlockWithParent,
2294    ) -> ProviderResult<PayloadStatus> {
2295        // populate the latest valid hash field
2296        let status = self.prepare_invalid_response(invalid.parent)?;
2297
2298        // insert the head block into the invalid header cache
2299        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2300        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2301
2302        Ok(status)
2303    }
2304
2305    /// Finds any invalid ancestor for the given payload.
2306    ///
2307    /// This function first checks if the block itself is in the invalid headers cache (to
2308    /// avoid re-executing a known-invalid block). Then it walks up the chain of buffered
2309    /// ancestors and checks if any ancestor is marked as invalid.
2310    ///
2311    /// The check works by:
2312    /// 1. Checking if the block hash itself is in the `invalid_headers` map
2313    /// 2. Finding the lowest buffered ancestor for the given block hash
2314    /// 3. If the ancestor is the same as the block hash itself, using the parent hash instead
2315    /// 4. Checking if this ancestor is in the `invalid_headers` map
2316    ///
2317    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2318    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2319        let parent_hash = payload.parent_hash();
2320        let block_hash = payload.block_hash();
2321
2322        // Check if the block itself is already known to be invalid, avoiding re-execution
2323        if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2324            return Some(entry);
2325        }
2326
2327        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2328        if lowest_buffered_ancestor == block_hash {
2329            lowest_buffered_ancestor = parent_hash;
2330        }
2331
2332        // Check if the block has an invalid ancestor
2333        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2334    }
2335
2336    /// Handles a payload that has an invalid ancestor.
2337    ///
2338    /// This function validates the payload and processes it according to whether it's
2339    /// well-formed or malformed:
2340    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2341    ///    known-bad block, which violates consensus rules
2342    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2343    ///    validated due to its own structural issues
2344    fn handle_invalid_ancestor_payload(
2345        &mut self,
2346        payload: T::ExecutionData,
2347        invalid: BlockWithParent,
2348    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2349        let parent_hash = payload.parent_hash();
2350        let num_hash = payload.num_hash();
2351
2352        // Here we might have 2 cases
2353        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2354        //    remember it as invalid
2355        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2356        //    an error and forget it
2357        let block = match self.payload_validator.convert_payload_to_block(payload) {
2358            Ok(block) => block,
2359            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2360        };
2361
2362        Ok(self.on_invalid_new_payload(block, invalid)?)
2363    }
2364
2365    /// Checks if the given `head` points to an invalid header, which requires a specific response
2366    /// to a forkchoice update.
2367    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2368        // check if the head was previously marked as invalid
2369        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2370
2371        // Try to prepare invalid response, but handle errors gracefully
2372        match self.prepare_invalid_response(header.parent) {
2373            Ok(status) => Ok(Some(status)),
2374            Err(err) => {
2375                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2376                // Return a basic invalid status without latest valid hash
2377                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2378                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2379                })))
2380            }
2381        }
2382    }
2383
2384    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2385    /// and block body itself.
2386    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2387        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2388            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2389            return Err(e)
2390        }
2391
2392        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2393            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2394            return Err(e)
2395        }
2396
2397        Ok(())
2398    }
2399
    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
    ///
    /// Drains the buffered descendants of `parent` and inserts each into the tree; a
    /// descendant that is part of the current sync target and inserted as valid is made
    /// canonical immediately. Only fatal insertion errors abort the loop.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        // Remove the buffered block and all of its buffered children in one pass.
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            // nothing to append
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        // we just inserted a block that we know is part of the canonical chain, so
                        // we can make it canonical
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // NOTE(review): only the `Block` variant is handled; other
                    // `InsertPayloadError` variants are silently dropped here — confirm they
                    // cannot occur for already-decoded buffered blocks.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2444
2445    /// Pre-validates the block and inserts it into the buffer.
2446    fn buffer_block(
2447        &mut self,
2448        block: SealedBlock<N::Block>,
2449    ) -> Result<(), InsertBlockError<N::Block>> {
2450        if let Err(err) = self.validate_block(&block) {
2451            return Err(InsertBlockError::consensus_error(err, block))
2452        }
2453        self.state.buffer.insert_block(block);
2454        Ok(())
2455    }
2456
2457    /// Returns true if the distance from the local tip to the block is greater than the configured
2458    /// threshold.
2459    ///
2460    /// If the `local_tip` is greater than the `block`, then this will return false.
2461    #[inline]
2462    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2463        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2464    }
2465
2466    /// Returns how far the local tip is from the given block. If the local tip is at the same
2467    /// height or its block number is greater than the given block, this returns None.
2468    #[inline]
2469    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2470        if block > local_tip {
2471            Some(block - local_tip)
2472        } else {
2473            None
2474        }
2475    }
2476
    /// Returns the target hash to sync to if the distance from the local tip to the block is
    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
    /// that block already).
    ///
    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
    /// (missing) finalized block.
    ///
    /// The distance is measured against the best available reference, in order of preference:
    /// the just-downloaded finalized block, a buffered finalized block, or the raw
    /// `target_block_number`.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        // check if the downloaded block is the tracked finalized block
        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // if we downloaded the finalized block we can now check how far we're off
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    Some(buffered_finalized) => {
                        // if we have buffered the finalized block, we should check how far we're
                        // off
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    None => {
                        // check if the distance exceeds the threshold for backfill sync
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        // if the number of missing blocks is greater than the max, trigger backfill
        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            // if we have already canonicalized the finalized block, we should skip backfill
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    // Best-effort: on a provider error we can't decide, so don't backfill.
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // ensure the finalized block is known (not the zero hash)
                    if !state.finalized_block_hash.is_zero() {
                        // we don't have the block yet and the distance exceeds the allowed
                        // threshold
                        return Some(state.finalized_block_hash)
                    }

                    // OPTIMISTIC SYNCING
                    //
                    // It can happen when the node is doing an
                    // optimistic sync, where the CL has no knowledge of the finalized hash,
                    // but is expecting the EL to sync as high
                    // as possible before finalizing.
                    //
                    // This usually doesn't happen on ETH mainnet since CLs use the more
                    // secure checkpoint syncing.
                    //
                    // However, optimism chains will do this. The risk of a reorg is however
                    // low.
                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // we're fully synced to the finalized block
                }
            }
        }

        None
    }
2557
    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
    /// happen if a reorg is happening while we are persisting a block.
    ///
    /// Returns `Ok(Some(number))` — the highest block number to keep on disk — when the
    /// persisted tip is not an ancestor of the canonical in-memory head, and `Ok(None)` when
    /// the two chains connect.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Resolves a block's parent `(number, hash)` via a header lookup (memory or db).
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Happy path, canonical chain is ahead or equal to persisted chain.
        // Walk canonical chain back to make sure that it connects to persisted chain.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
        if canonical == persisted {
            return Ok(None);
        }

        // At this point, we know that `persisted` block can't be reached by walking the canonical
        // chain back. In this case we need to truncate it to the first canonical block it connects
        // to.

        // Firstly, walk back until we reach the same height as `canonical`.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Now walk both chains back until we find a common ancestor.
        // Both cursors are at the same height, so they step down in lockstep.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2603
    /// Invoked when the canonical chain has been updated.
    ///
    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
    ///
    /// Updates the tracked canonical head in both the tree state and the canonical in-memory
    /// state, re-inserts blocks touched by a reorg, notifies canonical-state listeners, and
    /// emits a `CanonicalChainCommitted` event.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // update the tracked canonical head
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        let tip = chain_update.tip().clone_sealed_header();
        // Build the listener notification before `chain_update` is consumed below.
        let notification = chain_update.to_chain_notification();

        // reinsert any missing reorged blocks
        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            // Both sides of the reorg are reinserted so neither chain's blocks are lost.
            self.reinsert_reorged_blocks(new.clone());
            self.reinsert_reorged_blocks(old.clone());
        }

        // update the tracked in-memory state with the new chain
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        // Update metrics based on new tip
        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // sends an event to all active listeners about the new canonical chain
        self.canonical_in_memory_state.notify_canon_state(notification);

        // emit event
        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2644
2645    /// This updates metrics based on the given reorg length and first reorged block number.
2646    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2647        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2648            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2649                first_reorged_block <= finalized.number
2650            {
2651                self.metrics.tree.reorgs.finalized.increment(1);
2652            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2653                first_reorged_block <= safe.number
2654            {
2655                self.metrics.tree.reorgs.safe.increment(1);
2656            } else {
2657                self.metrics.tree.reorgs.head.increment(1);
2658            }
2659        } else {
2660            debug_unreachable!("Reorged chain doesn't have any blocks");
2661        }
2662        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2663    }
2664
2665    /// This reinserts any blocks in the new chain that do not already exist in the tree
2666    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2667        for block in new_chain {
2668            if self
2669                .state
2670                .tree_state
2671                .executed_block_by_hash(block.recovered_block().hash())
2672                .is_none()
2673            {
2674                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2675                self.state.tree_state.insert_executed(block);
2676            }
2677        }
2678    }
2679
2680    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2681    ///
2682    /// This mainly compares the missing parent of the downloaded block with the current canonical
2683    /// tip, and decides whether or not backfill sync should be triggered.
2684    fn on_disconnected_downloaded_block(
2685        &self,
2686        downloaded_block: BlockNumHash,
2687        missing_parent: BlockNumHash,
2688        head: BlockNumHash,
2689    ) -> Option<TreeEvent> {
2690        // compare the missing parent with the canonical tip
2691        if let Some(target) =
2692            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2693        {
2694            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2695            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2696        }
2697
2698        // continue downloading the missing parent
2699        //
2700        // this happens if either:
2701        //  * the missing parent block num < canonical tip num
2702        //    * this case represents a missing block on a fork that is shorter than the canonical
2703        //      chain
2704        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2705        //    less than the backfill threshold
2706        //    * this case represents a potentially long range of blocks to download and execute
2707        let request = if let Some(distance) =
2708            self.distance_from_local_tip(head.number, missing_parent.number)
2709        {
2710            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2711            DownloadRequest::BlockRange(missing_parent.hash, distance)
2712        } else {
2713            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2714            // This happens when the missing parent is on an outdated
2715            // sidechain and we can only download the missing block itself
2716            DownloadRequest::single_block(missing_parent.hash)
2717        };
2718
2719        Some(TreeEvent::Download(request))
2720    }
2721
    /// Handles a downloaded block that was successfully inserted as valid.
    ///
    /// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
    /// If it matches a non-head sync target (safe or finalized), makes it canonical inline
    /// and triggers a download for the remaining blocks towards the actual head.
    /// Otherwise, tries to connect buffered blocks.
    fn on_valid_downloaded_block(
        &mut self,
        block_num_hash: BlockNumHash,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        // check if we just inserted a block that's part of sync targets,
        // i.e. head, safe, or finalized
        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
            sync_target.contains(block_num_hash.hash)
        {
            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");

            if sync_target.head_block_hash == block_num_hash.hash {
                // we just inserted the sync target head block, make it canonical
                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_num_hash.hash,
                })))
            }

            // This block is part of the sync target (safe or finalized) but not the
            // head. Make it canonical and try to connect any buffered children, then
            // continue downloading towards the actual head if needed.
            self.make_canonical(block_num_hash.hash)?;
            self.try_connect_buffered_blocks(block_num_hash)?;

            // Check if we've reached the sync target head after connecting buffered
            // blocks (e.g. the head block may have already been buffered).
            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
                // Still behind the head: request it, starting from its lowest buffered
                // ancestor so already-buffered blocks aren't re-downloaded.
                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
            }

            return Ok(None)
        }
        // Not a sync target block: just try to attach any buffered children to it.
        trace!(target: "engine::tree", "appended downloaded block");
        self.try_connect_buffered_blocks(block_num_hash)?;
        Ok(None)
    }
2766
    /// Invoked with a block downloaded from the network
    ///
    /// Returns an event with the appropriate action to take, such as:
    ///  - download more missing blocks
    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        // Reject the block early if it descends from a known-invalid ancestor.
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // Ignore downloaded blocks while backfill sync is in progress.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        // try to append the block
        match self.insert_block(block) {
            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                // block is not connected to the canonical head, we need to download
                // its missing branch first
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // Only block-level (validation) errors are handled here; fatal errors
                // from the handler are propagated to the caller.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2816
2817    /// Inserts a payload into the tree and executes it.
2818    ///
2819    /// This function validates the payload's basic structure, then executes it using the
2820    /// payload validator. The execution includes running all transactions in the payload
2821    /// and validating the resulting state transitions.
2822    ///
2823    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2824    /// or `InsertPayloadError` if validation or execution failed.
2825    fn insert_payload(
2826        &mut self,
2827        payload: T::ExecutionData,
2828    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2829        self.insert_block_or_payload(
2830            payload.block_with_parent(),
2831            payload,
2832            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2833            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2834        )
2835    }
2836
    /// Inserts an already-sealed block into the tree and executes it.
    ///
    /// Counterpart to [`Self::insert_payload`] for blocks that are already in sealed form
    /// (e.g. blocks downloaded from the network), so no payload-to-block conversion is
    /// required.
    fn insert_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
        self.insert_block_or_payload(
            block.block_with_parent(),
            block,
            |validator, block, ctx| validator.validate_block(block, ctx),
            |_, block| Ok(block),
        )
    }
2848
    /// Inserts a block or payload into the blockchain tree with full execution.
    ///
    /// This is a generic function that handles both blocks and payloads by accepting
    /// a block identifier, input data, and execution/validation functions. It performs
    /// comprehensive checks and execution:
    ///
    /// - Validates that the block doesn't already exist in the tree
    /// - Ensures parent state is available, buffering if necessary
    /// - Executes the block/payload using the provided execute function
    /// - Handles both canonical and fork chain insertions
    /// - Updates pending block state when appropriate
    /// - Emits consensus engine events and records metrics
    ///
    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(
            &mut V,
            Input,
            TreeCtx<'_, N>,
        )
            -> Result<(ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>), Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Check if block already exists - first in memory, then DB only if it could be persisted
        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
            // Run the conversion anyway so conversion errors still surface; the resulting
            // block itself is discarded.
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // Only query DB if block could be persisted (number <= last persisted block).
        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    // Already persisted on disk: same handling as the in-memory hit above.
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure that the parent state is available.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // we don't have the state required to execute this block, buffering it and find the
                // missing parent block
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // determine whether we are on a fork chain by comparing the block number with the
        // canonical head. This is a simple check that is sufficient for the event emission below.
        // A block is considered a fork if its number is less than or equal to the canonical head,
        // as this indicates there's already a canonical block at that height.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // Execute/validate via the provided closure, producing the executed block and
        // optional timing stats.
        let (executed, timing_stats) = execute(&mut self.payload_validator, input, ctx)?;

        // Emit slow block event immediately after execution so it appears even when
        // persistence hasn't completed yet (e.g. blocks arriving faster than persistence).
        if let Some(stats) = timing_stats {
            if let Some(threshold) = self.config.slow_block_threshold() {
                let total_duration = stats.execution_duration + stats.state_hash_duration;
                if total_duration > threshold {
                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
                        stats: stats.clone(),
                        commit_duration: None,
                        total_duration,
                    }));
                }
            }
            // Retain the stats keyed by block hash for later lookup.
            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
        }

        // if the parent is the canonical head, we can insert the block as the pending block
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // emit insert event
        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2989
2990    /// Handles an error that occurred while inserting a block.
2991    ///
2992    /// If this is a validation error this will mark the block as invalid.
2993    ///
2994    /// Returns the proper payload status response if the block is invalid.
2995    fn on_insert_block_error(
2996        &mut self,
2997        error: InsertBlockError<N::Block>,
2998    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2999        let (block, error) = error.split();
3000
3001        // if invalid block, we check the validation error. Otherwise return the fatal
3002        // error.
3003        let validation_err = error.ensure_validation_error()?;
3004
3005        // If the error was due to an invalid payload, the payload is added to the
3006        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
3007        // returned.
3008        warn!(
3009            target: "engine::tree",
3010            invalid_hash=%block.hash(),
3011            invalid_number=block.number(),
3012            %validation_err,
3013            "Invalid block error on new payload",
3014        );
3015        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3016
3017        // keep track of the invalid header
3018        self.state.invalid_headers.insert(block.block_with_parent());
3019        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
3020            Box::new(block),
3021        )));
3022
3023        Ok(PayloadStatus::new(
3024            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3025            latest_valid_hash,
3026        ))
3027    }
3028
3029    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
3030    fn on_new_payload_error(
3031        &mut self,
3032        error: NewPayloadError,
3033        payload_num_hash: NumHash,
3034        parent_hash: B256,
3035    ) -> ProviderResult<PayloadStatus> {
3036        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3037        // we need to convert the error to a payload status (response to the CL)
3038
3039        let latest_valid_hash =
3040            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3041                // Engine-API rules:
3042                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
3043                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
3044                None
3045            } else {
3046                self.latest_valid_hash_for_invalid_payload(parent_hash)?
3047            };
3048
3049        let status = PayloadStatusEnum::from(error);
3050        Ok(PayloadStatus::new(status, latest_valid_hash))
3051    }
3052
3053    /// Attempts to find the header for the given block hash if it is canonical.
3054    pub fn find_canonical_header(
3055        &self,
3056        hash: B256,
3057    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3058        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3059
3060        if canonical.is_none() {
3061            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3062        }
3063
3064        Ok(canonical)
3065    }
3066
3067    /// Updates the tracked finalized block if we have it.
3068    fn update_finalized_block(
3069        &self,
3070        finalized_block_hash: B256,
3071    ) -> Result<(), OnForkChoiceUpdated> {
3072        if finalized_block_hash.is_zero() {
3073            return Ok(())
3074        }
3075
3076        match self.find_canonical_header(finalized_block_hash) {
3077            Ok(None) => {
3078                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3079                // if the finalized block is not known, we can't update the finalized block
3080                return Err(OnForkChoiceUpdated::invalid_state())
3081            }
3082            Ok(Some(finalized)) => {
3083                if Some(finalized.num_hash()) !=
3084                    self.canonical_in_memory_state.get_finalized_num_hash()
3085                {
3086                    // we're also persisting the finalized block on disk so we can reload it on
3087                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3088                    let _ = self.persistence.save_finalized_block_number(finalized.number());
3089                    self.canonical_in_memory_state.set_finalized(finalized.clone());
3090                    // Update finalized block height metric
3091                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3092                }
3093            }
3094            Err(err) => {
3095                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3096            }
3097        }
3098
3099        Ok(())
3100    }
3101
3102    /// Updates the tracked safe block if we have it
3103    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3104        if safe_block_hash.is_zero() {
3105            return Ok(())
3106        }
3107
3108        match self.find_canonical_header(safe_block_hash) {
3109            Ok(None) => {
3110                debug!(target: "engine::tree", "Safe block not found in canonical chain");
3111                // if the safe block is not known, we can't update the safe block
3112                return Err(OnForkChoiceUpdated::invalid_state())
3113            }
3114            Ok(Some(safe)) => {
3115                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3116                    // we're also persisting the safe block on disk so we can reload it on
3117                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3118                    let _ = self.persistence.save_safe_block_number(safe.number());
3119                    self.canonical_in_memory_state.set_safe(safe.clone());
3120                    // Update safe block height metric
3121                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
3122                }
3123            }
3124            Err(err) => {
3125                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3126            }
3127        }
3128
3129        Ok(())
3130    }
3131
3132    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
3133    /// made canonical.
3134    ///
3135    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
3136    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
3137    ///
3138    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
3139    /// are consistent with the head block.
3140    fn ensure_consistent_forkchoice_state(
3141        &self,
3142        state: ForkchoiceState,
3143    ) -> Result<(), OnForkChoiceUpdated> {
3144        // Ensure that the finalized block, if not zero, is known and in the canonical chain
3145        // after the head block is canonicalized.
3146        //
3147        // This ensures that the finalized block is consistent with the head block, i.e. the
3148        // finalized block is an ancestor of the head block.
3149        self.update_finalized_block(state.finalized_block_hash)?;
3150
3151        // Also ensure that the safe block, if not zero, is known and in the canonical chain
3152        // after the head block is canonicalized.
3153        //
3154        // This ensures that the safe block is consistent with the head block, i.e. the safe
3155        // block is an ancestor of the head block.
3156        self.update_safe_block(state.safe_block_hash)
3157    }
3158
3159    /// Validates the payload attributes with respect to the header and fork choice state.
3160    ///
3161    /// This is called during `engine_forkchoiceUpdated` when the CL provides payload attributes,
3162    /// indicating it wants the EL to start building a new block.
3163    ///
3164    /// Runs [`PayloadValidator::validate_payload_attributes_against_header`](reth_engine_primitives::PayloadValidator::validate_payload_attributes_against_header) to ensure
3165    /// `payloadAttributes.timestamp > headBlock.timestamp` per the Engine API spec.
3166    ///
3167    /// If validation passes, sends the attributes to the payload builder to start a new
3168    /// payload job. If it fails, returns `INVALID_PAYLOAD_ATTRIBUTES` without rolling back
3169    /// the forkchoice update.
3170    ///
3171    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
3172    /// return an error if the payload attributes are invalid.
3173    fn process_payload_attributes(
3174        &self,
3175        attributes: T::PayloadAttributes,
3176        head: &N::BlockHeader,
3177        state: ForkchoiceState,
3178    ) -> OnForkChoiceUpdated {
3179        if let Err(err) =
3180            self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3181        {
3182            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3183            return OnForkChoiceUpdated::invalid_payload_attributes()
3184        }
3185
3186        // 8. Client software MUST begin a payload build process building on top of
3187        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
3188        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
3189        //    The build process is specified in the Payload building section.
3190
3191        let cache = if self.config.share_execution_cache_with_payload_builder() {
3192            self.payload_validator.cache_for(state.head_block_hash)
3193        } else {
3194            None
3195        };
3196
3197        let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3198            self.payload_validator.sparse_trie_handle_for(
3199                state.head_block_hash,
3200                head.state_root(),
3201                &self.state,
3202            )
3203        } else {
3204            None
3205        };
3206
3207        // send the payload to the builder and return the receiver for the pending payload
3208        // id, initiating payload job is handled asynchronously
3209        let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3210            parent_hash: state.head_block_hash,
3211            attributes,
3212            cache,
3213            trie_handle,
3214        });
3215
3216        // Client software MUST respond to this method call in the following way:
3217        // {
3218        //      payloadStatus: {
3219        //          status: VALID,
3220        //          latestValidHash: forkchoiceState.headBlockHash,
3221        //          validationError: null
3222        //      },
3223        //      payloadId: buildProcessId
3224        // }
3225        //
3226        // if the payload is deemed VALID and the build process has begun.
3227        OnForkChoiceUpdated::updated_with_pending_payload_id(
3228            PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3229            pending_payload_id,
3230        )
3231    }
3232
3233    /// Remove all blocks up to __and including__ the given block number.
3234    ///
3235    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
3236    /// those which have a fork point at or below the finalized hash.
3237    ///
3238    /// Canonical blocks below the upper bound will still be removed.
3239    pub(crate) fn remove_before(
3240        &mut self,
3241        upper_bound: BlockNumHash,
3242        finalized_hash: Option<B256>,
3243    ) -> ProviderResult<()> {
3244        // first fetch the finalized block number and then call the remove_before method on
3245        // tree_state
3246        let num = if let Some(hash) = finalized_hash {
3247            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3248        } else {
3249            None
3250        };
3251
3252        self.state.tree_state.remove_until(
3253            upper_bound,
3254            self.persistence_state.last_persisted_block.hash,
3255            num,
3256        );
3257        Ok(())
3258    }
3259
3260    /// Returns a builder for creating state providers for the given hash.
3261    ///
3262    /// This is an optimization for parallel execution contexts where we want to avoid
3263    /// creating state providers in the critical path.
3264    pub fn state_provider_builder(
3265        &self,
3266        hash: B256,
3267    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3268    where
3269        P: BlockReader + StateProviderFactory + StateReader + Clone,
3270    {
3271        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3272            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3273            // the block leads back to the canonical chain
3274            return Ok(Some(StateProviderBuilder::new(
3275                self.provider.clone(),
3276                historical,
3277                Some(blocks),
3278            )))
3279        }
3280
3281        // Check if the block is persisted
3282        if let Some(header) = self.provider.header(hash)? {
3283            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3284            // For persisted blocks, we create a builder that will fetch state directly from the
3285            // database
3286            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3287        }
3288
3289        debug!(target: "engine::tree", %hash, "no canonical state found for block");
3290        Ok(None)
3291    }
3292}
3293
/// Events received in the main engine loop.
///
/// Multiplexes the inputs the loop reacts to: engine API messages, completed persistence
/// tasks, and channel disconnects.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// An engine API message was received.
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A persistence task completed.
    PersistenceComplete {
        /// The unified result of the persistence operation.
        result: PersistenceResult,
        /// When the persistence operation started.
        start_time: Instant,
    },
    /// A channel was disconnected.
    Disconnected,
}
3313
/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
/// variant.
///
/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
/// is valid or not.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block is valid and block extends canonical chain.
    Valid,
    /// The block may be valid and has an unknown missing ancestor.
    ///
    /// Carries the current head and the lowest unconnected ancestor so the
    /// caller has everything needed to locate the gap in the chain.
    Disconnected {
        /// Current canonical head.
        head: BlockNumHash,
        /// The lowest ancestor block that is not connected to the canonical chain.
        missing_ancestor: BlockNumHash,
    },
}
3331
/// How a payload was inserted if it was valid.
///
/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
///
/// Both variants wrap the resulting [`BlockStatus`], so callers can distinguish
/// a connected (`Valid`) insertion from a `Disconnected` one in either case.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The payload was valid, but we have already seen it.
    AlreadySeen(BlockStatus),
    /// The payload was valid and inserted into the tree.
    Inserted(BlockStatus),
}
3343
/// Target for block persistence.
///
/// Selects how far the persistence task should write blocks to disk relative
/// to the canonical head.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to `canonical_head - memory_block_buffer_target`, keeping a
    /// buffer of recent blocks in memory.
    Threshold,
    /// Persist all blocks up to and including the canonical head.
    Head,
}
3352
/// Result of waiting for caches to become available.
///
/// Returned by [`WaitForCaches::wait_for_caches`]; both fields default to zero
/// (via `Default`) when no waiting was required.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the execution cache lock.
    pub execution_cache: Duration,
    /// Time spent waiting for the sparse trie lock.
    pub sparse_trie: Duration,
}
3361
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
    /// Waits for cache updates to complete.
    ///
    /// Blocks the caller until the caches are available, then returns the time
    /// spent waiting for each cache separately as [`CacheWaitDurations`].
    fn wait_for_caches(&self) -> CacheWaitDurations;
}