// reth_engine_tree/tree/mod.rs
1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError, InsertBlockValidationError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17    MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29    FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32    BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35    StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48    oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64pub mod types;
65
66use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
67pub use block_buffer::BlockBuffer;
68pub use invalid_headers::InvalidHeaderCache;
69pub use metrics::EngineApiMetrics;
70pub use payload_processor::*;
71pub use payload_validator::{BasicEngineValidator, EngineValidator};
72pub use persistence_state::PersistenceState;
73pub use reth_engine_primitives::TreeConfig;
74pub use reth_execution_cache::{
75    CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider, ExecutionCache,
76    PayloadExecutionCache, SavedCache,
77};
78pub use types::{ValidationOutcome, ValidationOutput};
79
80pub mod state;
81
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: local head `block.number` is 100 and the forkchoice head `block.number` is 133 — the gap
/// is more than an epoch's worth of slots, so it exceeds the threshold and the pipeline should be
/// used to backfill it.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// The minimum number of blocks to retain in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism), where this
/// acts as the fallback retention window.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
98
/// A builder for creating state providers that can be used across threads.
///
/// Captures everything needed to open a state view lazily: a provider factory, a historical
/// anchor block, and optional in-memory blocks layered on top of the persisted state.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    /// When present, these shadow the on-disk state in [`Self::build`].
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
109
110impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
111    /// Creates a new state provider from the provider factory, historical block hash and optional
112    /// overlaid blocks.
113    pub const fn new(
114        provider_factory: P,
115        historical: B256,
116        overlay: Option<Vec<ExecutedBlock<N>>>,
117    ) -> Self {
118        Self { provider_factory, historical, overlay }
119    }
120}
121
122impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
123where
124    P: BlockReader + StateProviderFactory + StateReader + Clone,
125{
126    /// Creates a new state provider from this builder.
127    pub fn build(&self) -> ProviderResult<StateProviderBox> {
128        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
129        if let Some(overlay) = self.overlay.clone() {
130            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
131        }
132        Ok(provider)
133    }
134}
135
/// Tracks the state of the engine api internals.
///
/// Bundles the block tree, forkchoice tracking, the buffer for blocks whose parents are
/// unknown, and the invalid-header cache into one unit owned by the engine task.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks, i.e. blocks received whose parent is not yet known.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}
151
152impl<N: NodePrimitives> EngineApiTreeState<N> {
153    fn new(
154        block_buffer_limit: u32,
155        max_invalid_header_cache_length: u32,
156        invalid_header_hit_eviction_threshold: u8,
157        canonical_block: BlockNumHash,
158        engine_kind: EngineApiKind,
159    ) -> Self {
160        Self {
161            invalid_headers: InvalidHeaderCache::new(
162                max_invalid_header_cache_length,
163                invalid_header_hit_eviction_threshold,
164            ),
165            buffer: BlockBuffer::new(block_buffer_limit),
166            tree_state: TreeState::new(canonical_block, engine_kind),
167            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
168        }
169    }
170
171    /// Returns a reference to the tree state.
172    pub const fn tree_state(&self) -> &TreeState<N> {
173        &self.tree_state
174    }
175
176    /// Returns true if the block has been marked as invalid.
177    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
178        self.invalid_headers.get(hash).is_some()
179    }
180}
181
/// The outcome of a tree operation.
///
/// Pairs the operation's result with an optional follow-up [`TreeEvent`] for the caller
/// and a flag indicating whether the block had been processed before.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
    /// Whether the block was already seen, meaning no real execution happened during this
    /// `newPayload` call.
    pub already_seen: bool,
}
193
194impl<T> TreeOutcome<T> {
195    /// Create new tree outcome.
196    pub const fn new(outcome: T) -> Self {
197        Self { outcome, event: None, already_seen: false }
198    }
199
200    /// Set event on the outcome.
201    pub fn with_event(mut self, event: TreeEvent) -> Self {
202        self.event = Some(event);
203        self
204    }
205
206    /// Set the `already_seen` flag on the outcome.
207    pub const fn with_already_seen(mut self, value: bool) -> Self {
208        self.already_seen = value;
209        self
210    }
211}
212
/// Result of trying to insert a new payload in [`EngineApiTreeHandler`].
///
/// Carries the engine API status to report back to the CL plus whether the block
/// had already been processed.
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// - `Valid`: Payload successfully validated and inserted
    /// - `Syncing`: Parent missing, payload buffered for later
    /// - Error status: Payload is invalid
    pub status: PayloadStatus,
    /// Whether the block was already seen
    pub already_seen: bool,
}
223
224impl TryInsertPayloadResult {
225    /// Convert the result into a [`TreeOutcome`].
226    #[inline]
227    pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
228        TreeOutcome::new(self.status).with_already_seen(self.already_seen)
229    }
230}
231
/// Events that are triggered by Tree Chain
///
/// Emitted by tree operations to request follow-up work from the caller.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed, e.g. making a block canonical.
    TreeAction(TreeAction),
    /// Backfill action is needed, i.e. the gap is too large to sync block-by-block.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}
242
243impl TreeEvent {
244    /// Returns true if the event is a backfill action.
245    const fn is_backfill_action(&self) -> bool {
246        matches!(self, Self::BackfillAction(_))
247    }
248}
249
/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical, i.e. move the canonical head to the given hash.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}
259
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider for block, state and stage data (see the `impl` bounds for the exact traits).
    provider: P,
    /// Consensus checks applied to blocks.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator for incoming execution payloads.
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Timing statistics for executed blocks, keyed by block hash.
    /// Stored here (not in `ExecutedBlock`) to avoid leaking observability concerns into the block
    /// type. Entries are removed when blocks are persisted or invalidated.
    execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
    /// Set when an FCU with payload attributes is received, cleared on the next FCU without.
    /// Suppresses persistence cycles during payload building.
    building_payload: bool,
    /// Task runtime for spawning blocking work on named, reusable threads.
    runtime: reth_tasks::Runtime,
}
320
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    /// Manual `Debug` impl. The receiving half (`incoming`), the event sender (`outgoing`) and
    /// `building_payload` are intentionally omitted, and `execution_timing_stats` is summarized
    /// by its entry count instead of its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("engine_kind", &self.engine_kind)
            .field("evm_config", &self.evm_config)
            .field("changeset_cache", &self.changeset_cache)
            .field("execution_timing_stats", &self.execution_timing_stats.len())
            .field("runtime", &self.runtime)
            .finish()
    }
}
349
350impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
351where
352    N: NodePrimitives,
353    P: DatabaseProviderFactory
354        + BlockReader<Block = N::Block, Header = N::BlockHeader>
355        + StateProviderFactory
356        + StateReader<Receipt = N::Receipt>
357        + HashedPostStateProvider
358        + Clone
359        + 'static,
360    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
361        + StageCheckpointReader
362        + ChangeSetReader
363        + StorageChangeSetReader
364        + StorageSettingsCache,
365    C: ConfigureEvm<Primitives = N> + 'static,
366    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
367    V: EngineValidator<T> + WaitForCaches,
368{
369    /// Creates a new [`EngineApiTreeHandler`].
370    #[expect(clippy::too_many_arguments)]
371    pub fn new(
372        provider: P,
373        consensus: Arc<dyn FullConsensus<N>>,
374        payload_validator: V,
375        outgoing: UnboundedSender<EngineApiEvent<N>>,
376        state: EngineApiTreeState<N>,
377        canonical_in_memory_state: CanonicalInMemoryState<N>,
378        persistence: PersistenceHandle<N>,
379        persistence_state: PersistenceState,
380        payload_builder: PayloadBuilderHandle<T>,
381        config: TreeConfig,
382        engine_kind: EngineApiKind,
383        evm_config: C,
384        changeset_cache: ChangesetCache,
385        runtime: reth_tasks::Runtime,
386    ) -> Self {
387        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
388
389        Self {
390            provider,
391            consensus,
392            payload_validator,
393            incoming,
394            outgoing,
395            persistence,
396            persistence_state,
397            backfill_sync_state: BackfillSyncState::Idle,
398            state,
399            canonical_in_memory_state,
400            payload_builder,
401            config,
402            metrics: Default::default(),
403            incoming_tx,
404            engine_kind,
405            evm_config,
406            changeset_cache,
407            execution_timing_stats: HashMap::new(),
408            building_payload: false,
409            runtime,
410        }
411    }
412
413    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
414    /// own thread.
415    ///
416    /// Returns the sender through which incoming requests can be sent to the task and the receiver
417    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
418    #[expect(clippy::complexity)]
419    pub fn spawn_new(
420        provider: P,
421        consensus: Arc<dyn FullConsensus<N>>,
422        payload_validator: V,
423        persistence: PersistenceHandle<N>,
424        payload_builder: PayloadBuilderHandle<T>,
425        canonical_in_memory_state: CanonicalInMemoryState<N>,
426        config: TreeConfig,
427        kind: EngineApiKind,
428        evm_config: C,
429        changeset_cache: ChangesetCache,
430        runtime: reth_tasks::Runtime,
431    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
432    {
433        let best_block_number = provider.best_block_number().unwrap_or(0);
434        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
435
436        let persistence_state = PersistenceState {
437            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
438            rx: None,
439        };
440
441        let (tx, outgoing) = unbounded_channel();
442        let state = EngineApiTreeState::new(
443            config.block_buffer_limit(),
444            config.max_invalid_header_cache_length(),
445            config.invalid_header_hit_eviction_threshold(),
446            header.num_hash(),
447            kind,
448        );
449
450        let task = Self::new(
451            provider,
452            consensus,
453            payload_validator,
454            tx,
455            state,
456            canonical_in_memory_state,
457            persistence,
458            persistence_state,
459            payload_builder,
460            config,
461            kind,
462            evm_config,
463            changeset_cache,
464            runtime,
465        );
466        let incoming = task.incoming_tx.clone();
467        spawn_os_thread("engine", || {
468            increase_thread_priority();
469            task.run()
470        });
471        (incoming, outgoing)
472    }
473
474    /// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
475    fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
476        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
477            PayloadStatusEnum::Valid,
478            Some(state.head_block_hash),
479        )))
480    }
481
482    /// Returns a new [`Sender`] to send messages to this type.
483    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
484        self.incoming_tx.clone()
485    }
486
487    /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means
488    /// persistence is falling behind execution.
489    const fn persistence_gap(&self) -> u64 {
490        self.state
491            .tree_state
492            .canonical_block_number()
493            .saturating_sub(self.persistence_state.last_persisted_block.number)
494    }
495
496    /// Returns `true` when the main loop should stop draining the tree input channel.
497    ///
498    /// This is the case when persistence is already running and the gap between the canonical tip
499    /// and the last persisted block has reached the configured threshold.
500    const fn should_backpressure(&self) -> bool {
501        self.persistence_state.in_progress() &&
502            self.persistence_gap() >= self.config.persistence_backpressure_threshold()
503    }
504
    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages. Any fatal error
    /// (persistence failure, channel disconnect, fatal block-insertion error) terminates
    /// the loop and thus the engine thread.
    pub fn run(mut self) {
        loop {
            // Each iteration has three phases:
            //
            // 1. Non-blocking poll for persistence completion. If the background flush already
            //    landed, absorb the result now so the gap calculation below is fresh.
            // 2. Decide how to wait for the next event. When the canonical-to-persisted gap exceeds
            //    the backpressure threshold we only block on the persistence receiver, leaving new
            //    engine requests sitting in the unbounded upstream channel.
            // 3. Handle the event (engine message or persistence completion) and kick off a new
            //    persistence cycle if the threshold is met again.
            //
            // The net effect: when the persistence gap exceeds the threshold, we stop
            // processing incoming messages and let them queue in the channel. This is only a
            // soft form of backpressure: it delays replies and, more importantly, prevents
            // executing further blocks that would pile up in the persistence queue - where each
            // block carries heavier state (eg. trie updates) than the raw payload sitting in the
            // engine channel.
            //
            // Standard Ethereum CLs won't truly back off - the engine API has no
            // backpressure semantics, and CLs typically timeout after ≈8s and resend - so
            // this cannot prevent the incoming channel from growing under sustained load.
            // But it shifts the bottleneck to the lighter-weight incoming queue rather than
            // the costlier persistence pipeline. Other clients that respect reply latency
            // can treat the delayed responses as a signal to chill out.
            match self.try_poll_persistence() {
                Ok(true) => {
                    // A persistence result was absorbed; immediately try to start the next
                    // cycle before blocking on new events.
                    if let Err(err) = self.advance_persistence() {
                        error!(target: "engine::tree", %err, "Advancing persistence failed");
                        return
                    }
                    continue;
                }
                // Nothing completed yet - fall through to the wait below.
                Ok(false) => {}
                Err(err) => {
                    error!(target: "engine::tree", %err, "Polling persistence failed");
                    return
                }
            }

            let event = if self.should_backpressure() {
                // Under backpressure: block only on the persistence receiver and record how
                // long the engine stalls, leaving engine requests queued upstream.
                self.metrics.engine.backpressure_active.set(1.0);
                let stall_start = Instant::now();
                let event = self.wait_for_persistence_event();
                self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
                event
            } else {
                self.metrics.engine.backpressure_active.set(0.0);
                self.wait_for_event()
            };

            match event {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        // The message handler requested shutdown.
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // Always check if we need to trigger new persistence after any event:
            // - After engine messages: new blocks may have been inserted that exceed the
            //   persistence threshold
            // - After persistence completion: we can now persist more blocks if needed
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
593
594    /// Blocks until the in-flight persistence task completes, used when we are under
595    /// backpressure.
596    ///
597    /// Unlike `wait_for_event`, this deliberately does not read from the tree input channel. Any
598    /// requests sent to the tree remain queued upstream until persistence catches up.
599    fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
600        let maybe_persistence = self.persistence_state.rx.take();
601
602        if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
603            match persistence_rx.recv() {
604                Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
605                Err(_) => LoopEvent::Disconnected,
606            }
607        } else {
608            self.wait_for_event()
609        }
610    }
611
    /// Blocks until the next event is ready: either an incoming engine message or a persistence
    /// completion (if one is in progress).
    ///
    /// Uses biased selection to prioritize persistence completion to update in-memory state and
    /// unblock further writes.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Take ownership of persistence rx if present; it is put back below if the select
        // resolves to an engine message instead.
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            // Biased select prioritizes persistence completion to update in memory state and
            // unblock further writes (arms are tried in the order listed).
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    // Don't put it back - consumed (oneshot-like behavior)
                    match result {
                        Ok(result) => LoopEvent::PersistenceComplete {
                            result,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Put the persistence rx back - we didn't consume it
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in progress - just wait on incoming
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
652
653    /// Invoked when previously requested blocks were downloaded.
654    ///
655    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
656    /// will execute the first batch and send the remaining blocks back through the channel so that
657    /// block request processing isn't blocked for a long time.
658    fn on_downloaded(
659        &mut self,
660        mut blocks: Vec<SealedBlock<N::Block>>,
661    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
662        if blocks.is_empty() {
663            // nothing to execute
664            return Ok(None)
665        }
666
667        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
668        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
669        for block in blocks.drain(..batch) {
670            if let Some(event) = self.on_downloaded_block(block)? {
671                let needs_backfill = event.is_backfill_action();
672                self.on_tree_event(event)?;
673                if needs_backfill {
674                    // can exit early if backfill is needed
675                    return Ok(None)
676                }
677            }
678        }
679
680        // if we still have blocks to execute, send them as a followup request
681        if !blocks.is_empty() {
682            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
683        }
684
685        Ok(None)
686    }
687
688    /// When the Consensus layer receives a new block via the consensus gossip protocol,
689    /// the transactions in the block are sent to the execution layer in the form of a
690    /// [`PayloadTypes::ExecutionData`], for example
691    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
692    /// Execution layer executes the transactions and validates the state in the block header,
693    /// then passes validation data back to Consensus layer, that adds the block to the head of
694    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
695    /// network in the form of a "Beacon block".
696    ///
697    /// These responses should adhere to the [Engine API Spec for
698    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
699    ///
700    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
701    /// returns an error if an internal error occurred.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.

        // Announce the received block to event listeners before any validation happens.
        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Check for invalid ancestors
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            // The payload extends a chain we already marked invalid: report without executing.
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        // While backfill sync is active we lack the state required to execute the payload, so
        // it is buffered instead of inserted.
        let mut outcome = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?.into_outcome()
        } else {
            TreeOutcome::new(self.try_buffer_payload(payload)?)
        };

        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // Only create the canonical event if this block isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
779
780    /// Processes a payload during normal sync operation.
781    #[instrument(level = "debug", target = "engine::tree", skip_all)]
782    fn try_insert_payload(
783        &mut self,
784        payload: T::ExecutionData,
785    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
786        let block_hash = payload.block_hash();
787        let num_hash = payload.num_hash();
788        let parent_hash = payload.parent_hash();
789        let mut latest_valid_hash = None;
790
791        match self.insert_payload(payload) {
792            Ok(status) => {
793                let (status, already_seen) = match status {
794                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
795                        latest_valid_hash = Some(block_hash);
796                        self.try_connect_buffered_blocks(num_hash)?;
797                        (PayloadStatusEnum::Valid, false)
798                    }
799                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
800                        latest_valid_hash = Some(block_hash);
801                        (PayloadStatusEnum::Valid, true)
802                    }
803                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
804                        (PayloadStatusEnum::Syncing, false)
805                    }
806                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
807                        // not known to be invalid, but we don't know anything else
808                        (PayloadStatusEnum::Syncing, true)
809                    }
810                };
811
812                Ok(TryInsertPayloadResult {
813                    status: PayloadStatus::new(status, latest_valid_hash),
814                    already_seen,
815                })
816            }
817            Err(error) => {
818                let status = match error {
819                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
820                    InsertPayloadError::Payload(error) => {
821                        self.on_new_payload_error(error, num_hash, parent_hash)?
822                    }
823                };
824
825                Ok(TryInsertPayloadResult { status, already_seen: false })
826            }
827        }
828    }
829
830    /// Stores a payload for later processing during backfill sync.
831    ///
832    /// During backfill, the node lacks the state needed to validate payloads,
833    /// so they are buffered (stored in memory) until their parent blocks are synced.
834    ///
835    /// Returns:
836    /// - `Syncing`: Payload successfully buffered
837    /// - Error status: Payload is malformed or invalid
838    fn try_buffer_payload(
839        &mut self,
840        payload: T::ExecutionData,
841    ) -> Result<PayloadStatus, InsertBlockFatalError> {
842        let parent_hash = payload.parent_hash();
843        let num_hash = payload.num_hash();
844
845        match self.payload_validator.convert_payload_to_block(payload) {
846            // if the block is well-formed, buffer it for later
847            Ok(block) => {
848                if let Err(error) = self.buffer_block(block) {
849                    Ok(self.on_insert_block_error(error)?)
850                } else {
851                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
852                }
853            }
854            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
855        }
856    }
857
    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
    ///
    /// Returns `Ok(None)` when the head is not tracked in the in-memory tree state, or when a
    /// block on the walked path is unexpectedly missing (both logged).
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        // `new_chain` is accumulated tip-first and reversed before returning.
        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None)
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain,
        // gather all blocks until new head number.
        while current_canonical_number > current_number {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);
            current_canonical_number -= 1;
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Both chains were collected tip-first; reverse into oldest-first order for the caller.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
944
945    /// Updates the latest block state to the specified canonical ancestor.
946    ///
947    /// This method ensures that the latest block tracks the given canonical header by resetting
948    ///
949    /// # Arguments
950    /// * `canonical_header` - The canonical header to set as the new head
951    ///
952    /// # Returns
953    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
954    ///
955    /// Caution: This unwinds the canonical chain
956    fn update_latest_block_to_canonical_ancestor(
957        &mut self,
958        canonical_header: &SealedHeader<N::BlockHeader>,
959    ) -> ProviderResult<()> {
960        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
961        let current_head_number = self.state.tree_state.canonical_block_number();
962        let new_head_number = canonical_header.number();
963        let new_head_hash = canonical_header.hash();
964
965        // Update tree state with the new canonical head
966        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
967
968        // Handle the state update based on whether this is an unwind scenario
969        if new_head_number < current_head_number {
970            debug!(
971                target: "engine::tree",
972                current_head = current_head_number,
973                new_head = new_head_number,
974                new_head_hash = ?new_head_hash,
975                "FCU unwind detected: reverting to canonical ancestor"
976            );
977
978            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
979        } else {
980            debug!(
981                target: "engine::tree",
982                previous_head = current_head_number,
983                new_head = new_head_number,
984                new_head_hash = ?new_head_hash,
985                "Advancing latest block to canonical ancestor"
986            );
987            self.handle_chain_advance_or_same_height(canonical_header)
988        }
989    }
990
991    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
992    /// to the canonical header
993    fn handle_canonical_chain_unwind(
994        &self,
995        current_head_number: u64,
996        canonical_header: &SealedHeader<N::BlockHeader>,
997    ) -> ProviderResult<()> {
998        let new_head_number = canonical_header.number();
999        debug!(
1000            target: "engine::tree",
1001            from = current_head_number,
1002            to = new_head_number,
1003            "Handling unwind: collecting blocks to remove from in-memory state"
1004        );
1005
1006        // Collect blocks that need to be removed from memory
1007        let old_blocks =
1008            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
1009
1010        // Load and apply the canonical ancestor block
1011        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1012    }
1013
1014    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
1015    fn collect_blocks_for_canonical_unwind(
1016        &self,
1017        new_head_number: u64,
1018        current_head_number: u64,
1019    ) -> Vec<ExecutedBlock<N>> {
1020        let mut old_blocks =
1021            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1022
1023        for block_num in (new_head_number + 1)..=current_head_number {
1024            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1025                let executed_block = block_state.block_ref().clone();
1026                old_blocks.push(executed_block);
1027                debug!(
1028                    target: "engine::tree",
1029                    block_number = block_num,
1030                    "Collected block for removal from in-memory state"
1031                );
1032            }
1033        }
1034
1035        if old_blocks.is_empty() {
1036            debug!(
1037                target: "engine::tree",
1038                "No blocks found in memory to remove, will clear and reset state"
1039            );
1040        }
1041
1042        old_blocks
1043    }
1044
1045    /// Applies the canonical ancestor block via a reorg operation.
1046    fn apply_canonical_ancestor_via_reorg(
1047        &self,
1048        canonical_header: &SealedHeader<N::BlockHeader>,
1049        old_blocks: Vec<ExecutedBlock<N>>,
1050    ) -> ProviderResult<()> {
1051        let new_head_hash = canonical_header.hash();
1052        let new_head_number = canonical_header.number();
1053
1054        // Load the canonical ancestor's block
1055        let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1056        // Perform the reorg to properly handle the unwind
1057        self.canonical_in_memory_state
1058            .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1059
1060        // CRITICAL: Update the canonical head after the reorg
1061        // This ensures get_canonical_head() returns the correct block
1062        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1063
1064        debug!(
1065            target: "engine::tree",
1066            block_number = new_head_number,
1067            block_hash = ?new_head_hash,
1068            "Successfully loaded canonical ancestor into memory via reorg"
1069        );
1070
1071        Ok(())
1072    }
1073
1074    /// Handles chain advance or same height scenarios.
1075    fn handle_chain_advance_or_same_height(
1076        &self,
1077        canonical_header: &SealedHeader<N::BlockHeader>,
1078    ) -> ProviderResult<()> {
1079        // Load the block into memory if it's not already present
1080        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1081
1082        // Update the canonical head header
1083        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1084
1085        Ok(())
1086    }
1087
1088    /// Ensures a block is loaded into memory if not already present.
1089    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1090        // Check if block is already in memory
1091        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1092            return Ok(());
1093        }
1094
1095        // Load the block from storage
1096        let executed_block = self.canonical_block_by_hash(block_hash)?;
1097        self.canonical_in_memory_state
1098            .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1099
1100        debug!(
1101            target: "engine::tree",
1102            block_number,
1103            block_hash = ?block_hash,
1104            "Added canonical block to in-memory state"
1105        );
1106
1107        Ok(())
1108    }
1109
1110    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
1111    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
1112    /// chain.
1113    ///
1114    /// These responses should adhere to the [Engine API Spec for
1115    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
1116    ///
1117    /// Returns an error if an internal error occurred like a database error.
1118    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
1119    fn on_forkchoice_updated(
1120        &mut self,
1121        state: ForkchoiceState,
1122        attrs: Option<T::PayloadAttributes>,
1123    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1124        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1125
1126        self.building_payload = attrs.is_some() && self.config.suppress_persistence_during_build();
1127
1128        // Record metrics
1129        self.record_forkchoice_metrics();
1130
1131        // Pre-validation of forkchoice state
1132        if let Some(early_result) = self.validate_forkchoice_state(state)? {
1133            return Ok(TreeOutcome::new(early_result));
1134        }
1135
1136        // Return early if we are on the correct fork
1137        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
1138            return Ok(result);
1139        }
1140
1141        // Attempt to apply a chain update when the head differs from our canonical chain.
1142        // This handles reorgs and chain extensions by making the specified head canonical.
1143        if let Some(result) = self.apply_chain_update(state, &attrs)? {
1144            return Ok(result);
1145        }
1146
1147        // Fallback that ensures to catch up to the network's state.
1148        self.handle_missing_block(state)
1149    }
1150
    /// Records metrics for forkchoice updated calls.
    ///
    /// Currently this only bumps the FCU-received counter on the canonical in-memory state.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1155
1156    /// Pre-validates the forkchoice state and returns early if validation fails.
1157    ///
1158    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1159    /// returned. Returns `None` if validation passes and processing should continue.
1160    fn validate_forkchoice_state(
1161        &mut self,
1162        state: ForkchoiceState,
1163    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1164        if state.head_block_hash.is_zero() {
1165            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1166        }
1167
1168        // Check if the new head hash is connected to any ancestor that we previously marked as
1169        // invalid
1170        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1171        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1172            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1173        }
1174
1175        if !self.backfill_sync_state.is_idle() {
1176            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1177            // exclusive access to the database
1178            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1179            return Ok(Some(OnForkChoiceUpdated::syncing()));
1180        }
1181
1182        Ok(None)
1183    }
1184
1185    /// Handles the case where the forkchoice head is already canonical.
1186    ///
1187    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1188    /// processing is complete. Returns `None` if the head is not canonical and processing
1189    /// should continue.
1190    fn handle_canonical_head(
1191        &self,
1192        state: ForkchoiceState,
1193        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1194    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1195        // Process the forkchoice update by trying to make the head block canonical
1196        //
1197        // We can only process this forkchoice update if:
1198        // - we have the `head` block
1199        // - the head block is part of a chain that is connected to the canonical chain. This
1200        //   includes reorgs.
1201        //
1202        // Performing a FCU involves:
1203        // - marking the FCU's head block as canonical
1204        // - updating in memory state to reflect the new canonical chain
1205        // - updating canonical state trackers
1206        // - emitting a canonicalization event for the new chain (including reorg)
1207        // - if we have payload attributes, delegate them to the payload service
1208
1209        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1210            return Ok(None);
1211        }
1212
1213        trace!(target: "engine::tree", "fcu head hash is already canonical");
1214
1215        // Update the safe and finalized blocks and ensure their values are valid
1216        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1217            // safe or finalized hashes are invalid
1218            return Ok(Some(TreeOutcome::new(outcome)));
1219        }
1220
1221        // Process payload attributes if the head is already canonical
1222        if let Some(attr) = attrs {
1223            let tip = self
1224                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1225                .ok_or_else(|| {
1226                    // If we can't find the canonical block, then something is wrong and we need
1227                    // to return an error
1228                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1229                })?;
1230            // Clone only when we actually need to process the attributes
1231            let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1232            return Ok(Some(TreeOutcome::new(updated)));
1233        }
1234
1235        // The head block is already canonical
1236        Ok(Some(Self::valid_outcome(state)))
1237    }
1238
    /// Applies chain update for the new head block and processes payload attributes.
    ///
    /// This method handles the case where the forkchoice head differs from our current canonical
    /// head. It attempts to make the specified head block canonical by:
    /// - Checking if the head is already part of the canonical chain
    /// - Applying chain reorganizations (reorgs) if necessary
    /// - Processing payload attributes if provided
    /// - Returning the appropriate forkchoice update response
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
    /// Returns `None` if no chain update was needed or possible.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Check if the head is already part of the canonical chain
        // NOTE(review): a provider error from `find_canonical_header` is treated the same as
        // "not canonical" and falls through to `on_new_head` — confirm this is intentional.
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
            // own chain at will, so we need to always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
                // part of the canonical chain. We need to update the latest block state to reflect
                // the canonical ancestor. This ensures that state providers and the transaction
                // pool operate with the correct chain state after forkchoice update processing, and
                // new payloads built on the reorg'd head will be added to the tree immediately.
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    // Clone only when we actually need to process the attributes
                    let updated =
                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            // According to the Engine API specification, client software MAY skip an update of the
            // forkchoice state and MUST NOT begin a payload build process if
            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
            // of canonical chain, i.e. the ancestor passed payload validation process
            // and deemed `VALID`. In the case of such an event, client software MUST
            // return `{payloadStatus: {status: VALID, latestValidHash:
            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // The head block is already canonical and we're not processing payload attributes,
            // so we're not triggering a payload job and can return right away
            return Ok(Some(Self::valid_outcome(state)));
        }

        // Ensure we can apply a new chain update for the head block
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            // Apply the commit/reorg to the in-memory state and emit canonicalization events.
            self.on_canonical_chain_update(chain_update);

            // Update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                // Clone only when we actually need to process the attributes
                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        Ok(None)
    }
1317
1318    /// Handles the case where the head block is missing and needs to be downloaded.
1319    ///
1320    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1321    /// Returns a `TreeOutcome` with syncing status and download event.
1322    fn handle_missing_block(
1323        &self,
1324        state: ForkchoiceState,
1325    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1326        // We don't have the block to perform the forkchoice update
1327        // We assume the FCU is valid and at least the head is missing,
1328        // so we need to start syncing to it
1329        //
1330        // find the appropriate target to sync to, if we don't have the safe block hash then we
1331        // start syncing to the safe block via backfill first
1332        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1333        // check that safe block is valid and missing
1334        !state.safe_block_hash.is_zero() &&
1335        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1336        {
1337            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1338            state.safe_block_hash
1339        } else {
1340            state.head_block_hash
1341        };
1342
1343        let target = self.lowest_buffered_ancestor_or(target);
1344        trace!(target: "engine::tree", %target, "downloading missing block");
1345
1346        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1347            PayloadStatusEnum::Syncing,
1348        )))
1349        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1350    }
1351
1352    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1353    /// the current persistence action while we're removing blocks.
1354    fn remove_blocks(&mut self, new_tip_num: u64) {
1355        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1356        if new_tip_num < self.persistence_state.last_persisted_block.number {
1357            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1358            let (tx, rx) = crossbeam_channel::bounded(1);
1359            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1360            self.persistence_state.start_remove(new_tip_num, rx);
1361        }
1362    }
1363
1364    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1365    /// the current persistence action while we're saving blocks.
1366    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1367        if blocks_to_persist.is_empty() {
1368            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1369            return
1370        }
1371
1372        // NOTE: checked non-empty above
1373        let highest_num_hash = blocks_to_persist
1374            .iter()
1375            .max_by_key(|block| block.recovered_block().number())
1376            .map(|b| b.recovered_block().num_hash())
1377            .expect("Checked non-empty persisting blocks");
1378
1379        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1380        let (tx, rx) = crossbeam_channel::bounded(1);
1381        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1382
1383        self.persistence_state.start_save(highest_num_hash, rx);
1384    }
1385
1386    /// Triggers new persistence actions if no persistence task is currently in progress.
1387    ///
1388    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1389    /// Persistence completion is handled separately via the `wait_for_event` method.
1390    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1391        if !self.persistence_state.in_progress() {
1392            if let Some(new_tip_num) = self.find_disk_reorg()? {
1393                self.remove_blocks(new_tip_num)
1394            } else if self.should_persist() {
1395                let blocks_to_persist =
1396                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1397                self.persist_blocks(blocks_to_persist);
1398            }
1399        }
1400
1401        Ok(())
1402    }
1403
1404    /// Finishes termination by persisting all remaining blocks and signaling completion.
1405    ///
1406    /// This blocks until all persistence is complete. Always signals completion,
1407    /// even if an error occurs.
1408    fn finish_termination(
1409        &mut self,
1410        pending_termination: oneshot::Sender<()>,
1411    ) -> Result<(), AdvancePersistenceError> {
1412        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1413        let result = self.persist_until_complete();
1414        let _ = pending_termination.send(());
1415        result
1416    }
1417
1418    /// Persists all remaining blocks until none are left.
1419    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1420        loop {
1421            // Wait for any in-progress persistence to complete (blocking)
1422            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1423                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1424                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1425                self.on_persistence_complete(result, start_time)?;
1426            }
1427
1428            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1429
1430            if blocks_to_persist.is_empty() {
1431                debug!(target: "engine::tree", "persistence complete, signaling termination");
1432                return Ok(())
1433            }
1434
1435            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1436            self.persist_blocks(blocks_to_persist);
1437        }
1438    }
1439
1440    /// Tries to poll for a completed persistence task (non-blocking).
1441    ///
1442    /// Returns `true` if a persistence task was completed, `false` otherwise.
1443    fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1444        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1445            return Ok(false);
1446        };
1447
1448        match rx.try_recv() {
1449            Ok(result) => {
1450                self.on_persistence_complete(result, start_time)?;
1451                Ok(true)
1452            }
1453            Err(crossbeam_channel::TryRecvError::Empty) => {
1454                // Not ready yet, put it back
1455                self.persistence_state.rx = Some((rx, start_time, action));
1456                Ok(false)
1457            }
1458            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1459                Err(AdvancePersistenceError::ChannelClosed)
1460            }
1461        }
1462    }
1463
1464    /// Handles a completed persistence task.
1465    fn on_persistence_complete(
1466        &mut self,
1467        result: PersistenceResult,
1468        start_time: Instant,
1469    ) -> Result<(), AdvancePersistenceError> {
1470        self.metrics.engine.persistence_duration.record(start_time.elapsed());
1471
1472        let commit_duration = result.commit_duration;
1473        let Some(BlockNumHash {
1474            hash: last_persisted_block_hash,
1475            number: last_persisted_block_number,
1476        }) = result.last_block
1477        else {
1478            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
1479            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1480            return Ok(())
1481        };
1482
1483        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1484        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1485
1486        // Evict trie changesets for blocks below the eviction threshold.
1487        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
1488        // the finalized block if set.
1489        let min_threshold =
1490            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1491        let eviction_threshold =
1492            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1493                // Use the minimum of finalized block and retention threshold to be conservative
1494                finalized.number.min(min_threshold)
1495            } else {
1496                // When finalized is not set (e.g., on L2s), use the retention threshold
1497                min_threshold
1498            };
1499        debug!(
1500            target: "engine::tree",
1501            last_persisted = last_persisted_block_number,
1502            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1503            eviction_threshold,
1504            "Evicting changesets below threshold"
1505        );
1506        self.changeset_cache.evict(eviction_threshold);
1507
1508        // Invalidate cached overlay since the anchor has changed
1509        self.state.tree_state.invalidate_cached_overlay();
1510
1511        self.on_new_persisted_block()?;
1512
1513        // Re-prepare overlay for the current canonical head with the new anchor.
1514        // Spawn a background task to trigger computation so it's ready when the next payload
1515        // arrives.
1516        if let Some(prepared) = self.state.tree_state.prepare_canonical_overlay() {
1517            self.runtime.spawn_blocking_named("prepare-overlay", move || {
1518                let _ = prepared.overlay.get(prepared.anchor_hash);
1519            });
1520        }
1521
1522        self.purge_timing_stats(last_persisted_block_number, commit_duration);
1523
1524        Ok(())
1525    }
1526
    /// Handles a message from the engine.
    ///
    /// Dispatches orchestrator events (backfill lifecycle, termination), engine API requests
    /// (`forkchoiceUpdated`, `newPayload`, executed-block insertion) and batches of downloaded
    /// blocks.
    ///
    /// Returns `ControlFlow::Break(())` if the engine should terminate.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Termination errors are logged but do not prevent the break: the engine
                    // loop must stop either way.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(payload) => {
                        let block_num_hash = payload.recovered_block.num_hash();
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            // outdated block that can be skipped
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
                            // block already known to the tree (e.g. delivered via newPayload first)
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // Validation failures here are non-fatal: the block is simply skipped.
                        let block = match self
                            .payload_validator
                            .on_inserted_executed_block(payload, &self.state)
                        {
                            Ok(block) => block,
                            Err(err) => {
                                warn!(target: "engine::tree", %err, block=?block_num_hash, "Failed to insert already executed block");
                                return Ok(ops::ControlFlow::Continue(()))
                            }
                        };

                        // if the parent is the canonical head, we can insert the block as the
                        // pending block
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // track last received forkchoice state
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    // emit an event about the handled FCU
                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // handle the event if any
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                // Record response metrics before attempting delivery so they are
                                // captured even if the receiver was dropped.
                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // A failed send means the caller cancelled the request; count it
                                // but do not treat it as an error.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Detach the tree event so it can be processed after the response
                                // has been sent back to the caller.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // emit response
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // handle the event if any
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                                enqueued_at,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Time the request spent queued before we started processing it.
                                let backpressure_wait = enqueued_at.elapsed();

                                // Optionally wait for any in-flight persistence task to finish
                                // before processing, recording how long the wait took.
                                let explicit_persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        // NOTE(review): the spawned task is awaited synchronously
                                        // via `persistence_rx.recv()` below, so this is an
                                        // effectively blocking wait; presumably routed through
                                        // `spawn_blocking_named` for runtime bookkeeping — confirm.
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        let _ = self.on_persistence_complete(result, start_time);
                                        wait_duration
                                    } else {
                                        Duration::ZERO
                                    }
                                } else {
                                    Duration::ZERO
                                };

                                // Optionally wait for validator caches to be warm.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Detach the tree event so it can be handled after responding.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // Aggregate all the waits observed for this request into the
                                // timing breakdown returned alongside the payload status.
                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1768
    /// Invoked if the backfill sync has finished to target.
    ///
    /// At this point we consider the block synced to the backfill target.
    ///
    /// Checks the tracked finalized block against the block on disk and requests another backfill
    /// run if the distance to the tip exceeds the threshold for another backfill run.
    ///
    /// This will also do the necessary housekeeping of the tree state, this includes:
    ///  - removing all blocks below the backfill height
    ///  - resetting the canonical in-memory state
    ///
    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
    /// target block.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // update the `invalid_headers` cache with the new invalid header
            self.state.invalid_headers.insert(**bad_block);

            // if this was an unwind then the target is the new height
            Some(*target)
        } else {
            // backfill height is the block number that the backfill finished at
            ctrl.block_number()
        };

        // without a resulting height there is nothing to clean up
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // state house keeping after backfill sync
        // remove all executed blocks below the backfill height
        //
        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
        // before that
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // the node reset so we need to clear everything above that height so that backfill
            // height is the new canonical block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // remove all buffered blocks below the backfill height
        self.state.buffer.remove_old_blocks(backfill_height);
        self.purge_timing_stats(backfill_height, None);
        // we remove all entries because now we're synced to the backfill target and consider this
        // the canonical chain
        self.canonical_in_memory_state.clear_state();

        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            // update the tracked chain height, after backfill sync both the canonical height and
            // persisted height are the same
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            // update the tracked canonical head
            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // check if we need to run backfill again by comparing the most recent finalized height to
        // the backfill height
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            // no finalized block, can't check distance
            return Ok(())
        }
        // get the block number of the finalized block, if we have it
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // The block number that the backfill finished at - if the progress or newest
        // finalized is None then we can't check the distance anyways.
        //
        // If both are Some, we perform another distance check and return the desired
        // backfill target
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                // Determines whether or not we should run backfill again, in case
                // the new gap is still large enough and requires running backfill again
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            // request another backfill run
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Check if there are more blocks to sync between current head and FCU target
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                // There are blocks between current head and FCU target, download them
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // We don't have the head block or any of its ancestors buffered. Request
            // a download for the head block which will then trigger further sync.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // try to close the gap by executing buffered blocks that are child blocks of the new head
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1928
1929    /// Attempts to make the given target canonical.
1930    ///
1931    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1932    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1933        if let Some(chain_update) = self.on_new_head(target)? {
1934            self.on_canonical_chain_update(chain_update);
1935        }
1936
1937        self.on_canonicalized_sync_target(target);
1938
1939        Ok(())
1940    }
1941
1942    /// Applies the tracked forkchoice state once its sync target head becomes canonical.
1943    fn on_canonicalized_sync_target(&mut self, target: B256) {
1944        let Some(sync_target_state) = self
1945            .state
1946            .forkchoice_state_tracker
1947            .sync_target_state()
1948            .filter(|state| state.head_block_hash == target)
1949        else {
1950            return;
1951        };
1952
1953        if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
1954            debug!(
1955                target: "engine::tree",
1956                head = %sync_target_state.head_block_hash,
1957                safe = %sync_target_state.safe_block_hash,
1958                finalized = %sync_target_state.finalized_block_hash,
1959                ?outcome,
1960                "Canonicalized sync target head before safe/finalized could be applied"
1961            );
1962            return;
1963        }
1964
1965        self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
1966    }
1967
1968    /// Convenience function to handle an optional tree event.
1969    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1970        if let Some(event) = event {
1971            self.on_tree_event(event)?;
1972        }
1973
1974        Ok(())
1975    }
1976
1977    /// Handles a tree event.
1978    ///
1979    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1980    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1981        match event {
1982            TreeEvent::TreeAction(action) => match action {
1983                TreeAction::MakeCanonical { sync_target_head } => {
1984                    self.make_canonical(sync_target_head)?;
1985                }
1986            },
1987            TreeEvent::BackfillAction(action) => {
1988                self.emit_event(EngineApiEvent::BackfillAction(action));
1989            }
1990            TreeEvent::Download(action) => {
1991                self.emit_event(EngineApiEvent::Download(action));
1992            }
1993        }
1994
1995        Ok(())
1996    }
1997
1998    /// Removes timing stats for blocks at or below `below_number`.
1999    ///
2000    /// No-op when detailed block logging is disabled (no stats are recorded in that case).
2001    /// When `commit_duration` is provided and a slow block threshold is configured, checks
2002    /// each removed block against the threshold and emits a [`ConsensusEngineEvent::SlowBlock`]
2003    /// event for blocks that exceed it.
2004    fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
2005        let threshold = self.config.slow_block_threshold();
2006        let check_slow = commit_duration.is_some() && threshold.is_some();
2007
2008        // Two-pass: collect keys first because emit_event borrows &mut self.
2009        let keys_to_remove: Vec<B256> = self
2010            .execution_timing_stats
2011            .iter()
2012            .filter(|(_, stats)| stats.block_number <= below_number)
2013            .map(|(k, _)| *k)
2014            .collect();
2015
2016        for key in keys_to_remove {
2017            let stats = self.execution_timing_stats.remove(&key).expect("key just found");
2018            if check_slow {
2019                let commit_dur = commit_duration.expect("checked above");
2020                // state_read_duration is already included in execution_duration
2021                let total_duration =
2022                    stats.execution_duration + stats.state_hash_duration + commit_dur;
2023
2024                if total_duration > threshold.expect("checked above") {
2025                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2026                        stats,
2027                        commit_duration: Some(commit_dur),
2028                        total_duration,
2029                    }));
2030                }
2031            }
2032        }
2033    }
2034
2035    /// Emits an outgoing event to the engine.
2036    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
2037        let event = event.into();
2038
2039        if event.is_backfill_action() {
2040            debug_assert_eq!(
2041                self.backfill_sync_state,
2042                BackfillSyncState::Idle,
2043                "backfill action should only be emitted when backfill is idle"
2044            );
2045
2046            if self.persistence_state.in_progress() {
2047                // backfill sync and persisting data are mutually exclusive, so we can't start
2048                // backfill while we're still persisting
2049                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
2050                return
2051            }
2052
2053            self.backfill_sync_state = BackfillSyncState::Pending;
2054            self.metrics.engine.pipeline_runs.increment(1);
2055            debug!(target: "engine::tree", "emitting backfill action event");
2056        }
2057
2058        let _ = self.outgoing.send(event).inspect_err(
2059            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2060        );
2061    }
2062
2063    /// Returns true if the canonical chain length minus the last persisted
2064    /// block is greater than or equal to the persistence threshold,
2065    /// backfill is not running, and no payload is currently being built.
2066    pub const fn should_persist(&self) -> bool {
2067        if self.building_payload {
2068            return false
2069        }
2070
2071        if !self.backfill_sync_state.is_idle() {
2072            // can't persist if backfill is running
2073            return false
2074        }
2075
2076        let min_block = self.persistence_state.last_persisted_block.number;
2077        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2078            self.config.persistence_threshold()
2079    }
2080
2081    /// Returns a batch of consecutive canonical blocks to persist in the range
2082    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
2083    fn get_canonical_blocks_to_persist(
2084        &self,
2085        target: PersistTarget,
2086    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2087        // We will calculate the state root using the database, so we need to be sure there are no
2088        // changes
2089        debug_assert!(!self.persistence_state.in_progress());
2090
2091        let mut blocks_to_persist = Vec::new();
2092        let mut current_hash = self.state.tree_state.canonical_block_hash();
2093        let last_persisted_number = self.persistence_state.last_persisted_block.number;
2094        let canonical_head_number = self.state.tree_state.canonical_block_number();
2095
2096        let target_number = match target {
2097            PersistTarget::Head => canonical_head_number,
2098            PersistTarget::Threshold => {
2099                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2100            }
2101        };
2102
2103        debug!(
2104            target: "engine::tree",
2105            ?current_hash,
2106            ?last_persisted_number,
2107            ?canonical_head_number,
2108            ?target_number,
2109            "Returning canonical blocks to persist"
2110        );
2111        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
2112            if block.recovered_block().number() <= last_persisted_number {
2113                break;
2114            }
2115
2116            if block.recovered_block().number() <= target_number {
2117                blocks_to_persist.push(block.clone());
2118            }
2119
2120            current_hash = block.recovered_block().parent_hash();
2121        }
2122
2123        // Reverse the order so that the oldest block comes first
2124        blocks_to_persist.reverse();
2125
2126        Ok(blocks_to_persist)
2127    }
2128
2129    /// This clears the blocks from the in-memory tree state that have been persisted to the
2130    /// database.
2131    ///
2132    /// This also updates the canonical in-memory state to reflect the newest persisted block
2133    /// height.
2134    ///
2135    /// Assumes that `finish` has been called on the `persistence_state` at least once
2136    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2137        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
2138        // state.
2139        if let Some(remove_above) = self.find_disk_reorg()? {
2140            self.remove_blocks(remove_above);
2141            return Ok(())
2142        }
2143
2144        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2145        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2146        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2147            number: self.persistence_state.last_persisted_block.number,
2148            hash: self.persistence_state.last_persisted_block.hash,
2149        });
2150        Ok(())
2151    }
2152
    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
    ///
    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
    /// or the database. If the required historical data (such as trie change sets) has been
    /// pruned for a given block, this operation will return an error. On archive nodes, it
    /// can retrieve any block.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // check memory first
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(block.clone())
        }

        // Not in memory: reconstruct the executed block from the database. First load the
        // sealed block and its recovered senders.
        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        // Load the execution output for the block's height; missing state (e.g. pruned) is an
        // error.
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        // Recompute the trie updates for this block from stored changesets via a read-only
        // database provider.
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        // Skip building trie input and anchor for DB-loaded blocks.
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // Repackage into a per-block execution output. NOTE(review): `pop()` on
        // receipts/requests assumes `get_state(number)` returned an outcome covering exactly
        // this one block — TODO confirm against the provider contract.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        ))
    }
2212
2213    /// Returns `true` if a block with the given hash is known, either in memory or in the
2214    /// database. This is a lightweight existence check that avoids constructing a full
2215    /// [`SealedHeader`].
2216    fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2217        if self.state.tree_state.contains_hash(&hash) {
2218            Ok(true)
2219        } else {
2220            self.provider.is_known(hash)
2221        }
2222    }
2223
2224    /// Return sealed block header from in-memory state or database by hash.
2225    fn sealed_header_by_hash(
2226        &self,
2227        hash: B256,
2228    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2229        // check memory first
2230        let header = self.state.tree_state.sealed_header_by_hash(&hash);
2231
2232        if header.is_some() {
2233            Ok(header)
2234        } else {
2235            self.provider.sealed_header_by_hash(hash)
2236        }
2237    }
2238
2239    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
2240    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
2241    /// not exist in the buffer, this returns the hash that is passed in.
2242    ///
2243    /// Returns the parent hash of the block itself if the block is buffered and has no other
2244    /// buffered ancestors.
2245    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2246        self.state
2247            .buffer
2248            .lowest_ancestor(&hash)
2249            .map(|block| block.parent_hash())
2250            .unwrap_or_else(|| hash)
2251    }
2252
2253    /// If validation fails, the response MUST contain the latest valid hash:
2254    ///
2255    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
2256    ///     conditions:
2257    ///     - It is fully validated and deemed VALID
2258    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
2259    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
2260    ///     conditions are satisfied by a `PoW` block.
2261    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
2262    ///     the above conditions.
2263    fn latest_valid_hash_for_invalid_payload(
2264        &mut self,
2265        parent_hash: B256,
2266    ) -> ProviderResult<Option<B256>> {
2267        // Check if parent exists in side chain or in canonical chain.
2268        if self.has_block_by_hash(parent_hash)? {
2269            return Ok(Some(parent_hash))
2270        }
2271
2272        // iterate over ancestors in the invalid cache
2273        // until we encounter the first valid ancestor
2274        let mut current_hash = parent_hash;
2275        let mut current_block = self.state.invalid_headers.get(&current_hash);
2276        while let Some(block_with_parent) = current_block {
2277            current_hash = block_with_parent.parent;
2278            current_block = self.state.invalid_headers.get(&current_hash);
2279
2280            // If current_header is None, then the current_hash does not have an invalid
2281            // ancestor in the cache, check its presence in blockchain tree
2282            if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2283                return Ok(Some(current_hash))
2284            }
2285        }
2286        Ok(None)
2287    }
2288
2289    /// Prepares the invalid payload response for the given hash, checking the
2290    /// database for the parent hash and populating the payload status with the latest valid hash
2291    /// according to the engine api spec.
2292    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2293        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2294            // Edge case: the `latestValid` field is the zero hash if the parent block is the
2295            // terminal PoW block, which we need to identify by looking at the parent's block
2296            // difficulty
2297            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2298            Some(_) => Some(parent_hash),
2299            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2300        };
2301
2302        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2303            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2304        })
2305        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2306    }
2307
2308    /// Returns true if the given hash is the last received sync target block.
2309    ///
2310    /// See [`ForkchoiceStateTracker::sync_target_state`]
2311    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2312        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2313            return target.head_block_hash == block_hash
2314        }
2315        false
2316    }
2317
2318    /// Returns true if the given hash is part of the last received sync target fork choice update.
2319    ///
2320    /// See [`ForkchoiceStateTracker::sync_target_state`]
2321    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2322        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2323            return target.contains(block_hash)
2324        }
2325        false
2326    }
2327
2328    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2329    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2330    ///
2331    /// Returns a payload status response according to the engine API spec if the block is known to
2332    /// be invalid.
2333    fn check_invalid_ancestor_with_head(
2334        &mut self,
2335        check: B256,
2336        head: &SealedBlock<N::Block>,
2337    ) -> ProviderResult<Option<PayloadStatus>> {
2338        // check if the check hash was previously marked as invalid
2339        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2340
2341        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2342    }
2343
2344    /// Invoked when a new payload received is invalid.
2345    fn on_invalid_new_payload(
2346        &mut self,
2347        head: SealedBlock<N::Block>,
2348        invalid: BlockWithParent,
2349    ) -> ProviderResult<PayloadStatus> {
2350        // populate the latest valid hash field
2351        let status = self.prepare_invalid_response(invalid.parent)?;
2352
2353        // insert the head block into the invalid header cache
2354        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2355        self.emit_event(ConsensusEngineEvent::InvalidBlock {
2356            block: Box::new(head),
2357            error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2358        });
2359
2360        Ok(status)
2361    }
2362
2363    /// Finds any invalid ancestor for the given payload.
2364    ///
2365    /// This function first checks if the block itself is in the invalid headers cache (to
2366    /// avoid re-executing a known-invalid block). Then it walks up the chain of buffered
2367    /// ancestors and checks if any ancestor is marked as invalid.
2368    ///
2369    /// The check works by:
2370    /// 1. Checking if the block hash itself is in the `invalid_headers` map
2371    /// 2. Finding the lowest buffered ancestor for the given block hash
2372    /// 3. If the ancestor is the same as the block hash itself, using the parent hash instead
2373    /// 4. Checking if this ancestor is in the `invalid_headers` map
2374    ///
2375    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2376    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2377        let parent_hash = payload.parent_hash();
2378        let block_hash = payload.block_hash();
2379
2380        // Check if the block itself is already known to be invalid, avoiding re-execution
2381        if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2382            return Some(entry);
2383        }
2384
2385        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2386        if lowest_buffered_ancestor == block_hash {
2387            lowest_buffered_ancestor = parent_hash;
2388        }
2389
2390        // Check if the block has an invalid ancestor
2391        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2392    }
2393
2394    /// Handles a payload that has an invalid ancestor.
2395    ///
2396    /// This function validates the payload and processes it according to whether it's
2397    /// well-formed or malformed:
2398    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2399    ///    known-bad block, which violates consensus rules
2400    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2401    ///    validated due to its own structural issues
2402    fn handle_invalid_ancestor_payload(
2403        &mut self,
2404        payload: T::ExecutionData,
2405        invalid: BlockWithParent,
2406    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2407        let parent_hash = payload.parent_hash();
2408        let num_hash = payload.num_hash();
2409
2410        // Here we might have 2 cases
2411        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2412        //    remember it as invalid
2413        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2414        //    an error and forget it
2415        let block = match self.payload_validator.convert_payload_to_block(payload) {
2416            Ok(block) => block,
2417            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2418        };
2419
2420        Ok(self.on_invalid_new_payload(block, invalid)?)
2421    }
2422
2423    /// Checks if the given `head` points to an invalid header, which requires a specific response
2424    /// to a forkchoice update.
2425    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2426        // check if the head was previously marked as invalid
2427        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2428
2429        // Try to prepare invalid response, but handle errors gracefully
2430        match self.prepare_invalid_response(header.parent) {
2431            Ok(status) => Ok(Some(status)),
2432            Err(err) => {
2433                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2434                // Return a basic invalid status without latest valid hash
2435                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2436                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2437                })))
2438            }
2439        }
2440    }
2441
2442    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2443    /// and block body itself.
2444    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2445        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2446            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2447            return Err(e)
2448        }
2449
2450        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2451            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2452            return Err(e)
2453        }
2454
2455        Ok(())
2456    }
2457
    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        // Drain the parent's entire buffered descendant subtree in one call.
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            // nothing to append
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    // A freshly valid block that is part of the tracked sync target can be
                    // canonicalized immediately.
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        // we just inserted a block that we know is part of the canonical chain, so
                        // we can make it canonical
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // Only `Block`-variant insertion errors are handled here; other
                    // `InsertPayloadError` variants fall through silently.
                    // NOTE(review): confirm that dropping non-`Block` variants is intentional.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2502
2503    /// Pre-validates the block and inserts it into the buffer.
2504    fn buffer_block(
2505        &mut self,
2506        block: SealedBlock<N::Block>,
2507    ) -> Result<(), InsertBlockError<N::Block>> {
2508        if let Err(err) = self.validate_block(&block) {
2509            return Err(InsertBlockError::consensus_error(err, block))
2510        }
2511        self.state.buffer.insert_block(block);
2512        Ok(())
2513    }
2514
2515    /// Returns true if the distance from the local tip to the block is greater than the configured
2516    /// threshold.
2517    ///
2518    /// If the `local_tip` is greater than the `block`, then this will return false.
2519    #[inline]
2520    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2521        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2522    }
2523
2524    /// Returns how far the local tip is from the given block. If the local tip is at the same
2525    /// height or its block number is greater than the given block, this returns None.
2526    #[inline]
2527    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2528        if block > local_tip {
2529            Some(block - local_tip)
2530        } else {
2531            None
2532        }
2533    }
2534
    /// Returns the target hash to sync to if the distance from the local tip to the block is
    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
    /// that block already).
    ///
    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
    /// (missing) finalized block.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        // check if the downloaded block is the tracked finalized block
        // Distance is measured against the best height reference available, in order of
        // preference: the just-downloaded finalized block, a buffered finalized block, or the
        // caller-supplied target number.
        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // if we downloaded the finalized block we can now check how far we're off
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    Some(buffered_finalized) => {
                        // if we have buffered the finalized block, we should check how far we're
                        // off
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    None => {
                        // check if the distance exceeds the threshold for backfill sync
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        // if the number of missing blocks is greater than the max, trigger backfill
        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            // if we have already canonicalized the finalized block, we should skip backfill
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    // A provider error here is non-fatal: log it and do not trigger backfill.
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // ensure the finalized block is known (not the zero hash)
                    if !state.finalized_block_hash.is_zero() {
                        // we don't have the block yet and the distance exceeds the allowed
                        // threshold
                        return Some(state.finalized_block_hash)
                    }

                    // OPTIMISTIC SYNCING
                    //
                    // It can happen when the node is doing an
                    // optimistic sync, where the CL has no knowledge of the finalized hash,
                    // but is expecting the EL to sync as high
                    // as possible before finalizing.
                    //
                    // This usually doesn't happen on ETH mainnet since CLs use the more
                    // secure checkpoint syncing.
                    //
                    // However, optimism chains will do this. The risk of a reorg is however
                    // low.
                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // we're fully synced to the finalized block
                }
            }
        }

        None
    }
2615
    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
    /// happen if a reorg is happening while we are persisting a block.
    ///
    /// Returns `Ok(Some(n))` when the persisted chain must be truncated above block `n` (the
    /// common ancestor), `Ok(None)` when the persisted tip is an ancestor of the canonical head.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Helper: resolve a block's parent (number, hash) via the in-memory/DB header lookup.
        // Errors if any header on the walked path is missing.
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Happy path, canonical chain is ahead or equal to persisted chain.
        // Walk canonical chain back to make sure that it connects to persisted chain.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
        if canonical == persisted {
            return Ok(None);
        }

        // At this point, we know that `persisted` block can't be reached by walking the canonical
        // chain back. In this case we need to truncate it to the first canonical block it connects
        // to.

        // Firstly, walk back until we reach the same height as `canonical`.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Now walk both chains back until we find a common ancestor.
        // Both cursors are at equal heights, so stepping them in lockstep keeps them aligned.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2661
    /// Invoked when we the canonical chain has been updated.
    ///
    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
    ///
    /// Ordering matters here: the tracked heads are updated before reorged blocks are
    /// reinserted and before listeners are notified.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // update the tracked canonical head
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        // Capture the tip header and build the notification before `chain_update` is consumed
        // by `update_chain` below.
        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        // reinsert any missing reorged blocks
        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            // Both sides of the reorg are reinserted so the tree retains the full fork.
            self.reinsert_reorged_blocks(new.clone());
            self.reinsert_reorged_blocks(old.clone());
        }

        // update the tracked in-memory state with the new chain
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        // Update metrics based on new tip
        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // sends an event to all active listeners about the new canonical chain
        self.canonical_in_memory_state.notify_canon_state(notification);

        // emit event
        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2702
2703    /// This updates metrics based on the given reorg length and first reorged block number.
2704    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2705        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2706            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2707                first_reorged_block <= finalized.number
2708            {
2709                self.metrics.tree.reorgs.finalized.increment(1);
2710            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2711                first_reorged_block <= safe.number
2712            {
2713                self.metrics.tree.reorgs.safe.increment(1);
2714            } else {
2715                self.metrics.tree.reorgs.head.increment(1);
2716            }
2717        } else {
2718            debug_unreachable!("Reorged chain doesn't have any blocks");
2719        }
2720        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2721    }
2722
2723    /// This reinserts any blocks in the new chain that do not already exist in the tree
2724    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2725        for block in new_chain {
2726            if self
2727                .state
2728                .tree_state
2729                .executed_block_by_hash(block.recovered_block().hash())
2730                .is_none()
2731            {
2732                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2733                self.state.tree_state.insert_executed(block);
2734            }
2735        }
2736    }
2737
2738    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2739    ///
2740    /// This mainly compares the missing parent of the downloaded block with the current canonical
2741    /// tip, and decides whether or not backfill sync should be triggered.
2742    fn on_disconnected_downloaded_block(
2743        &self,
2744        downloaded_block: BlockNumHash,
2745        missing_parent: BlockNumHash,
2746        head: BlockNumHash,
2747    ) -> Option<TreeEvent> {
2748        // compare the missing parent with the canonical tip
2749        if let Some(target) =
2750            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2751        {
2752            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2753            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2754        }
2755
2756        // continue downloading the missing parent
2757        //
2758        // this happens if either:
2759        //  * the missing parent block num < canonical tip num
2760        //    * this case represents a missing block on a fork that is shorter than the canonical
2761        //      chain
2762        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2763        //    less than the backfill threshold
2764        //    * this case represents a potentially long range of blocks to download and execute
2765        let request = if let Some(distance) =
2766            self.distance_from_local_tip(head.number, missing_parent.number)
2767        {
2768            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2769            DownloadRequest::BlockRange(missing_parent.hash, distance)
2770        } else {
2771            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2772            // This happens when the missing parent is on an outdated
2773            // sidechain and we can only download the missing block itself
2774            DownloadRequest::single_block(missing_parent.hash)
2775        };
2776
2777        Some(TreeEvent::Download(request))
2778    }
2779
2780    /// Handles a downloaded block that was successfully inserted as valid.
2781    ///
2782    /// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
2783    /// If it matches a non-head sync target (safe or finalized), makes it canonical inline
2784    /// and triggers a download for the remaining blocks towards the actual head.
2785    /// Otherwise, tries to connect buffered blocks.
2786    fn on_valid_downloaded_block(
2787        &mut self,
2788        block_num_hash: BlockNumHash,
2789    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2790        // check if we just inserted a block that's part of sync targets,
2791        // i.e. head, safe, or finalized
2792        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2793            sync_target.contains(block_num_hash.hash)
2794        {
2795            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2796
2797            if sync_target.head_block_hash == block_num_hash.hash {
2798                // we just inserted the sync target head block, make it canonical
2799                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2800                    sync_target_head: block_num_hash.hash,
2801                })))
2802            }
2803
2804            // This block is part of the sync target (safe or finalized) but not the
2805            // head. Make it canonical and try to connect any buffered children, then
2806            // continue downloading towards the actual head if needed.
2807            self.make_canonical(block_num_hash.hash)?;
2808            self.try_connect_buffered_blocks(block_num_hash)?;
2809
2810            // Check if we've reached the sync target head after connecting buffered
2811            // blocks (e.g. the head block may have already been buffered).
2812            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2813                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2814                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2815                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2816            }
2817
2818            return Ok(None)
2819        }
2820        trace!(target: "engine::tree", "appended downloaded block");
2821        self.try_connect_buffered_blocks(block_num_hash)?;
2822        Ok(None)
2823    }
2824
2825    /// Invoked with a block downloaded from the network
2826    ///
2827    /// Returns an event with the appropriate action to take, such as:
2828    ///  - download more missing blocks
2829    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2830    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2831    fn on_downloaded_block(
2832        &mut self,
2833        block: SealedBlock<N::Block>,
2834    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2835        let block_num_hash = block.num_hash();
2836        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2837        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2838            return Ok(None)
2839        }
2840
2841        if !self.backfill_sync_state.is_idle() {
2842            return Ok(None)
2843        }
2844
2845        // try to append the block
2846        match self.insert_block(block) {
2847            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2848                return self.on_valid_downloaded_block(block_num_hash);
2849            }
2850            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2851                // block is not connected to the canonical head, we need to download
2852                // its missing branch first
2853                return Ok(self.on_disconnected_downloaded_block(
2854                    block_num_hash,
2855                    missing_ancestor,
2856                    head,
2857                ))
2858            }
2859            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2860                trace!(target: "engine::tree", "downloaded block already executed");
2861            }
2862            Err(err) => {
2863                if let InsertPayloadError::Block(err) = err {
2864                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2865                    if let Err(fatal) = self.on_insert_block_error(err) {
2866                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2867                        return Err(fatal)
2868                    }
2869                }
2870            }
2871        }
2872        Ok(None)
2873    }
2874
2875    /// Inserts a payload into the tree and executes it.
2876    ///
2877    /// This function validates the payload's basic structure, then executes it using the
2878    /// payload validator. The execution includes running all transactions in the payload
2879    /// and validating the resulting state transitions.
2880    ///
2881    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2882    /// or `InsertPayloadError` if validation or execution failed.
2883    fn insert_payload(
2884        &mut self,
2885        payload: T::ExecutionData,
2886    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2887        self.insert_block_or_payload(
2888            payload.block_with_parent(),
2889            payload,
2890            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2891            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2892        )
2893    }
2894
2895    fn insert_block(
2896        &mut self,
2897        block: SealedBlock<N::Block>,
2898    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2899        self.insert_block_or_payload(
2900            block.block_with_parent(),
2901            block,
2902            |validator, block, ctx| validator.validate_block(block, ctx),
2903            |_, block| Ok(block),
2904        )
2905    }
2906
    /// Inserts a block or payload into the blockchain tree with full execution.
    ///
    /// This is a generic function that handles both blocks and payloads by accepting
    /// a block identifier, input data, and execution/validation functions. It performs
    /// comprehensive checks and execution:
    ///
    /// - Validates that the block doesn't already exist in the tree
    /// - Ensures parent state is available, buffering if necessary
    /// - Executes the block/payload using the provided execute function
    /// - Handles both canonical and fork chain insertions
    /// - Updates pending block state when appropriate
    /// - Emits consensus engine events and records metrics
    ///
    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ValidationOutput<N>, Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Check if block already exists - first in memory, then DB only if it could be persisted
        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
            // conversion is still performed so conversion errors propagate via `?`
            // even for blocks we already know about
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // Only query DB if block could be persisted (number <= last persisted block).
        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    // attach the database error to the (converted) block so callers know
                    // which block failed
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure that the parent state is available.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // we don't have the state required to execute this block, buffering it and find the
                // missing parent block
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // determine whether we are on a fork chain by comparing the block number with the
        // canonical head. This is a simple check that is sufficient for the event emission below.
        // A block is considered a fork if its number is less than or equal to the canonical head,
        // as this indicates there's already a canonical block at that height.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        // context gives the validator mutable access to the tree state plus a view of the
        // canonical in-memory state
        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // run validation + execution via the caller-supplied function (payload or block variant)
        let ValidationOutput { executed_block: executed, execution_timing_stats: timing_stats } =
            execute(&mut self.payload_validator, input, ctx)?;

        // Emit slow block event immediately after execution so it appears even when
        // persistence hasn't completed yet (e.g. blocks arriving faster than persistence).
        if let Some(stats) = timing_stats {
            if let Some(threshold) = self.config.slow_block_threshold() {
                let total_duration = stats.execution_duration + stats.state_hash_duration;
                if total_duration > threshold {
                    // commit_duration is unknown at this point since persistence hasn't run yet
                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
                        stats: stats.clone(),
                        commit_duration: None,
                        total_duration,
                    }));
                }
            }
            // keep the stats around keyed by block hash for later correlation
            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
        }

        // if the parent is the canonical head, we can insert the block as the pending block
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // emit insert event
        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
3043
3044    /// Handles an error that occurred while inserting a block.
3045    ///
3046    /// If this is a validation error this will mark the block as invalid.
3047    ///
3048    /// Returns the proper payload status response if the block is invalid.
3049    fn on_insert_block_error(
3050        &mut self,
3051        error: InsertBlockError<N::Block>,
3052    ) -> Result<PayloadStatus, InsertBlockFatalError> {
3053        let (block, error) = error.split();
3054
3055        // if invalid block, we check the validation error. Otherwise return the fatal
3056        // error.
3057        let validation_err = error.ensure_validation_error()?;
3058
3059        // If the error was due to an invalid payload, the payload is added to the
3060        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
3061        // returned.
3062        warn!(
3063            target: "engine::tree",
3064            invalid_hash=%block.hash(),
3065            invalid_number=block.number(),
3066            %validation_err,
3067            "Invalid block error on new payload",
3068        );
3069        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3070
3071        // keep track of the invalid header unless the consensus impl considers it transient
3072        let is_transient = match &validation_err {
3073            InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
3074            _ => false,
3075        };
3076        if is_transient {
3077            warn!(
3078                target: "engine::tree",
3079                invalid_hash=%block.hash(),
3080                invalid_number=block.number(),
3081                %validation_err,
3082                "Skipping invalid header cache insert for transient validation error",
3083            );
3084        } else {
3085            self.state.invalid_headers.insert(block.block_with_parent());
3086        }
3087        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock {
3088            block: Box::new(block),
3089            error: validation_err.to_string(),
3090        }));
3091
3092        Ok(PayloadStatus::new(
3093            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3094            latest_valid_hash,
3095        ))
3096    }
3097
3098    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
3099    fn on_new_payload_error(
3100        &mut self,
3101        error: NewPayloadError,
3102        payload_num_hash: NumHash,
3103        parent_hash: B256,
3104    ) -> ProviderResult<PayloadStatus> {
3105        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3106        // we need to convert the error to a payload status (response to the CL)
3107
3108        let latest_valid_hash =
3109            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3110                // Engine-API rules:
3111                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
3112                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
3113                None
3114            } else {
3115                self.latest_valid_hash_for_invalid_payload(parent_hash)?
3116            };
3117
3118        let status = PayloadStatusEnum::from(error);
3119        Ok(PayloadStatus::new(status, latest_valid_hash))
3120    }
3121
3122    /// Attempts to find the header for the given block hash if it is canonical.
3123    pub fn find_canonical_header(
3124        &self,
3125        hash: B256,
3126    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3127        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3128
3129        if canonical.is_none() {
3130            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3131        }
3132
3133        Ok(canonical)
3134    }
3135
3136    /// Updates the tracked finalized block if we have it.
3137    fn update_finalized_block(
3138        &self,
3139        finalized_block_hash: B256,
3140    ) -> Result<(), OnForkChoiceUpdated> {
3141        if finalized_block_hash.is_zero() {
3142            return Ok(())
3143        }
3144
3145        match self.find_canonical_header(finalized_block_hash) {
3146            Ok(None) => {
3147                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3148                // if the finalized block is not known, we can't update the finalized block
3149                return Err(OnForkChoiceUpdated::invalid_state())
3150            }
3151            Ok(Some(finalized)) => {
3152                if Some(finalized.num_hash()) !=
3153                    self.canonical_in_memory_state.get_finalized_num_hash()
3154                {
3155                    // we're also persisting the finalized block on disk so we can reload it on
3156                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3157                    let _ = self.persistence.save_finalized_block_number(finalized.number());
3158                    self.canonical_in_memory_state.set_finalized(finalized.clone());
3159                    // Update finalized block height metric
3160                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3161                }
3162            }
3163            Err(err) => {
3164                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3165            }
3166        }
3167
3168        Ok(())
3169    }
3170
3171    /// Updates the tracked safe block if we have it
3172    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3173        if safe_block_hash.is_zero() {
3174            return Ok(())
3175        }
3176
3177        match self.find_canonical_header(safe_block_hash) {
3178            Ok(None) => {
3179                debug!(target: "engine::tree", "Safe block not found in canonical chain");
3180                // if the safe block is not known, we can't update the safe block
3181                return Err(OnForkChoiceUpdated::invalid_state())
3182            }
3183            Ok(Some(safe)) => {
3184                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3185                    // we're also persisting the safe block on disk so we can reload it on
3186                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3187                    let _ = self.persistence.save_safe_block_number(safe.number());
3188                    self.canonical_in_memory_state.set_safe(safe.clone());
3189                    // Update safe block height metric
3190                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
3191                }
3192            }
3193            Err(err) => {
3194                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3195            }
3196        }
3197
3198        Ok(())
3199    }
3200
3201    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
3202    /// made canonical.
3203    ///
3204    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
3205    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
3206    ///
3207    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
3208    /// are consistent with the head block.
3209    fn ensure_consistent_forkchoice_state(
3210        &self,
3211        state: ForkchoiceState,
3212    ) -> Result<(), OnForkChoiceUpdated> {
3213        // Ensure that the finalized block, if not zero, is known and in the canonical chain
3214        // after the head block is canonicalized.
3215        //
3216        // This ensures that the finalized block is consistent with the head block, i.e. the
3217        // finalized block is an ancestor of the head block.
3218        self.update_finalized_block(state.finalized_block_hash)?;
3219
3220        // Also ensure that the safe block, if not zero, is known and in the canonical chain
3221        // after the head block is canonicalized.
3222        //
3223        // This ensures that the safe block is consistent with the head block, i.e. the safe
3224        // block is an ancestor of the head block.
3225        self.update_safe_block(state.safe_block_hash)
3226    }
3227
3228    /// Validates the payload attributes with respect to the header and fork choice state.
3229    ///
3230    /// This is called during `engine_forkchoiceUpdated` when the CL provides payload attributes,
3231    /// indicating it wants the EL to start building a new block.
3232    ///
3233    /// Runs [`PayloadValidator::validate_payload_attributes_against_header`](reth_engine_primitives::PayloadValidator::validate_payload_attributes_against_header) to ensure
3234    /// `payloadAttributes.timestamp > headBlock.timestamp` per the Engine API spec.
3235    ///
3236    /// If validation passes, sends the attributes to the payload builder to start a new
3237    /// payload job. If it fails, returns `INVALID_PAYLOAD_ATTRIBUTES` without rolling back
3238    /// the forkchoice update.
3239    ///
3240    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
3241    /// return an error if the payload attributes are invalid.
3242    fn process_payload_attributes(
3243        &self,
3244        attributes: T::PayloadAttributes,
3245        head: &N::BlockHeader,
3246        state: ForkchoiceState,
3247    ) -> OnForkChoiceUpdated {
3248        if let Err(err) =
3249            self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3250        {
3251            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3252            return OnForkChoiceUpdated::invalid_payload_attributes()
3253        }
3254
3255        // 8. Client software MUST begin a payload build process building on top of
3256        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
3257        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
3258        //    The build process is specified in the Payload building section.
3259
3260        let cache = if self.config.share_execution_cache_with_payload_builder() {
3261            self.payload_validator.cache_for(state.head_block_hash)
3262        } else {
3263            None
3264        };
3265
3266        let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3267            self.payload_validator.sparse_trie_handle_for(
3268                state.head_block_hash,
3269                head.state_root(),
3270                &self.state,
3271            )
3272        } else {
3273            None
3274        };
3275
3276        // send the payload to the builder and return the receiver for the pending payload
3277        // id, initiating payload job is handled asynchronously
3278        let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3279            parent_hash: state.head_block_hash,
3280            attributes,
3281            cache,
3282            trie_handle,
3283        });
3284
3285        // Client software MUST respond to this method call in the following way:
3286        // {
3287        //      payloadStatus: {
3288        //          status: VALID,
3289        //          latestValidHash: forkchoiceState.headBlockHash,
3290        //          validationError: null
3291        //      },
3292        //      payloadId: buildProcessId
3293        // }
3294        //
3295        // if the payload is deemed VALID and the build process has begun.
3296        OnForkChoiceUpdated::updated_with_pending_payload_id(
3297            PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3298            pending_payload_id,
3299        )
3300    }
3301
3302    /// Remove all blocks up to __and including__ the given block number.
3303    ///
3304    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
3305    /// those which have a fork point at or below the finalized hash.
3306    ///
3307    /// Canonical blocks below the upper bound will still be removed.
3308    pub(crate) fn remove_before(
3309        &mut self,
3310        upper_bound: BlockNumHash,
3311        finalized_hash: Option<B256>,
3312    ) -> ProviderResult<()> {
3313        // first fetch the finalized block number and then call the remove_before method on
3314        // tree_state
3315        let num = if let Some(hash) = finalized_hash {
3316            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3317        } else {
3318            None
3319        };
3320
3321        self.state.tree_state.remove_until(
3322            upper_bound,
3323            self.persistence_state.last_persisted_block.hash,
3324            num,
3325        );
3326        Ok(())
3327    }
3328
3329    /// Returns a builder for creating state providers for the given hash.
3330    ///
3331    /// This is an optimization for parallel execution contexts where we want to avoid
3332    /// creating state providers in the critical path.
3333    pub fn state_provider_builder(
3334        &self,
3335        hash: B256,
3336    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3337    where
3338        P: BlockReader + StateProviderFactory + StateReader + Clone,
3339    {
3340        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3341            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3342            // the block leads back to the canonical chain
3343            return Ok(Some(StateProviderBuilder::new(
3344                self.provider.clone(),
3345                historical,
3346                Some(blocks),
3347            )))
3348        }
3349
3350        // Check if the block is persisted
3351        if let Some(header) = self.provider.header(hash)? {
3352            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3353            // For persisted blocks, we create a builder that will fetch state directly from the
3354            // database
3355            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3356        }
3357
3358        debug!(target: "engine::tree", %hash, "no canonical state found for block");
3359        Ok(None)
3360    }
3361}
3362
3363/// Events received in the main engine loop.
3364#[derive(Debug)]
3365enum LoopEvent<T, N>
3366where
3367    N: NodePrimitives,
3368    T: PayloadTypes,
3369{
3370    /// An engine API message was received.
3371    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
3372    /// A persistence task completed.
3373    PersistenceComplete {
3374        /// The unified result of the persistence operation.
3375        result: PersistenceResult,
3376        /// When the persistence operation started.
3377        start_time: Instant,
3378    },
3379    /// A channel was disconnected.
3380    Disconnected,
3381}
3382
3383/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
3384/// variant.
3385///
3386/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
3387/// is valid or not.
3388#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3389pub enum BlockStatus {
3390    /// The block is valid and block extends canonical chain.
3391    Valid,
3392    /// The block may be valid and has an unknown missing ancestor.
3393    Disconnected {
3394        /// Current canonical head.
3395        head: BlockNumHash,
3396        /// The lowest ancestor block that is not connected to the canonical chain.
3397        missing_ancestor: BlockNumHash,
3398    },
3399}
3400
3401/// How a payload was inserted if it was valid.
3402///
3403/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
3404/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
3405#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3406pub enum InsertPayloadOk {
3407    /// The payload was valid, but we have already seen it.
3408    AlreadySeen(BlockStatus),
3409    /// The payload was valid and inserted into the tree.
3410    Inserted(BlockStatus),
3411}
3412
/// Target for block persistence.
#[derive(Clone, Copy, Debug)]
enum PersistTarget {
    /// Persist up to `canonical_head - memory_block_buffer_target`.
    Threshold,
    /// Persist every block up to and including the canonical head.
    Head,
}
3421
/// Result of waiting for caches to become available.
///
/// Reports, per shared resource, how long the caller was blocked before the cache
/// became available. Both durations are zero when no waiting occurred (the
/// [`Default`] value).
// Equality derives added so call sites and tests can compare wait results directly
// (e.g. against `CacheWaitDurations::default()`); `Duration` supports `Eq`, so this
// is a purely additive, backward-compatible change.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the execution cache lock.
    pub execution_cache: Duration,
    /// Time spent waiting for the sparse trie lock.
    pub sparse_trie: Duration,
}
3430
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
    /// Waits for cache updates to complete.
    ///
    /// Returns the time spent waiting for each cache separately; see
    /// [`CacheWaitDurations`] for the individual components (execution cache
    /// and sparse trie).
    fn wait_for_caches(&self) -> CacheWaitDurations;
}
3440}