Skip to main content

reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::{map::B256Map, B256};
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError, InsertBlockValidationError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17    MemoryOverlayStateProvider, NewCanonicalChain, StateTrieOverlayManager,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29    FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32    BalProvider, BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35    StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority, WorkerPool};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48    oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64pub mod types;
65
66use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
67pub use block_buffer::BlockBuffer;
68pub use invalid_headers::InvalidHeaderCache;
69pub use metrics::EngineApiMetrics;
70pub use payload_processor::*;
71pub use payload_validator::{BasicEngineValidator, EngineValidator};
72pub use persistence_state::PersistenceState;
73pub use reth_engine_primitives::TreeConfig;
74pub use reth_execution_cache::{
75    CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider,
76    ExecutionCache, PayloadExecutionCache, SavedCache,
77};
78pub use types::{ValidationOutcome, ValidationOutput};
79
80pub mod state;
81
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: Local head `block.number` is 100 and the forkchoice head `block.number` is 133. The gap
/// of 33 blocks is larger than the number of slots in an epoch, so it exceeds the threshold at
/// which the pipeline should be used to backfill this gap.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// The minimum number of blocks to retain in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism).
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
98
/// A builder for creating state providers that can be used across threads.
///
/// It bundles a provider factory with a historical anchor block hash and, optionally, a list of
/// executed blocks. [`Self::build`] resolves the state at the anchor hash and, when overlay
/// blocks are present, wraps it in a [`MemoryOverlayStateProvider`].
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    ///
    /// `None` means no in-memory overlay is applied on top of the historical state.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
109
110impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
111    /// Creates a new state provider from the provider factory, historical block hash and optional
112    /// overlaid blocks.
113    pub const fn new(
114        provider_factory: P,
115        historical: B256,
116        overlay: Option<Vec<ExecutedBlock<N>>>,
117    ) -> Self {
118        Self { provider_factory, historical, overlay }
119    }
120}
121
122impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
123where
124    P: BlockReader + StateProviderFactory + StateReader + Clone,
125{
126    /// Creates a new state provider from this builder.
127    pub fn build(&self) -> ProviderResult<StateProviderBox> {
128        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
129        if let Some(overlay) = self.overlay.clone() {
130            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
131        }
132        Ok(provider)
133    }
134}
135
/// Tracks the state of the engine api internals.
///
/// This groups the tree state, the forkchoice state tracker, the buffer of detached blocks and
/// the invalid header cache.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}
151
152impl<N: NodePrimitives> EngineApiTreeState<N> {
153    fn new(
154        block_buffer_limit: u32,
155        max_invalid_header_cache_length: u32,
156        invalid_header_hit_eviction_threshold: u8,
157        canonical_block: BlockNumHash,
158        engine_kind: EngineApiKind,
159        state_trie_overlay_worker_pool: Arc<WorkerPool>,
160    ) -> Self {
161        Self {
162            invalid_headers: InvalidHeaderCache::new(
163                max_invalid_header_cache_length,
164                invalid_header_hit_eviction_threshold,
165            ),
166            buffer: BlockBuffer::new(block_buffer_limit),
167            tree_state: TreeState::new(
168                canonical_block,
169                engine_kind,
170                StateTrieOverlayManager::new(state_trie_overlay_worker_pool),
171            ),
172            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
173        }
174    }
175
176    /// Returns a reference to the tree state.
177    pub const fn tree_state(&self) -> &TreeState<N> {
178        &self.tree_state
179    }
180
181    /// Returns true if the block has been marked as invalid.
182    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
183        self.invalid_headers.get(hash).is_some()
184    }
185}
186
/// The outcome of a tree operation.
///
/// Carries the operation's result together with an optional follow-up [`TreeEvent`] and a flag
/// telling whether the block had already been seen.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
    /// Whether the block was already seen, meaning no real execution happened during this
    /// `newPayload` call.
    pub already_seen: bool,
}
198
199impl<T> TreeOutcome<T> {
200    /// Create new tree outcome.
201    pub const fn new(outcome: T) -> Self {
202        Self { outcome, event: None, already_seen: false }
203    }
204
205    /// Set event on the outcome.
206    pub fn with_event(mut self, event: TreeEvent) -> Self {
207        self.event = Some(event);
208        self
209    }
210
211    /// Set the `already_seen` flag on the outcome.
212    pub const fn with_already_seen(mut self, value: bool) -> Self {
213        self.already_seen = value;
214        self
215    }
216}
217
/// Result of trying to insert a new payload in [`EngineApiTreeHandler`].
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// The payload status produced by the insertion attempt:
    ///
    /// - `Valid`: Payload successfully validated and inserted
    /// - `Syncing`: Parent missing, payload buffered for later
    /// - Error status: Payload is invalid
    pub status: PayloadStatus,
    /// Whether the block was already seen
    pub already_seen: bool,
}
228
229impl TryInsertPayloadResult {
230    /// Convert the result into a [`TreeOutcome`].
231    #[inline]
232    pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
233        TreeOutcome::new(self.status).with_already_seen(self.already_seen)
234    }
235}
236
/// Events that are triggered by Tree Chain
///
/// A [`TreeOutcome`] may carry one of these to tell the caller to do something.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}
247
248impl TreeEvent {
249    /// Returns true if the event is a backfill action.
250    const fn is_backfill_action(&self) -> bool {
251        matches!(self, Self::BackfillAction(_))
252    }
253}
254
/// The actions that can be performed on the tree.
///
/// Delivered to the caller wrapped in [`TreeEvent::TreeAction`].
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}
264
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider used to read chain data, e.g. the best block number and its sealed header when
    /// the handler is spawned.
    provider: P,
    /// The consensus implementation, shared behind an [`Arc`].
    consensus: Arc<dyn FullConsensus<N>>,
    /// The validator for incoming payloads.
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Timing statistics for executed blocks, keyed by block hash.
    /// Stored here (not in `ExecutedBlock`) to avoid leaking observability concerns into the block
    /// type. Entries are removed when blocks are persisted or invalidated.
    execution_timing_stats: B256Map<Box<ExecutionTimingStats>>,
    /// Set when an FCU with payload attributes is received, cleared on the next FCU without
    /// payload attributes.
    /// Suppresses persistence cycles during payload building.
    building_payload: bool,
    /// Task runtime for spawning blocking work on named, reusable threads.
    runtime: reth_tasks::Runtime,
}
325
326impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
327    for EngineApiTreeHandler<N, P, T, V, C>
328where
329    N: NodePrimitives,
330    C: Debug + ConfigureEvm<Primitives = N>,
331{
332    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333        f.debug_struct("EngineApiTreeHandler")
334            .field("provider", &self.provider)
335            .field("consensus", &self.consensus)
336            .field("payload_validator", &self.payload_validator)
337            .field("state", &self.state)
338            .field("incoming_tx", &self.incoming_tx)
339            .field("persistence", &self.persistence)
340            .field("persistence_state", &self.persistence_state)
341            .field("backfill_sync_state", &self.backfill_sync_state)
342            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
343            .field("payload_builder", &self.payload_builder)
344            .field("config", &self.config)
345            .field("metrics", &self.metrics)
346            .field("engine_kind", &self.engine_kind)
347            .field("evm_config", &self.evm_config)
348            .field("changeset_cache", &self.changeset_cache)
349            .field("execution_timing_stats", &self.execution_timing_stats.len())
350            .field("runtime", &self.runtime)
351            .finish()
352    }
353}
354
355impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
356where
357    N: NodePrimitives,
358    P: DatabaseProviderFactory
359        + BlockReader<Block = N::Block, Header = N::BlockHeader>
360        + StateProviderFactory
361        + StateReader<Receipt = N::Receipt>
362        + HashedPostStateProvider
363        + BalProvider
364        + Clone
365        + 'static,
366    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
367        + StageCheckpointReader
368        + ChangeSetReader
369        + StorageChangeSetReader
370        + StorageSettingsCache,
371    C: ConfigureEvm<Primitives = N> + 'static,
372    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
373    V: EngineValidator<T> + WaitForCaches,
374{
375    /// Creates a new [`EngineApiTreeHandler`].
376    #[expect(clippy::too_many_arguments)]
377    pub fn new(
378        provider: P,
379        consensus: Arc<dyn FullConsensus<N>>,
380        payload_validator: V,
381        outgoing: UnboundedSender<EngineApiEvent<N>>,
382        state: EngineApiTreeState<N>,
383        canonical_in_memory_state: CanonicalInMemoryState<N>,
384        persistence: PersistenceHandle<N>,
385        persistence_state: PersistenceState,
386        payload_builder: PayloadBuilderHandle<T>,
387        config: TreeConfig,
388        engine_kind: EngineApiKind,
389        evm_config: C,
390        changeset_cache: ChangesetCache,
391        runtime: reth_tasks::Runtime,
392    ) -> Self {
393        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
394
395        Self {
396            provider,
397            consensus,
398            payload_validator,
399            incoming,
400            outgoing,
401            persistence,
402            persistence_state,
403            backfill_sync_state: BackfillSyncState::Idle,
404            state,
405            canonical_in_memory_state,
406            payload_builder,
407            config,
408            metrics: Default::default(),
409            incoming_tx,
410            engine_kind,
411            evm_config,
412            changeset_cache,
413            execution_timing_stats: B256Map::default(),
414            building_payload: false,
415            runtime,
416        }
417    }
418
419    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
420    /// own thread.
421    ///
422    /// Returns the sender through which incoming requests can be sent to the task and the receiver
423    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
424    #[expect(clippy::complexity)]
425    pub fn spawn_new(
426        provider: P,
427        consensus: Arc<dyn FullConsensus<N>>,
428        payload_validator: V,
429        persistence: PersistenceHandle<N>,
430        payload_builder: PayloadBuilderHandle<T>,
431        canonical_in_memory_state: CanonicalInMemoryState<N>,
432        config: TreeConfig,
433        kind: EngineApiKind,
434        evm_config: C,
435        changeset_cache: ChangesetCache,
436        runtime: reth_tasks::Runtime,
437    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
438    {
439        let best_block_number = provider.best_block_number().unwrap_or(0);
440        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
441
442        let persistence_state = PersistenceState {
443            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
444            rx: None,
445        };
446
447        let (tx, outgoing) = unbounded_channel();
448        let state = EngineApiTreeState::new(
449            config.block_buffer_limit(),
450            config.max_invalid_header_cache_length(),
451            config.invalid_header_hit_eviction_threshold(),
452            header.num_hash(),
453            kind,
454            runtime.state_trie_overlay_worker_pool(),
455        );
456
457        let task = Self::new(
458            provider,
459            consensus,
460            payload_validator,
461            tx,
462            state,
463            canonical_in_memory_state,
464            persistence,
465            persistence_state,
466            payload_builder,
467            config,
468            kind,
469            evm_config,
470            changeset_cache,
471            runtime,
472        );
473        let incoming = task.incoming_tx.clone();
474        spawn_os_thread("engine", || {
475            increase_thread_priority();
476            task.run()
477        });
478        (incoming, outgoing)
479    }
480
481    /// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
482    fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
483        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
484            PayloadStatusEnum::Valid,
485            Some(state.head_block_hash),
486        )))
487    }
488
489    /// Returns a new [`Sender`] to send messages to this type.
490    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
491        self.incoming_tx.clone()
492    }
493
494    /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means
495    /// persistence is falling behind execution.
496    const fn persistence_gap(&self) -> u64 {
497        self.state
498            .tree_state
499            .canonical_block_number()
500            .saturating_sub(self.persistence_state.last_persisted_block.number)
501    }
502
503    /// Returns `true` when the main loop should stop draining the tree input channel.
504    ///
505    /// This is the case when persistence is already running and the gap between the canonical tip
506    /// and the last persisted block has reached the configured threshold.
507    const fn should_backpressure(&self) -> bool {
508        self.persistence_state.in_progress() &&
509            self.persistence_gap() >= self.config.persistence_backpressure_threshold()
510    }
511
512    /// Run the engine API handler.
513    ///
514    /// This will block the current thread and process incoming messages.
515    pub fn run(mut self) {
516        loop {
517            // Each iteration has three phases:
518            //
519            // 1. Non-blocking poll for persistence completion. If the background flush already
520            //    landed, absorb the result now so the gap calculation below is fresh.
521            // 2. Decide how to wait for the next event. When the canonical-to-persisted gap exceeds
522            //    the backpressure threshold we only block on the persistence receiver, leaving new
523            //    engine requests sitting in the unbounded upstream channel.
524            // 3. Handle the event (engine message or persistence completion) and kick off a new
525            //    persistence cycle if the threshold is met again.
526            //
527            // The net effect: when the persistence gap exceeds the threshold, we stop
528            // processing incoming messages and let them queue in the channel. This is only a
529            // soft form of backpressure: it delays replies and, more importantly, prevents
530            // executing further blocks that would pile up in the persistence queue - where each
531            // block carries heavier state (eg. trie updates) than the raw payload sitting in the
532            // engine channel.
533            //
534            // Standard Ethereum CLs won't truly back off - the engine API has no
535            // backpressure semantics, and CLs typically timeout after ≈8s and resend - so
536            // this cannot prevent the incoming channel from growing under sustained load.
537            // But it shifts the bottleneck to the lighter-weight incoming queue rather than
538            // the costlier persistence pipeline. Other clients that respect reply latency
539            // can treat the delayed responses as a signal to chill out.
540            match self.try_poll_persistence() {
541                Ok(true) => {
542                    if let Err(err) = self.advance_persistence() {
543                        error!(target: "engine::tree", %err, "Advancing persistence failed");
544                        return
545                    }
546                    continue;
547                }
548                Ok(false) => {}
549                Err(err) => {
550                    error!(target: "engine::tree", %err, "Polling persistence failed");
551                    return
552                }
553            }
554
555            let event = if self.should_backpressure() {
556                self.metrics.engine.backpressure_active.set(1.0);
557                let stall_start = Instant::now();
558                let event = self.wait_for_persistence_event();
559                self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
560                event
561            } else {
562                self.metrics.engine.backpressure_active.set(0.0);
563                self.wait_for_event()
564            };
565
566            match event {
567                LoopEvent::EngineMessage(msg) => {
568                    debug!(target: "engine::tree", %msg, "received new engine message");
569                    match self.on_engine_message(msg) {
570                        Ok(ops::ControlFlow::Break(())) => return,
571                        Ok(ops::ControlFlow::Continue(())) => {}
572                        Err(fatal) => {
573                            error!(target: "engine::tree", %fatal, "insert block fatal error");
574                            return
575                        }
576                    }
577                }
578                LoopEvent::PersistenceComplete { result, start_time } => {
579                    if let Err(err) = self.on_persistence_complete(result, start_time) {
580                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
581                        return
582                    }
583                }
584                LoopEvent::Disconnected => {
585                    error!(target: "engine::tree", "Channel disconnected");
586                    return
587                }
588            }
589
590            // Always check if we need to trigger new persistence after any event:
591            // - After engine messages: new blocks may have been inserted that exceed the
592            //   persistence threshold
593            // - After persistence completion: we can now persist more blocks if needed
594            if let Err(err) = self.advance_persistence() {
595                error!(target: "engine::tree", %err, "Advancing persistence failed");
596                return
597            }
598        }
599    }
600
601    /// Blocks until the in-flight persistence task completes, used when we are under
602    /// backpressure.
603    ///
604    /// Unlike `wait_for_event`, this deliberately does not read from the tree input channel. Any
605    /// requests sent to the tree remain queued upstream until persistence catches up.
606    fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
607        let maybe_persistence = self.persistence_state.rx.take();
608
609        if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
610            match persistence_rx.recv() {
611                Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
612                Err(_) => LoopEvent::Disconnected,
613            }
614        } else {
615            self.wait_for_event()
616        }
617    }
618
619    /// Blocks until the next event is ready: either an incoming engine message or a persistence
620    /// completion (if one is in progress).
621    ///
622    /// Uses biased selection to prioritize persistence completion to update in-memory state and
623    /// unblock further writes.
624    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
625        // Take ownership of persistence rx if present
626        let maybe_persistence = self.persistence_state.rx.take();
627
628        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
629            // Biased select prioritizes persistence completion to update in memory state and
630            // unblock further writes
631            crossbeam_channel::select_biased! {
632                recv(persistence_rx) -> result => {
633                    // Don't put it back - consumed (oneshot-like behavior)
634                    match result {
635                        Ok(result) => LoopEvent::PersistenceComplete {
636                            result,
637                            start_time,
638                        },
639                        Err(_) => LoopEvent::Disconnected,
640                    }
641                },
642                recv(self.incoming) -> msg => {
643                    // Put the persistence rx back - we didn't consume it
644                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
645                    match msg {
646                        Ok(m) => LoopEvent::EngineMessage(m),
647                        Err(_) => LoopEvent::Disconnected,
648                    }
649                },
650            }
651        } else {
652            // No persistence in progress - just wait on incoming
653            match self.incoming.recv() {
654                Ok(m) => LoopEvent::EngineMessage(m),
655                Err(_) => LoopEvent::Disconnected,
656            }
657        }
658    }
659
660    /// Invoked when previously requested blocks were downloaded.
661    ///
662    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
663    /// will execute the first batch and send the remaining blocks back through the channel so that
664    /// block request processing isn't blocked for a long time.
665    fn on_downloaded(
666        &mut self,
667        mut blocks: Vec<SealedBlock<N::Block>>,
668    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
669        if blocks.is_empty() {
670            // nothing to execute
671            return Ok(None)
672        }
673
674        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
675        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
676        for block in blocks.drain(..batch) {
677            if let Some(event) = self.on_downloaded_block(block)? {
678                let needs_backfill = event.is_backfill_action();
679                self.on_tree_event(event)?;
680                if needs_backfill {
681                    // can exit early if backfill is needed
682                    return Ok(None)
683                }
684            }
685        }
686
687        // if we still have blocks to execute, send them as a followup request
688        if !blocks.is_empty() {
689            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
690        }
691
692        Ok(None)
693    }
694
695    /// When the Consensus layer receives a new block via the consensus gossip protocol,
696    /// the transactions in the block are sent to the execution layer in the form of a
697    /// [`PayloadTypes::ExecutionData`], for example
698    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
699    /// Execution layer executes the transactions and validates the state in the block header,
700    /// then passes validation data back to Consensus layer, that adds the block to the head of
701    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
702    /// network in the form of a "Beacon block".
703    ///
704    /// These responses should adhere to the [Engine API Spec for
705    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
706    ///
707    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
708    /// returns an error if an internal error occurred.
709    #[instrument(
710        level = "debug",
711        target = "engine::tree",
712        skip_all,
713        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
714    )]
715    fn on_new_payload(
716        &mut self,
717        payload: T::ExecutionData,
718    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
719        trace!(target: "engine::tree", "invoked new payload");
720
721        // start timing for the new payload process
722        let start = Instant::now();
723
724        // Ensures that the given payload does not violate any consensus rules that concern the
725        // block's layout, like:
726        //    - missing or invalid base fee
727        //    - invalid extra data
728        //    - invalid transactions
729        //    - incorrect hash
730        //    - the versioned hashes passed with the payload do not exactly match transaction
731        //      versioned hashes
732        //    - the block does not contain blob transactions if it is pre-cancun
733        //
734        // This validates the following engine API rule:
735        //
736        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
737        //    validation by taking the following steps:
738        //
739        //   1. Obtain the actual array by concatenating blob versioned hashes lists
740        //      (`tx.blob_versioned_hashes`) of each [blob
741        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
742        //      in the payload, respecting the order of inclusion. If the payload has no blob
743        //      transactions the expected array **MUST** be `[]`.
744        //
745        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
746        //      null}` if the expected and the actual arrays don't match.
747        //
748        // This validation **MUST** be instantly run in all cases even during active sync process.
749
750        let num_hash = payload.num_hash();
751        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
752        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
753
754        let block_hash = num_hash.hash;
755
756        // Check for invalid ancestors
757        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
758            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
759            return Ok(TreeOutcome::new(status));
760        }
761
762        // record pre-execution phase duration
763        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
764
765        let mut outcome = if self.backfill_sync_state.is_idle() {
766            self.try_insert_payload(payload)?.into_outcome()
767        } else {
768            TreeOutcome::new(self.try_buffer_payload(payload)?)
769        };
770
771        // if the block is valid and it is the current sync target head, make it canonical
772        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
773            // Only create the canonical event if this block isn't already the canonical head
774            if self.state.tree_state.canonical_block_hash() != block_hash {
775                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
776                    sync_target_head: block_hash,
777                }));
778            }
779        }
780
781        // record total newPayload duration
782        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
783
784        Ok(outcome)
785    }
786
787    /// Processes a payload during normal sync operation.
788    #[instrument(level = "debug", target = "engine::tree", skip_all)]
789    fn try_insert_payload(
790        &mut self,
791        payload: T::ExecutionData,
792    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
793        let block_hash = payload.block_hash();
794        let num_hash = payload.num_hash();
795        let parent_hash = payload.parent_hash();
796        let mut latest_valid_hash = None;
797
798        match self.insert_payload(payload) {
799            Ok(status) => {
800                let (status, already_seen) = match status {
801                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
802                        latest_valid_hash = Some(block_hash);
803                        self.try_connect_buffered_blocks(num_hash)?;
804                        (PayloadStatusEnum::Valid, false)
805                    }
806                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
807                        latest_valid_hash = Some(block_hash);
808                        (PayloadStatusEnum::Valid, true)
809                    }
810                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
811                        (PayloadStatusEnum::Syncing, false)
812                    }
813                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
814                        // not known to be invalid, but we don't know anything else
815                        (PayloadStatusEnum::Syncing, true)
816                    }
817                };
818
819                Ok(TryInsertPayloadResult {
820                    status: PayloadStatus::new(status, latest_valid_hash),
821                    already_seen,
822                })
823            }
824            Err(error) => {
825                let status = match error {
826                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
827                    InsertPayloadError::Payload(error) => {
828                        self.on_new_payload_error(error, num_hash, parent_hash)?
829                    }
830                };
831
832                Ok(TryInsertPayloadResult { status, already_seen: false })
833            }
834        }
835    }
836
837    /// Stores a payload for later processing during backfill sync.
838    ///
839    /// During backfill, the node lacks the state needed to validate payloads,
840    /// so they are buffered (stored in memory) until their parent blocks are synced.
841    ///
842    /// Returns:
843    /// - `Syncing`: Payload successfully buffered
844    /// - Error status: Payload is malformed or invalid
845    fn try_buffer_payload(
846        &mut self,
847        payload: T::ExecutionData,
848    ) -> Result<PayloadStatus, InsertBlockFatalError> {
849        let parent_hash = payload.parent_hash();
850        let num_hash = payload.num_hash();
851
852        match self.payload_validator.convert_payload_to_block(payload) {
853            // if the block is well-formed, buffer it for later
854            Ok(block) => {
855                if let Err(error) = self.buffer_block(block) {
856                    Ok(self.on_insert_block_error(error)?)
857                } else {
858                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
859                }
860            }
861            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
862        }
863    }
864
865    /// Returns the new chain for the given head.
866    ///
867    /// This also handles reorgs.
868    ///
869    /// Note: This does not update the tracked state and instead returns the new chain based on the
870    /// given head.
871    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
872        // get the executed new head block
873        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
874            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
875            self.metrics.engine.executed_new_block_cache_miss.increment(1);
876            return Ok(None)
877        };
878
879        let new_head_number = new_head_block.recovered_block().number();
880        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
881
882        let mut new_chain = vec![new_head_block.clone()];
883        let mut current_hash = new_head_block.recovered_block().parent_hash();
884        let mut current_number = new_head_number - 1;
885
886        // Walk back the new chain until we reach a block we know about
887        //
888        // This is only done for in-memory blocks, because we should not have persisted any blocks
889        // that are _above_ the current canonical head.
890        while current_number > current_canonical_number {
891            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
892            {
893                current_hash = block.recovered_block().parent_hash();
894                current_number -= 1;
895                new_chain.push(block);
896            } else {
897                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
898                // This should never happen as we're walking back a chain that should connect to
899                // the canonical chain
900                return Ok(None)
901            }
902        }
903
904        // If we have reached the current canonical head by walking back from the target, then we
905        // know this represents an extension of the canonical chain.
906        if current_hash == self.state.tree_state.current_canonical_head.hash {
907            new_chain.reverse();
908
909            // Simple extension of the current chain
910            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
911        }
912
913        // We have a reorg. Walk back both chains to find the fork point.
914        let mut old_chain = Vec::new();
915        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
916
917        // If the canonical chain is ahead of the new chain,
918        // gather all blocks until new head number.
919        while current_canonical_number > current_number {
920            let block = self.canonical_block_by_hash(old_hash)?;
921            old_hash = block.recovered_block().parent_hash();
922            old_chain.push(block);
923            current_canonical_number -= 1;
924        }
925
926        // Both new and old chain pointers are now at the same height.
927        debug_assert_eq!(current_number, current_canonical_number);
928
929        // Walk both chains from specified hashes at same height until
930        // a common ancestor (fork block) is reached.
931        while old_hash != current_hash {
932            let block = self.canonical_block_by_hash(old_hash)?;
933            old_hash = block.recovered_block().parent_hash();
934            old_chain.push(block);
935
936            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
937            {
938                current_hash = block.recovered_block().parent_hash();
939                new_chain.push(block);
940            } else {
941                // This shouldn't happen as we've already walked this path
942                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
943                return Ok(None)
944            }
945        }
946        new_chain.reverse();
947        old_chain.reverse();
948
949        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
950    }
951
952    /// Updates the latest block state to the specified canonical ancestor.
953    ///
954    /// This method ensures that the latest block tracks the given canonical header by resetting
955    ///
956    /// # Arguments
957    /// * `canonical_header` - The canonical header to set as the new head
958    ///
959    /// # Returns
960    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
961    ///
962    /// Caution: This unwinds the canonical chain
963    fn update_latest_block_to_canonical_ancestor(
964        &mut self,
965        canonical_header: &SealedHeader<N::BlockHeader>,
966    ) -> ProviderResult<()> {
967        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
968        let current_head_number = self.state.tree_state.canonical_block_number();
969        let new_head_number = canonical_header.number();
970        let new_head_hash = canonical_header.hash();
971
972        // Update tree state with the new canonical head
973        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
974
975        // Handle the state update based on whether this is an unwind scenario
976        if new_head_number < current_head_number {
977            debug!(
978                target: "engine::tree",
979                current_head = current_head_number,
980                new_head = new_head_number,
981                new_head_hash = ?new_head_hash,
982                "FCU unwind detected: reverting to canonical ancestor"
983            );
984
985            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
986        } else {
987            debug!(
988                target: "engine::tree",
989                previous_head = current_head_number,
990                new_head = new_head_number,
991                new_head_hash = ?new_head_hash,
992                "Advancing latest block to canonical ancestor"
993            );
994            self.handle_chain_advance_or_same_height(canonical_header)
995        }
996    }
997
998    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
999    /// to the canonical header
1000    fn handle_canonical_chain_unwind(
1001        &self,
1002        current_head_number: u64,
1003        canonical_header: &SealedHeader<N::BlockHeader>,
1004    ) -> ProviderResult<()> {
1005        let new_head_number = canonical_header.number();
1006        debug!(
1007            target: "engine::tree",
1008            from = current_head_number,
1009            to = new_head_number,
1010            "Handling unwind: collecting blocks to remove from in-memory state"
1011        );
1012
1013        // Collect blocks that need to be removed from memory
1014        let old_blocks =
1015            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
1016
1017        // Load and apply the canonical ancestor block
1018        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1019    }
1020
1021    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
1022    fn collect_blocks_for_canonical_unwind(
1023        &self,
1024        new_head_number: u64,
1025        current_head_number: u64,
1026    ) -> Vec<ExecutedBlock<N>> {
1027        let mut old_blocks =
1028            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1029
1030        for block_num in (new_head_number + 1)..=current_head_number {
1031            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1032                let executed_block = block_state.block_ref().clone();
1033                old_blocks.push(executed_block);
1034                debug!(
1035                    target: "engine::tree",
1036                    block_number = block_num,
1037                    "Collected block for removal from in-memory state"
1038                );
1039            }
1040        }
1041
1042        if old_blocks.is_empty() {
1043            debug!(
1044                target: "engine::tree",
1045                "No blocks found in memory to remove, will clear and reset state"
1046            );
1047        }
1048
1049        old_blocks
1050    }
1051
1052    /// Applies the canonical ancestor block via a reorg operation.
1053    fn apply_canonical_ancestor_via_reorg(
1054        &self,
1055        canonical_header: &SealedHeader<N::BlockHeader>,
1056        old_blocks: Vec<ExecutedBlock<N>>,
1057    ) -> ProviderResult<()> {
1058        let new_head_hash = canonical_header.hash();
1059        let new_head_number = canonical_header.number();
1060
1061        // Load the canonical ancestor's block
1062        let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1063        // Perform the reorg to properly handle the unwind
1064        self.canonical_in_memory_state
1065            .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1066
1067        // CRITICAL: Update the canonical head after the reorg
1068        // This ensures get_canonical_head() returns the correct block
1069        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1070
1071        debug!(
1072            target: "engine::tree",
1073            block_number = new_head_number,
1074            block_hash = ?new_head_hash,
1075            "Successfully loaded canonical ancestor into memory via reorg"
1076        );
1077
1078        Ok(())
1079    }
1080
1081    /// Handles chain advance or same height scenarios.
1082    fn handle_chain_advance_or_same_height(
1083        &self,
1084        canonical_header: &SealedHeader<N::BlockHeader>,
1085    ) -> ProviderResult<()> {
1086        // Load the block into memory if it's not already present
1087        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1088
1089        // Update the canonical head header
1090        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1091
1092        Ok(())
1093    }
1094
1095    /// Ensures a block is loaded into memory if not already present.
1096    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1097        // Check if block is already in memory
1098        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1099            return Ok(());
1100        }
1101
1102        // Load the block from storage
1103        let executed_block = self.canonical_block_by_hash(block_hash)?;
1104        self.canonical_in_memory_state
1105            .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1106
1107        debug!(
1108            target: "engine::tree",
1109            block_number,
1110            block_hash = ?block_hash,
1111            "Added canonical block to in-memory state"
1112        );
1113
1114        Ok(())
1115    }
1116
1117    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
1118    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
1119    /// chain.
1120    ///
1121    /// These responses should adhere to the [Engine API Spec for
1122    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
1123    ///
1124    /// Returns an error if an internal error occurred like a database error.
1125    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
1126    fn on_forkchoice_updated(
1127        &mut self,
1128        state: ForkchoiceState,
1129        attrs: Option<T::PayloadAttributes>,
1130    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1131        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1132
1133        self.building_payload = attrs.is_some() && self.config.suppress_persistence_during_build();
1134
1135        // Record metrics
1136        self.record_forkchoice_metrics();
1137
1138        // Pre-validation of forkchoice state
1139        if let Some(early_result) = self.validate_forkchoice_state(state)? {
1140            return Ok(TreeOutcome::new(early_result));
1141        }
1142
1143        // Return early if we are on the correct fork
1144        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
1145            return Ok(result);
1146        }
1147
1148        // Attempt to apply a chain update when the head differs from our canonical chain.
1149        // This handles reorgs and chain extensions by making the specified head canonical.
1150        if let Some(result) = self.apply_chain_update(state, &attrs)? {
1151            return Ok(result);
1152        }
1153
1154        // Fallback that ensures to catch up to the network's state.
1155        self.handle_missing_block(state)
1156    }
1157
1158    /// Records metrics for forkchoice updated calls
1159    fn record_forkchoice_metrics(&self) {
1160        self.canonical_in_memory_state.on_forkchoice_update_received();
1161    }
1162
1163    /// Pre-validates the forkchoice state and returns early if validation fails.
1164    ///
1165    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1166    /// returned. Returns `None` if validation passes and processing should continue.
1167    fn validate_forkchoice_state(
1168        &mut self,
1169        state: ForkchoiceState,
1170    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1171        if state.head_block_hash.is_zero() {
1172            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1173        }
1174
1175        // Check if the new head hash is connected to any ancestor that we previously marked as
1176        // invalid
1177        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1178        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1179            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1180        }
1181
1182        if !self.backfill_sync_state.is_idle() {
1183            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1184            // exclusive access to the database
1185            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1186            return Ok(Some(OnForkChoiceUpdated::syncing()));
1187        }
1188
1189        Ok(None)
1190    }
1191
1192    /// Handles the case where the forkchoice head is already canonical.
1193    ///
1194    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1195    /// processing is complete. Returns `None` if the head is not canonical and processing
1196    /// should continue.
1197    fn handle_canonical_head(
1198        &self,
1199        state: ForkchoiceState,
1200        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1201    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1202        // Process the forkchoice update by trying to make the head block canonical
1203        //
1204        // We can only process this forkchoice update if:
1205        // - we have the `head` block
1206        // - the head block is part of a chain that is connected to the canonical chain. This
1207        //   includes reorgs.
1208        //
1209        // Performing a FCU involves:
1210        // - marking the FCU's head block as canonical
1211        // - updating in memory state to reflect the new canonical chain
1212        // - updating canonical state trackers
1213        // - emitting a canonicalization event for the new chain (including reorg)
1214        // - if we have payload attributes, delegate them to the payload service
1215
1216        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1217            return Ok(None);
1218        }
1219
1220        trace!(target: "engine::tree", "fcu head hash is already canonical");
1221
1222        // Update the safe and finalized blocks and ensure their values are valid
1223        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1224            // safe or finalized hashes are invalid
1225            return Ok(Some(TreeOutcome::new(outcome)));
1226        }
1227
1228        // Process payload attributes if the head is already canonical
1229        if let Some(attr) = attrs {
1230            let tip = self
1231                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1232                .ok_or_else(|| {
1233                    // If we can't find the canonical block, then something is wrong and we need
1234                    // to return an error
1235                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1236                })?;
1237            // Clone only when we actually need to process the attributes
1238            let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1239            return Ok(Some(TreeOutcome::new(updated)));
1240        }
1241
1242        // The head block is already canonical
1243        Ok(Some(Self::valid_outcome(state)))
1244    }
1245
1246    /// Applies chain update for the new head block and processes payload attributes.
1247    ///
1248    /// This method handles the case where the forkchoice head differs from our current canonical
1249    /// head. It attempts to make the specified head block canonical by:
1250    /// - Checking if the head is already part of the canonical chain
1251    /// - Applying chain reorganizations (reorgs) if necessary
1252    /// - Processing payload attributes if provided
1253    /// - Returning the appropriate forkchoice update response
1254    ///
1255    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
1256    /// Returns `None` if no chain update was needed or possible.
1257    fn apply_chain_update(
1258        &mut self,
1259        state: ForkchoiceState,
1260        attrs: &Option<T::PayloadAttributes>,
1261    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1262        // Check if the head is already part of the canonical chain
1263        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1264            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1265
1266            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1267            // own chain at will, so we need to always trigger a new payload job if requested.
1268            if self.engine_kind.is_opstack() ||
1269                self.config.always_process_payload_attributes_on_canonical_head()
1270            {
1271                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
1272                // part of the canonical chain. We need to update the latest block state to reflect
1273                // the canonical ancestor. This ensures that state providers and the transaction
1274                // pool operate with the correct chain state after forkchoice update processing, and
1275                // new payloads built on the reorg'd head will be added to the tree immediately.
1276                if self.config.unwind_canonical_header() {
1277                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1278                }
1279
1280                if let Some(attr) = attrs {
1281                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1282                    // Clone only when we actually need to process the attributes
1283                    let updated =
1284                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
1285                    return Ok(Some(TreeOutcome::new(updated)));
1286                }
1287            }
1288
1289            // According to the Engine API specification, client software MAY skip an update of the
1290            // forkchoice state and MUST NOT begin a payload build process if
1291            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
1292            // of canonical chain, i.e. the ancestor passed payload validation process
1293            // and deemed `VALID`. In the case of such an event, client software MUST
1294            // return `{payloadStatus: {status: VALID, latestValidHash:
1295            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1296
1297            // The head block is already canonical and we're not processing payload attributes,
1298            // so we're not triggering a payload job and can return right away
1299            return Ok(Some(Self::valid_outcome(state)));
1300        }
1301
1302        // Ensure we can apply a new chain update for the head block
1303        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1304            let tip = chain_update.tip().clone_sealed_header();
1305            self.on_canonical_chain_update(chain_update);
1306
1307            // Update the safe and finalized blocks and ensure their values are valid
1308            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1309                // safe or finalized hashes are invalid
1310                return Ok(Some(TreeOutcome::new(outcome)));
1311            }
1312
1313            if let Some(attr) = attrs {
1314                // Clone only when we actually need to process the attributes
1315                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1316                return Ok(Some(TreeOutcome::new(updated)));
1317            }
1318
1319            return Ok(Some(Self::valid_outcome(state)));
1320        }
1321
1322        Ok(None)
1323    }
1324
1325    /// Handles the case where the head block is missing and needs to be downloaded.
1326    ///
1327    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1328    /// Returns a `TreeOutcome` with syncing status and download event.
1329    fn handle_missing_block(
1330        &self,
1331        state: ForkchoiceState,
1332    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1333        // We don't have the block to perform the forkchoice update
1334        // We assume the FCU is valid and at least the head is missing,
1335        // so we need to start syncing to it
1336        //
1337        // find the appropriate target to sync to, if we don't have the safe block hash then we
1338        // start syncing to the safe block via backfill first
1339        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1340        // check that safe block is valid and missing
1341        !state.safe_block_hash.is_zero() &&
1342        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1343        {
1344            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1345            state.safe_block_hash
1346        } else {
1347            state.head_block_hash
1348        };
1349
1350        let target = self.lowest_buffered_ancestor_or(target);
1351        trace!(target: "engine::tree", %target, "downloading missing block");
1352
1353        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1354            PayloadStatusEnum::Syncing,
1355        )))
1356        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1357    }
1358
1359    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1360    /// the current persistence action while we're removing blocks.
1361    fn remove_blocks(&mut self, new_tip_num: u64) {
1362        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1363        if new_tip_num < self.persistence_state.last_persisted_block.number {
1364            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1365            let (tx, rx) = crossbeam_channel::bounded(1);
1366            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1367            self.persistence_state.start_remove(new_tip_num, rx);
1368        }
1369    }
1370
1371    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1372    /// the current persistence action while we're saving blocks.
1373    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1374        if blocks_to_persist.is_empty() {
1375            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1376            return
1377        }
1378
1379        // NOTE: checked non-empty above
1380        let highest_num_hash = blocks_to_persist
1381            .iter()
1382            .max_by_key(|block| block.recovered_block().number())
1383            .map(|b| b.recovered_block().num_hash())
1384            .expect("Checked non-empty persisting blocks");
1385
1386        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1387        let (tx, rx) = crossbeam_channel::bounded(1);
1388        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1389
1390        self.persistence_state.start_save(highest_num_hash, rx);
1391    }
1392
1393    /// Triggers new persistence actions if no persistence task is currently in progress.
1394    ///
1395    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1396    /// Persistence completion is handled separately via the `wait_for_event` method.
1397    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1398        if !self.persistence_state.in_progress() {
1399            if let Some(new_tip_num) = self.find_disk_reorg()? {
1400                self.remove_blocks(new_tip_num)
1401            } else if self.should_persist() {
1402                let blocks_to_persist =
1403                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1404                self.persist_blocks(blocks_to_persist);
1405            }
1406        }
1407
1408        Ok(())
1409    }
1410
1411    /// Finishes termination by persisting all remaining blocks and signaling completion.
1412    ///
1413    /// This blocks until all persistence is complete. Always signals completion,
1414    /// even if an error occurs.
1415    fn finish_termination(
1416        &mut self,
1417        pending_termination: oneshot::Sender<()>,
1418    ) -> Result<(), AdvancePersistenceError> {
1419        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1420        let result = self.persist_until_complete();
1421        let _ = pending_termination.send(());
1422        result
1423    }
1424
1425    /// Persists all remaining blocks until none are left.
1426    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1427        loop {
1428            // Wait for any in-progress persistence to complete (blocking)
1429            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1430                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1431                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1432                self.on_persistence_complete(result, start_time)?;
1433            }
1434
1435            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1436
1437            if blocks_to_persist.is_empty() {
1438                debug!(target: "engine::tree", "persistence complete, signaling termination");
1439                return Ok(())
1440            }
1441
1442            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1443            self.persist_blocks(blocks_to_persist);
1444        }
1445    }
1446
1447    /// Tries to poll for a completed persistence task (non-blocking).
1448    ///
1449    /// Returns `true` if a persistence task was completed, `false` otherwise.
1450    fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1451        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1452            return Ok(false);
1453        };
1454
1455        match rx.try_recv() {
1456            Ok(result) => {
1457                self.on_persistence_complete(result, start_time)?;
1458                Ok(true)
1459            }
1460            Err(crossbeam_channel::TryRecvError::Empty) => {
1461                // Not ready yet, put it back
1462                self.persistence_state.rx = Some((rx, start_time, action));
1463                Ok(false)
1464            }
1465            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1466                Err(AdvancePersistenceError::ChannelClosed)
1467            }
1468        }
1469    }
1470
1471    /// Handles a completed persistence task.
1472    fn on_persistence_complete(
1473        &mut self,
1474        result: PersistenceResult,
1475        start_time: Instant,
1476    ) -> Result<(), AdvancePersistenceError> {
1477        self.metrics.engine.persistence_duration.record(start_time.elapsed());
1478
1479        let commit_duration = result.commit_duration;
1480        let Some(BlockNumHash {
1481            hash: last_persisted_block_hash,
1482            number: last_persisted_block_number,
1483        }) = result.last_block
1484        else {
1485            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
1486            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1487            return Ok(())
1488        };
1489
1490        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1491        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1492
1493        // Evict trie changesets for blocks below the eviction threshold.
1494        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
1495        // the finalized block if set.
1496        let min_threshold =
1497            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1498        let eviction_threshold =
1499            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1500                // Use the minimum of finalized block and retention threshold to be conservative
1501                finalized.number.min(min_threshold)
1502            } else {
1503                // When finalized is not set (e.g., on L2s), use the retention threshold
1504                min_threshold
1505            };
1506        debug!(
1507            target: "engine::tree",
1508            last_persisted = last_persisted_block_number,
1509            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1510            eviction_threshold,
1511            "Evicting changesets below threshold"
1512        );
1513        self.changeset_cache.evict(eviction_threshold);
1514
1515        self.on_new_persisted_block()?;
1516
1517        self.purge_timing_stats(last_persisted_block_number, commit_duration);
1518
1519        Ok(())
1520    }
1521
1522    /// Handles a message from the engine.
1523    ///
1524    /// Returns `ControlFlow::Break(())` if the engine should terminate.
1525    fn on_engine_message(
1526        &mut self,
1527        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1528    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
1529        match msg {
1530            FromEngine::Event(event) => match event {
1531                FromOrchestrator::BackfillSyncStarted => {
1532                    debug!(target: "engine::tree", "received backfill sync started event");
1533                    self.backfill_sync_state = BackfillSyncState::Active;
1534                }
1535                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1536                    self.on_backfill_sync_finished(ctrl)?;
1537                }
1538                FromOrchestrator::Terminate { tx } => {
1539                    debug!(target: "engine::tree", "received terminate request");
1540                    if let Err(err) = self.finish_termination(tx) {
1541                        error!(target: "engine::tree", %err, "Termination failed");
1542                    }
1543                    return Ok(ops::ControlFlow::Break(()))
1544                }
1545            },
1546            FromEngine::Request(request) => {
1547                match request {
1548                    EngineApiRequest::InsertExecutedBlock(payload) => {
1549                        let block_num_hash = payload.recovered_block.num_hash();
1550                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1551                            // outdated block that can be skipped
1552                            return Ok(ops::ControlFlow::Continue(()))
1553                        }
1554
1555                        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
1556                            // block already known to the tree (e.g. delivered via newPayload first)
1557                            return Ok(ops::ControlFlow::Continue(()))
1558                        }
1559
1560                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1561                        let now = Instant::now();
1562
1563                        let block = match self
1564                            .payload_validator
1565                            .on_inserted_executed_block(payload, &self.state)
1566                        {
1567                            Ok(block) => block,
1568                            Err(err) => {
1569                                warn!(target: "engine::tree", %err, block=?block_num_hash, "Failed to insert already executed block");
1570                                return Ok(ops::ControlFlow::Continue(()))
1571                            }
1572                        };
1573
1574                        // if the parent is the canonical head, we can insert the block as the
1575                        // pending block
1576                        if self.state.tree_state.canonical_block_hash() ==
1577                            block.recovered_block().parent_hash()
1578                        {
1579                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1580                            self.canonical_in_memory_state.set_pending_block(block.clone());
1581                        }
1582
1583                        self.state.tree_state.insert_executed(block.clone());
1584                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1585                        self.emit_event(EngineApiEvent::BeaconConsensus(
1586                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1587                        ));
1588                    }
1589                    EngineApiRequest::Beacon(request) => {
1590                        match request {
1591                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
1592                                let has_attrs = payload_attrs.is_some();
1593
1594                                let start = Instant::now();
1595                                let mut output = self.on_forkchoice_updated(state, payload_attrs);
1596
1597                                if let Ok(res) = &mut output {
1598                                    // track last received forkchoice state
1599                                    self.state
1600                                        .forkchoice_state_tracker
1601                                        .set_latest(state, res.outcome.forkchoice_status());
1602
1603                                    // emit an event about the handled FCU
1604                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1605                                        state,
1606                                        res.outcome.forkchoice_status(),
1607                                    ));
1608
1609                                    // handle the event if any
1610                                    self.on_maybe_tree_event(res.event.take())?;
1611                                }
1612
1613                                if let Err(ref err) = output {
1614                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
1615                                }
1616
1617                                self.metrics.engine.forkchoice_updated.update_response_metrics(
1618                                    start,
1619                                    &mut self.metrics.engine.new_payload.latest_finish_at,
1620                                    has_attrs,
1621                                    &output,
1622                                );
1623
1624                                if let Err(err) =
1625                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1626                                {
1627                                    self.metrics
1628                                        .engine
1629                                        .failed_forkchoice_updated_response_deliveries
1630                                        .increment(1);
1631                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
1632                                }
1633                            }
1634                            BeaconEngineMessage::NewPayload { payload, tx } => {
1635                                let start = Instant::now();
1636                                let gas_used = payload.gas_used();
1637                                let num_hash = payload.num_hash();
1638                                let mut output = self.on_new_payload(payload);
1639                                self.metrics.engine.new_payload.update_response_metrics(
1640                                    start,
1641                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1642                                    &output,
1643                                    gas_used,
1644                                );
1645
1646                                let maybe_event =
1647                                    output.as_mut().ok().and_then(|out| out.event.take());
1648
1649                                // emit response
1650                                if let Err(err) =
1651                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1652                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1653                                    }))
1654                                {
1655                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
1656                                    self.metrics
1657                                        .engine
1658                                        .failed_new_payload_response_deliveries
1659                                        .increment(1);
1660                                }
1661
1662                                // handle the event if any
1663                                self.on_maybe_tree_event(maybe_event)?;
1664                            }
1665                            BeaconEngineMessage::RethNewPayload {
1666                                payload,
1667                                wait_for_persistence,
1668                                wait_for_caches,
1669                                tx,
1670                                enqueued_at,
1671                            } => {
1672                                debug!(
1673                                    target: "engine::tree",
1674                                    wait_for_persistence,
1675                                    wait_for_caches,
1676                                    "Processing reth_newPayload"
1677                                );
1678
1679                                let backpressure_wait = enqueued_at.elapsed();
1680
1681                                let explicit_persistence_wait = if wait_for_persistence {
1682                                    let pending_persistence = self.persistence_state.rx.take();
1683                                    if let Some((rx, start_time, _action)) = pending_persistence {
1684                                        let (persistence_tx, persistence_rx) =
1685                                            std::sync::mpsc::channel();
1686                                        self.runtime.spawn_blocking_named(
1687                                            "wait-persist",
1688                                            move || {
1689                                                let start = Instant::now();
1690                                                let result = rx
1691                                                    .recv()
1692                                                    .expect("persistence state channel closed");
1693                                                let _ = persistence_tx.send((
1694                                                    result,
1695                                                    start_time,
1696                                                    start.elapsed(),
1697                                                ));
1698                                            },
1699                                        );
1700                                        let (result, start_time, wait_duration) = persistence_rx
1701                                            .recv()
1702                                            .expect("persistence result channel closed");
1703                                        let _ = self.on_persistence_complete(result, start_time);
1704                                        wait_duration
1705                                    } else {
1706                                        Duration::ZERO
1707                                    }
1708                                } else {
1709                                    Duration::ZERO
1710                                };
1711
1712                                let cache_wait = wait_for_caches
1713                                    .then(|| self.payload_validator.wait_for_caches());
1714
1715                                let start = Instant::now();
1716                                let gas_used = payload.gas_used();
1717                                let num_hash = payload.num_hash();
1718                                let mut output = self.on_new_payload(payload);
1719                                let latency = start.elapsed();
1720                                self.metrics.engine.new_payload.update_response_metrics(
1721                                    start,
1722                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1723                                    &output,
1724                                    gas_used,
1725                                );
1726
1727                                let maybe_event =
1728                                    output.as_mut().ok().and_then(|out| out.event.take());
1729
1730                                let timings = NewPayloadTimings {
1731                                    latency,
1732                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
1733                                    execution_cache_wait: cache_wait
1734                                        .map(|wait| wait.execution_cache),
1735                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
1736                                };
1737                                if let Err(err) =
1738                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
1739                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1740                                    }))
1741                                {
1742                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1743                                    self.metrics
1744                                        .engine
1745                                        .failed_new_payload_response_deliveries
1746                                        .increment(1);
1747                                }
1748
1749                                self.on_maybe_tree_event(maybe_event)?;
1750                            }
1751                        }
1752                    }
1753                }
1754            }
1755            FromEngine::DownloadedBlocks(blocks) => {
1756                if let Some(event) = self.on_downloaded(blocks)? {
1757                    self.on_tree_event(event)?;
1758                }
1759            }
1760        }
1761        Ok(ops::ControlFlow::Continue(()))
1762    }
1763
    /// Invoked if the backfill sync has finished to target.
    ///
    /// At this point we consider the block synced to the backfill target.
    ///
    /// Checks the tracked finalized block against the block on disk and requests another backfill
    /// run if the distance to the tip exceeds the threshold for another backfill run.
    ///
    /// This will also do the necessary housekeeping of the tree state, this includes:
    ///  - removing all blocks below the backfill height
    ///  - resetting the canonical in-memory state
    ///
    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
    /// target block.
    ///
    /// After the housekeeping, exactly one follow-up is attempted, in this order: another
    /// backfill run, a block range download towards the lowest buffered ancestor of the tracked
    /// head, a single block download of the tracked head, or connecting buffered blocks to the
    /// new canonical head.
    ///
    /// Returns early with `Ok(())` if the control flow carries no block number, if the backfill
    /// block hash cannot be found in the provider, or if there is no usable sync target.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // update the `invalid_headers` cache with the new invalid header
            self.state.invalid_headers.insert(**bad_block);

            // if this was an unwind then the target is the new height
            Some(*target)
        } else {
            // backfill height is the block number that the backfill finished at
            ctrl.block_number()
        };

        // nothing to clean up if the control flow did not report a height
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // state house keeping after backfill sync
        // remove all executed blocks below the backfill height
        //
        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
        // before that
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // the node reset so we need to clear everything above that height so that backfill
            // height is the new canonical block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        // refresh the metrics that depend on the tree state and the new height
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // remove all buffered blocks below the backfill height
        self.state.buffer.remove_old_blocks(backfill_height);
        // no commit duration is passed here, so no slow block events are emitted
        self.purge_timing_stats(backfill_height, None);
        // we remove all entries because now we're synced to the backfill target and consider this
        // the canonical chain
        self.canonical_in_memory_state.clear_state();

        // NOTE: a provider error or a missing header is silently ignored here; the tracked heads
        // are only updated if the header could be loaded
        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            // update the tracked chain height, after backfill sync both the canonical height and
            // persisted height are the same
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            // update the tracked canonical head
            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // check if we need to run backfill again by comparing the most recent backfill target
        // height to the backfill height
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if !self.engine_kind.is_opstack() && sync_target_state.finalized_block_hash.is_zero() {
            // no finalized block, can't check distance on non-OP Stack chains
            return Ok(())
        }
        let target_hash = self.backfill_target_hash(sync_target_state);
        if target_hash.is_zero() {
            return Ok(())
        }
        // get the block number of the backfill target block, if we have it buffered
        let newest_target = self.state.buffer.block(&target_hash).map(|block| block.number());

        // The block number that the backfill finished at - if the progress or newest target is
        // None then we can't check the distance anyways.
        //
        // If both are Some, we perform another distance check and return the desired backfill
        // target
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_target).and_then(|(progress, target_number)| {
                // Determines whether or not we should run backfill again, in case
                // the new gap is still large enough and requires running backfill again
                self.backfill_sync_target(progress, target_number, None)
            })
        {
            // request another backfill run
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Check if there are more blocks to sync between current head and FCU target
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                // There are blocks between current head and FCU target, download them
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                // the requested range starts at the parent of the lowest buffered ancestor
                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // We don't have the head block or any of its ancestors buffered. Request
            // a download for the head block which will then trigger further sync.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // try to close the gap by executing buffered blocks that are child blocks of the new head
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1923
1924    /// Attempts to make the given target canonical.
1925    ///
1926    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1927    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1928        if let Some(chain_update) = self.on_new_head(target)? {
1929            self.on_canonical_chain_update(chain_update);
1930        }
1931
1932        self.on_canonicalized_sync_target(target);
1933
1934        Ok(())
1935    }
1936
1937    /// Applies the tracked forkchoice state once its sync target head becomes canonical.
1938    fn on_canonicalized_sync_target(&mut self, target: B256) {
1939        let Some(sync_target_state) = self
1940            .state
1941            .forkchoice_state_tracker
1942            .sync_target_state()
1943            .filter(|state| state.head_block_hash == target)
1944        else {
1945            return;
1946        };
1947
1948        if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
1949            debug!(
1950                target: "engine::tree",
1951                head = %sync_target_state.head_block_hash,
1952                safe = %sync_target_state.safe_block_hash,
1953                finalized = %sync_target_state.finalized_block_hash,
1954                ?outcome,
1955                "Canonicalized sync target head before safe/finalized could be applied"
1956            );
1957            return;
1958        }
1959
1960        self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
1961    }
1962
1963    /// Convenience function to handle an optional tree event.
1964    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1965        if let Some(event) = event {
1966            self.on_tree_event(event)?;
1967        }
1968
1969        Ok(())
1970    }
1971
1972    /// Handles a tree event.
1973    ///
1974    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1975    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1976        match event {
1977            TreeEvent::TreeAction(action) => match action {
1978                TreeAction::MakeCanonical { sync_target_head } => {
1979                    self.make_canonical(sync_target_head)?;
1980                }
1981            },
1982            TreeEvent::BackfillAction(action) => {
1983                self.emit_event(EngineApiEvent::BackfillAction(action));
1984            }
1985            TreeEvent::Download(action) => {
1986                self.emit_event(EngineApiEvent::Download(action));
1987            }
1988        }
1989
1990        Ok(())
1991    }
1992
1993    /// Removes timing stats for blocks at or below `below_number`.
1994    ///
1995    /// No-op when detailed block logging is disabled (no stats are recorded in that case).
1996    /// When `commit_duration` is provided and a slow block threshold is configured, checks
1997    /// each removed block against the threshold and emits a [`ConsensusEngineEvent::SlowBlock`]
1998    /// event for blocks that exceed it.
1999    fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
2000        let threshold = self.config.slow_block_threshold();
2001        let check_slow = commit_duration.is_some() && threshold.is_some();
2002
2003        // Two-pass: collect keys first because emit_event borrows &mut self.
2004        let keys_to_remove: Vec<B256> = self
2005            .execution_timing_stats
2006            .iter()
2007            .filter(|(_, stats)| stats.block_number <= below_number)
2008            .map(|(k, _)| *k)
2009            .collect();
2010
2011        for key in keys_to_remove {
2012            let stats = self.execution_timing_stats.remove(&key).expect("key just found");
2013            if check_slow {
2014                let commit_dur = commit_duration.expect("checked above");
2015                // state_read_duration is already included in execution_duration
2016                let total_duration =
2017                    stats.execution_duration + stats.state_hash_duration + commit_dur;
2018
2019                if total_duration > threshold.expect("checked above") {
2020                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2021                        stats,
2022                        commit_duration: Some(commit_dur),
2023                        total_duration,
2024                    }));
2025                }
2026            }
2027        }
2028    }
2029
2030    /// Emits an outgoing event to the engine.
2031    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
2032        let event = event.into();
2033
2034        if event.is_backfill_action() {
2035            debug_assert_eq!(
2036                self.backfill_sync_state,
2037                BackfillSyncState::Idle,
2038                "backfill action should only be emitted when backfill is idle"
2039            );
2040
2041            if self.persistence_state.in_progress() {
2042                // backfill sync and persisting data are mutually exclusive, so we can't start
2043                // backfill while we're still persisting
2044                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
2045                return
2046            }
2047
2048            self.backfill_sync_state = BackfillSyncState::Pending;
2049            self.metrics.engine.pipeline_runs.increment(1);
2050            debug!(target: "engine::tree", "emitting backfill action event");
2051        }
2052
2053        let _ = self.outgoing.send(event).inspect_err(
2054            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2055        );
2056    }
2057
2058    /// Returns true if the canonical chain length minus the last persisted
2059    /// block is greater than or equal to the persistence threshold,
2060    /// backfill is not running, and no payload is currently being built.
2061    pub const fn should_persist(&self) -> bool {
2062        if self.building_payload {
2063            return false
2064        }
2065
2066        if !self.backfill_sync_state.is_idle() {
2067            // can't persist if backfill is running
2068            return false
2069        }
2070
2071        let min_block = self.persistence_state.last_persisted_block.number;
2072        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2073            self.config.persistence_threshold()
2074    }
2075
2076    /// Returns a batch of consecutive canonical blocks to persist in the range
2077    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
2078    fn get_canonical_blocks_to_persist(
2079        &self,
2080        target: PersistTarget,
2081    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2082        // We will calculate the state root using the database, so we need to be sure there are no
2083        // changes
2084        debug_assert!(!self.persistence_state.in_progress());
2085
2086        let mut blocks_to_persist = Vec::new();
2087        let mut current_hash = self.state.tree_state.canonical_block_hash();
2088        let last_persisted_number = self.persistence_state.last_persisted_block.number;
2089        let canonical_head_number = self.state.tree_state.canonical_block_number();
2090
2091        let target_number = match target {
2092            PersistTarget::Head => canonical_head_number,
2093            PersistTarget::Threshold => {
2094                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2095            }
2096        };
2097
2098        debug!(
2099            target: "engine::tree",
2100            ?current_hash,
2101            ?last_persisted_number,
2102            ?canonical_head_number,
2103            ?target_number,
2104            "Returning canonical blocks to persist"
2105        );
2106        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
2107            if block.recovered_block().number() <= last_persisted_number {
2108                break;
2109            }
2110
2111            if block.recovered_block().number() <= target_number {
2112                blocks_to_persist.push(block.clone());
2113            }
2114
2115            current_hash = block.recovered_block().parent_hash();
2116        }
2117
2118        // Reverse the order so that the oldest block comes first
2119        blocks_to_persist.reverse();
2120
2121        Ok(blocks_to_persist)
2122    }
2123
2124    /// This clears the blocks from the in-memory tree state that have been persisted to the
2125    /// database.
2126    ///
2127    /// This also updates the canonical in-memory state to reflect the newest persisted block
2128    /// height.
2129    ///
2130    /// Assumes that `finish` has been called on the `persistence_state` at least once
2131    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2132        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
2133        // state.
2134        if let Some(remove_above) = self.find_disk_reorg()? {
2135            self.remove_blocks(remove_above);
2136            return Ok(())
2137        }
2138
2139        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2140        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2141        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2142            number: self.persistence_state.last_persisted_block.number,
2143            hash: self.persistence_state.last_persisted_block.hash,
2144        });
2145        Ok(())
2146    }
2147
2148    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
2149    ///
2150    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
2151    /// or the database. If the required historical data (such as trie change sets) has been
2152    /// pruned for a given block, this operation will return an error. On archive nodes, it
2153    /// can retrieve any block.
2154    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2155    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
2156        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
2157        // check memory first
2158        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
2159            return Ok(block.clone())
2160        }
2161
2162        let (block, senders) = self
2163            .provider
2164            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
2165            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
2166            .split_sealed();
2167        let mut execution_output = self
2168            .provider
2169            .get_state(block.header().number())?
2170            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
2171        let hashed_state = self.provider.hashed_post_state(execution_output.state());
2172
2173        debug!(
2174            target: "engine::tree",
2175            number = ?block.number(),
2176            "computing block trie updates",
2177        );
2178        let db_provider = self.provider.database_provider_ro()?;
2179        let trie_updates = reth_trie_db::compute_block_trie_updates(
2180            &self.changeset_cache,
2181            &db_provider,
2182            block.number(),
2183        )?;
2184
2185        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
2186        let sorted_trie_updates = Arc::new(trie_updates);
2187        let trie_data = ComputedTrieData::new(sorted_hashed_state, sorted_trie_updates);
2188
2189        let execution_output = Arc::new(BlockExecutionOutput {
2190            state: execution_output.bundle,
2191            result: BlockExecutionResult {
2192                receipts: execution_output.receipts.pop().unwrap_or_default(),
2193                requests: execution_output.requests.pop().unwrap_or_default(),
2194                gas_used: block.gas_used(),
2195                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
2196            },
2197        });
2198
2199        Ok(ExecutedBlock::new(
2200            Arc::new(RecoveredBlock::new_sealed(block, senders)),
2201            execution_output,
2202            trie_data,
2203        ))
2204    }
2205
2206    /// Returns `true` if a block with the given hash is known, either in memory or in the
2207    /// database. This is a lightweight existence check that avoids constructing a full
2208    /// [`SealedHeader`].
2209    fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2210        if self.state.tree_state.contains_hash(&hash) {
2211            Ok(true)
2212        } else {
2213            self.provider.is_known(hash)
2214        }
2215    }
2216
2217    /// Return sealed block header from in-memory state or database by hash.
2218    fn sealed_header_by_hash(
2219        &self,
2220        hash: B256,
2221    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2222        // check memory first
2223        let header = self.state.tree_state.sealed_header_by_hash(&hash);
2224
2225        if header.is_some() {
2226            Ok(header)
2227        } else {
2228            self.provider.sealed_header_by_hash(hash)
2229        }
2230    }
2231
2232    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
2233    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
2234    /// not exist in the buffer, this returns the hash that is passed in.
2235    ///
2236    /// Returns the parent hash of the block itself if the block is buffered and has no other
2237    /// buffered ancestors.
2238    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2239        self.state
2240            .buffer
2241            .lowest_ancestor(&hash)
2242            .map(|block| block.parent_hash())
2243            .unwrap_or_else(|| hash)
2244    }
2245
2246    /// If validation fails, the response MUST contain the latest valid hash:
2247    ///
2248    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
2249    ///     conditions:
2250    ///     - It is fully validated and deemed VALID
2251    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
2252    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
2253    ///     conditions are satisfied by a `PoW` block.
2254    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
2255    ///     the above conditions.
2256    fn latest_valid_hash_for_invalid_payload(
2257        &mut self,
2258        parent_hash: B256,
2259    ) -> ProviderResult<Option<B256>> {
2260        // Check if parent exists in side chain or in canonical chain.
2261        if self.has_block_by_hash(parent_hash)? {
2262            return Ok(Some(parent_hash))
2263        }
2264
2265        // iterate over ancestors in the invalid cache
2266        // until we encounter the first valid ancestor
2267        let mut current_hash = parent_hash;
2268        let mut current_block = self.state.invalid_headers.get(&current_hash);
2269        while let Some(block_with_parent) = current_block {
2270            current_hash = block_with_parent.parent;
2271            current_block = self.state.invalid_headers.get(&current_hash);
2272
2273            // If current_header is None, then the current_hash does not have an invalid
2274            // ancestor in the cache, check its presence in blockchain tree
2275            if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2276                return Ok(Some(current_hash))
2277            }
2278        }
2279        Ok(None)
2280    }
2281
2282    /// Prepares the invalid payload response for the given hash, checking the
2283    /// database for the parent hash and populating the payload status with the latest valid hash
2284    /// according to the engine api spec.
2285    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2286        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2287            // Edge case: the `latestValid` field is the zero hash if the parent block is the
2288            // terminal PoW block, which we need to identify by looking at the parent's block
2289            // difficulty
2290            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2291            Some(_) => Some(parent_hash),
2292            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2293        };
2294
2295        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2296            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2297        })
2298        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2299    }
2300
2301    /// Returns true if the given hash is the last received sync target block.
2302    ///
2303    /// See [`ForkchoiceStateTracker::sync_target_state`]
2304    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2305        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2306            return target.head_block_hash == block_hash
2307        }
2308        false
2309    }
2310
2311    /// Returns true if the given hash is part of the last received sync target fork choice update.
2312    ///
2313    /// See [`ForkchoiceStateTracker::sync_target_state`]
2314    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2315        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2316            return target.contains(block_hash)
2317        }
2318        false
2319    }
2320
2321    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2322    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2323    ///
2324    /// Returns a payload status response according to the engine API spec if the block is known to
2325    /// be invalid.
2326    fn check_invalid_ancestor_with_head(
2327        &mut self,
2328        check: B256,
2329        head: &SealedBlock<N::Block>,
2330    ) -> ProviderResult<Option<PayloadStatus>> {
2331        // check if the check hash was previously marked as invalid
2332        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2333
2334        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2335    }
2336
2337    /// Invoked when a new payload received is invalid.
2338    fn on_invalid_new_payload(
2339        &mut self,
2340        head: SealedBlock<N::Block>,
2341        invalid: BlockWithParent,
2342    ) -> ProviderResult<PayloadStatus> {
2343        // populate the latest valid hash field
2344        let status = self.prepare_invalid_response(invalid.parent)?;
2345
2346        // insert the head block into the invalid header cache
2347        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2348        self.emit_event(ConsensusEngineEvent::InvalidBlock {
2349            block: Box::new(head),
2350            error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2351        });
2352
2353        Ok(status)
2354    }
2355
2356    /// Finds any invalid ancestor for the given payload.
2357    ///
2358    /// This function first checks if the block itself is in the invalid headers cache (to
2359    /// avoid re-executing a known-invalid block). Then it walks up the chain of buffered
2360    /// ancestors and checks if any ancestor is marked as invalid.
2361    ///
2362    /// The check works by:
2363    /// 1. Checking if the block hash itself is in the `invalid_headers` map
2364    /// 2. Finding the lowest buffered ancestor for the given block hash
2365    /// 3. If the ancestor is the same as the block hash itself, using the parent hash instead
2366    /// 4. Checking if this ancestor is in the `invalid_headers` map
2367    ///
2368    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2369    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2370        let parent_hash = payload.parent_hash();
2371        let block_hash = payload.block_hash();
2372
2373        // Check if the block itself is already known to be invalid, avoiding re-execution
2374        if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2375            return Some(entry);
2376        }
2377
2378        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2379        if lowest_buffered_ancestor == block_hash {
2380            lowest_buffered_ancestor = parent_hash;
2381        }
2382
2383        // Check if the block has an invalid ancestor
2384        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2385    }
2386
2387    /// Handles a payload that has an invalid ancestor.
2388    ///
2389    /// This function validates the payload and processes it according to whether it's
2390    /// well-formed or malformed:
2391    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2392    ///    known-bad block, which violates consensus rules
2393    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2394    ///    validated due to its own structural issues
2395    fn handle_invalid_ancestor_payload(
2396        &mut self,
2397        payload: T::ExecutionData,
2398        invalid: BlockWithParent,
2399    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2400        let parent_hash = payload.parent_hash();
2401        let num_hash = payload.num_hash();
2402
2403        // Here we might have 2 cases
2404        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2405        //    remember it as invalid
2406        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2407        //    an error and forget it
2408        let block = match self.payload_validator.convert_payload_to_block(payload) {
2409            Ok(block) => block,
2410            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2411        };
2412
2413        Ok(self.on_invalid_new_payload(block, invalid)?)
2414    }
2415
2416    /// Checks if the given `head` points to an invalid header, which requires a specific response
2417    /// to a forkchoice update.
2418    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2419        // check if the head was previously marked as invalid
2420        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2421
2422        // Try to prepare invalid response, but handle errors gracefully
2423        match self.prepare_invalid_response(header.parent) {
2424            Ok(status) => Ok(Some(status)),
2425            Err(err) => {
2426                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2427                // Return a basic invalid status without latest valid hash
2428                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2429                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2430                })))
2431            }
2432        }
2433    }
2434
2435    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2436    /// and block body itself.
2437    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2438        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2439            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2440            return Err(e)
2441        }
2442
2443        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2444            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2445            return Err(e)
2446        }
2447
2448        Ok(())
2449    }
2450
2451    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
2452    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2453    fn try_connect_buffered_blocks(
2454        &mut self,
2455        parent: BlockNumHash,
2456    ) -> Result<(), InsertBlockFatalError> {
2457        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2458
2459        if blocks.is_empty() {
2460            // nothing to append
2461            return Ok(())
2462        }
2463
2464        let now = Instant::now();
2465        let block_count = blocks.len();
2466        for child in blocks {
2467            let child_num_hash = child.num_hash();
2468            match self.insert_block(child) {
2469                Ok(res) => {
2470                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2471                    if self.is_any_sync_target(child_num_hash.hash) &&
2472                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2473                    {
2474                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2475                        // we just inserted a block that we know is part of the canonical chain, so
2476                        // we can make it canonical
2477                        self.make_canonical(child_num_hash.hash)?;
2478                    }
2479                }
2480                Err(err) => {
2481                    if let InsertPayloadError::Block(err) = err {
2482                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2483                        if let Err(fatal) = self.on_insert_block_error(err) {
2484                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2485                            return Err(fatal)
2486                        }
2487                    }
2488                }
2489            }
2490        }
2491
2492        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2493        Ok(())
2494    }
2495
2496    /// Pre-validates the block and inserts it into the buffer.
2497    fn buffer_block(
2498        &mut self,
2499        block: SealedBlock<N::Block>,
2500    ) -> Result<(), InsertBlockError<N::Block>> {
2501        if let Err(err) = self.validate_block(&block) {
2502            return Err(InsertBlockError::consensus_error(err, block))
2503        }
2504        self.state.buffer.insert_block(block);
2505        Ok(())
2506    }
2507
2508    /// Returns true if the distance from the local tip to the block is greater than the configured
2509    /// threshold.
2510    ///
2511    /// If the `local_tip` is greater than the `block`, then this will return false.
2512    #[inline]
2513    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2514        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2515    }
2516
2517    /// Returns how far the local tip is from the given block. If the local tip is at the same
2518    /// height or its block number is greater than the given block, this returns None.
2519    #[inline]
2520    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2521        if block > local_tip {
2522            Some(block - local_tip)
2523        } else {
2524            None
2525        }
2526    }
2527
2528    /// Returns the block hash that backfill should target.
2529    ///
2530    /// Defaults to the finalized block hash. On OP Stack, the CL finalizes in large batches and the
2531    /// finalized hash can lag the canonical tip by a wide margin, so backfill targets the head.
2532    ///
2533    /// The zero-finalized optimistic-sync fallback for non-OP Stack chains is handled by
2534    /// [`Self::backfill_sync_target`].
2535    const fn backfill_target_hash(&self, state: ForkchoiceState) -> B256 {
2536        if self.engine_kind.is_opstack() {
2537            state.head_block_hash
2538        } else {
2539            state.finalized_block_hash
2540        }
2541    }
2542
2543    /// Returns the target hash to sync to if the distance from the local tip is greater than the
2544    /// threshold and we're not yet synced to the backfill target (see
2545    /// [`Self::backfill_target_hash`]).
2546    ///
2547    /// If this is invoked after a new block has been downloaded, the downloaded block could be
2548    /// the (missing) target block.
2549    fn backfill_sync_target(
2550        &self,
2551        canonical_tip_num: u64,
2552        target_block_number: u64,
2553        downloaded_block: Option<BlockNumHash>,
2554    ) -> Option<B256> {
2555        let state = self.state.forkchoice_state_tracker.sync_target_state()?;
2556        let target_hash = self.backfill_target_hash(state);
2557
2558        // check if the downloaded block is the tracked backfill target
2559        let exceeds_backfill_threshold = match downloaded_block.as_ref() {
2560            // if we downloaded the target block we can now check how far we're off
2561            Some(downloaded_block) if downloaded_block.hash == target_hash => {
2562                self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2563            }
2564            _ => match self.state.buffer.block(&target_hash) {
2565                // if we have buffered the target block, we should check how far we're off
2566                Some(buffered_target) => {
2567                    self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_target.number())
2568                }
2569                // check if the distance exceeds the threshold for backfill sync
2570                None => self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number),
2571            },
2572        };
2573
2574        if !exceeds_backfill_threshold {
2575            return None
2576        }
2577
2578        // if we have already canonicalized the target block, we should skip backfill
2579        match self.provider.header_by_hash_or_number(target_hash.into()) {
2580            Err(err) => {
2581                warn!(target: "engine::tree", %err, "Failed to get backfill target block header");
2582                None
2583            }
2584            // we don't have the block yet and the distance exceeds the allowed threshold
2585            Ok(None) if !target_hash.is_zero() => Some(target_hash),
2586            Ok(None) => {
2587                // OPTIMISTIC SYNCING
2588                //
2589                // It can happen when the node is doing an
2590                // optimistic sync, where the CL has no knowledge of the finalized hash,
2591                // but is expecting the EL to sync as high
2592                // as possible before finalizing.
2593                //
2594                // This usually doesn't happen on ETH mainnet since CLs use the more
2595                // secure checkpoint syncing.
2596                //
2597                // However, optimism chains will do this. The risk of a reorg is however
2598                // low.
2599                debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2600                Some(state.head_block_hash)
2601            }
2602            // we're fully synced to the target block
2603            Ok(Some(_)) => None,
2604        }
2605    }
2606
2607    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
2608    /// happen if a reorg is happening while we are persisting a block.
2609    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2610        let mut canonical = self.state.tree_state.current_canonical_head;
2611        let mut persisted = self.persistence_state.last_persisted_block;
2612
2613        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2614            Ok(self
2615                .sealed_header_by_hash(num_hash.hash)?
2616                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2617                .parent_num_hash())
2618        };
2619
2620        // Happy path, canonical chain is ahead or equal to persisted chain.
2621        // Walk canonical chain back to make sure that it connects to persisted chain.
2622        while canonical.number > persisted.number {
2623            canonical = parent_num_hash(canonical)?;
2624        }
2625
2626        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2627        if canonical == persisted {
2628            return Ok(None);
2629        }
2630
2631        // At this point, we know that `persisted` block can't be reached by walking the canonical
2632        // chain back. In this case we need to truncate it to the first canonical block it connects
2633        // to.
2634
2635        // Firstly, walk back until we reach the same height as `canonical`.
2636        while persisted.number > canonical.number {
2637            persisted = parent_num_hash(persisted)?;
2638        }
2639
2640        debug_assert_eq!(persisted.number, canonical.number);
2641
2642        // Now walk both chains back until we find a common ancestor.
2643        while persisted.hash != canonical.hash {
2644            canonical = parent_num_hash(canonical)?;
2645            persisted = parent_num_hash(persisted)?;
2646        }
2647
2648        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2649
2650        Ok(Some(persisted.number))
2651    }
2652
2653    /// Invoked when we the canonical chain has been updated.
2654    ///
2655    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2656    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2657        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2658        let start = Instant::now();
2659
2660        // update the tracked canonical head
2661        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2662
2663        let tip = chain_update.tip().clone_sealed_header();
2664        let notification = chain_update.to_chain_notification();
2665
2666        // reinsert any missing reorged blocks
2667        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2668            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2669            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2670            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2671
2672            self.update_reorg_metrics(old.len(), old_first);
2673            self.reinsert_reorged_blocks(new.clone());
2674            self.reinsert_reorged_blocks(old.clone());
2675        }
2676
2677        // update the tracked in-memory state with the new chain
2678        self.canonical_in_memory_state.update_chain(chain_update);
2679        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2680
2681        // Update metrics based on new tip
2682        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2683
2684        // sends an event to all active listeners about the new canonical chain
2685        self.canonical_in_memory_state.notify_canon_state(notification);
2686
2687        // emit event
2688        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2689            Box::new(tip),
2690            start.elapsed(),
2691        ));
2692    }
2693
2694    /// This updates metrics based on the given reorg length and first reorged block number.
2695    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2696        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2697            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2698                first_reorged_block <= finalized.number
2699            {
2700                self.metrics.tree.reorgs.finalized.increment(1);
2701            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2702                first_reorged_block <= safe.number
2703            {
2704                self.metrics.tree.reorgs.safe.increment(1);
2705            } else {
2706                self.metrics.tree.reorgs.head.increment(1);
2707            }
2708        } else {
2709            debug_unreachable!("Reorged chain doesn't have any blocks");
2710        }
2711        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2712    }
2713
2714    /// This reinserts any blocks in the new chain that do not already exist in the tree
2715    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2716        for block in new_chain {
2717            if self
2718                .state
2719                .tree_state
2720                .executed_block_by_hash(block.recovered_block().hash())
2721                .is_none()
2722            {
2723                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2724                self.state.tree_state.insert_executed(block);
2725            }
2726        }
2727    }
2728
2729    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2730    ///
2731    /// This mainly compares the missing parent of the downloaded block with the current canonical
2732    /// tip, and decides whether or not backfill sync should be triggered.
2733    fn on_disconnected_downloaded_block(
2734        &self,
2735        downloaded_block: BlockNumHash,
2736        missing_parent: BlockNumHash,
2737        head: BlockNumHash,
2738    ) -> Option<TreeEvent> {
2739        // compare the missing parent with the canonical tip
2740        if let Some(target) =
2741            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2742        {
2743            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2744            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2745        }
2746
2747        // continue downloading the missing parent
2748        //
2749        // this happens if either:
2750        //  * the missing parent block num < canonical tip num
2751        //    * this case represents a missing block on a fork that is shorter than the canonical
2752        //      chain
2753        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2754        //    less than the backfill threshold
2755        //    * this case represents a potentially long range of blocks to download and execute
2756        let request = if let Some(distance) =
2757            self.distance_from_local_tip(head.number, missing_parent.number)
2758        {
2759            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2760            DownloadRequest::BlockRange(missing_parent.hash, distance)
2761        } else {
2762            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2763            // This happens when the missing parent is on an outdated
2764            // sidechain and we can only download the missing block itself
2765            DownloadRequest::single_block(missing_parent.hash)
2766        };
2767
2768        Some(TreeEvent::Download(request))
2769    }
2770
2771    /// Handles a downloaded block that was successfully inserted as valid.
2772    ///
2773    /// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
2774    /// If it matches a non-head sync target (safe or finalized), makes it canonical inline
2775    /// and triggers a download for the remaining blocks towards the actual head.
2776    /// Otherwise, tries to connect buffered blocks.
2777    fn on_valid_downloaded_block(
2778        &mut self,
2779        block_num_hash: BlockNumHash,
2780    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2781        // check if we just inserted a block that's part of sync targets,
2782        // i.e. head, safe, or finalized
2783        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2784            sync_target.contains(block_num_hash.hash)
2785        {
2786            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2787
2788            if sync_target.head_block_hash == block_num_hash.hash {
2789                // we just inserted the sync target head block, make it canonical
2790                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2791                    sync_target_head: block_num_hash.hash,
2792                })))
2793            }
2794
2795            // This block is part of the sync target (safe or finalized) but not the
2796            // head. Make it canonical and try to connect any buffered children, then
2797            // continue downloading towards the actual head if needed.
2798            self.make_canonical(block_num_hash.hash)?;
2799            self.try_connect_buffered_blocks(block_num_hash)?;
2800
2801            // Check if we've reached the sync target head after connecting buffered
2802            // blocks (e.g. the head block may have already been buffered).
2803            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2804                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2805                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2806                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2807            }
2808
2809            return Ok(None)
2810        }
2811        trace!(target: "engine::tree", "appended downloaded block");
2812        self.try_connect_buffered_blocks(block_num_hash)?;
2813        Ok(None)
2814    }
2815
2816    /// Invoked with a block downloaded from the network
2817    ///
2818    /// Returns an event with the appropriate action to take, such as:
2819    ///  - download more missing blocks
2820    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2821    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2822    fn on_downloaded_block(
2823        &mut self,
2824        block: SealedBlock<N::Block>,
2825    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2826        let block_num_hash = block.num_hash();
2827        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2828        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2829            return Ok(None)
2830        }
2831
2832        if !self.backfill_sync_state.is_idle() {
2833            return Ok(None)
2834        }
2835
2836        // try to append the block
2837        match self.insert_block(block) {
2838            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2839                return self.on_valid_downloaded_block(block_num_hash);
2840            }
2841            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2842                // block is not connected to the canonical head, we need to download
2843                // its missing branch first
2844                return Ok(self.on_disconnected_downloaded_block(
2845                    block_num_hash,
2846                    missing_ancestor,
2847                    head,
2848                ))
2849            }
2850            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2851                trace!(target: "engine::tree", "downloaded block already executed");
2852            }
2853            Err(err) => {
2854                if let InsertPayloadError::Block(err) = err {
2855                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2856                    if let Err(fatal) = self.on_insert_block_error(err) {
2857                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2858                        return Err(fatal)
2859                    }
2860                }
2861            }
2862        }
2863        Ok(None)
2864    }
2865
2866    /// Inserts a payload into the tree and executes it.
2867    ///
2868    /// This function validates the payload's basic structure, then executes it using the
2869    /// payload validator. The execution includes running all transactions in the payload
2870    /// and validating the resulting state transitions.
2871    ///
2872    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2873    /// or `InsertPayloadError` if validation or execution failed.
2874    fn insert_payload(
2875        &mut self,
2876        payload: T::ExecutionData,
2877    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2878        self.insert_block_or_payload(
2879            payload.block_with_parent(),
2880            payload,
2881            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2882            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2883        )
2884    }
2885
2886    fn insert_block(
2887        &mut self,
2888        block: SealedBlock<N::Block>,
2889    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2890        self.insert_block_or_payload(
2891            block.block_with_parent(),
2892            block,
2893            |validator, block, ctx| validator.validate_block(block, ctx),
2894            |_, block| Ok(block),
2895        )
2896    }
2897
2898    /// Inserts a block or payload into the blockchain tree with full execution.
2899    ///
2900    /// This is a generic function that handles both blocks and payloads by accepting
2901    /// a block identifier, input data, and execution/validation functions. It performs
2902    /// comprehensive checks and execution:
2903    ///
2904    /// - Validates that the block doesn't already exist in the tree
2905    /// - Ensures parent state is available, buffering if necessary
2906    /// - Executes the block/payload using the provided execute function
2907    /// - Handles both canonical and fork chain insertions
2908    /// - Updates pending block state when appropriate
2909    /// - Emits consensus engine events and records metrics
2910    ///
2911    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2912    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2913    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2914    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
2915    fn insert_block_or_payload<Input, Err>(
2916        &mut self,
2917        block_id: BlockWithParent,
2918        input: Input,
2919        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ValidationOutput<N>, Err>,
2920        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2921    ) -> Result<InsertPayloadOk, Err>
2922    where
2923        Err: From<InsertBlockError<N::Block>>,
2924    {
2925        let block_insert_start = Instant::now();
2926        let block_num_hash = block_id.block;
2927        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2928
2929        // Check if block already exists - first in memory, then DB only if it could be persisted
2930        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
2931            convert_to_block(self, input)?;
2932            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2933        }
2934
2935        // Only query DB if block could be persisted (number <= last persisted block).
2936        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
2937        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
2938            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
2939                Err(err) => {
2940                    let block = convert_to_block(self, input)?;
2941                    return Err(InsertBlockError::new(block, err.into()).into());
2942                }
2943                Ok(Some(_)) => {
2944                    convert_to_block(self, input)?;
2945                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2946                }
2947                Ok(None) => {}
2948            }
2949        }
2950
2951        // Ensure that the parent state is available.
2952        match self.state_provider_builder(block_id.parent) {
2953            Err(err) => {
2954                let block = convert_to_block(self, input)?;
2955                return Err(InsertBlockError::new(block, err.into()).into());
2956            }
2957            Ok(None) => {
2958                let block = convert_to_block(self, input)?;
2959
2960                // we don't have the state required to execute this block, buffering it and find the
2961                // missing parent block
2962                let missing_ancestor = self
2963                    .state
2964                    .buffer
2965                    .lowest_ancestor(&block.parent_hash())
2966                    .map(|block| block.parent_num_hash())
2967                    .unwrap_or_else(|| block.parent_num_hash());
2968
2969                self.state.buffer.insert_block(block);
2970
2971                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2972                    head: self.state.tree_state.current_canonical_head,
2973                    missing_ancestor,
2974                }))
2975            }
2976            Ok(Some(_)) => {}
2977        }
2978
2979        // determine whether we are on a fork chain by comparing the block number with the
2980        // canonical head. This is a simple check that is sufficient for the event emission below.
2981        // A block is considered a fork if its number is less than or equal to the canonical head,
2982        // as this indicates there's already a canonical block at that height.
2983        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2984
2985        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2986
2987        let start = Instant::now();
2988
2989        let ValidationOutput {
2990            executed_block: executed,
2991            execution_timing_stats: timing_stats,
2992            raw_bal,
2993        } = execute(&mut self.payload_validator, input, ctx)?;
2994
2995        if let Some(raw_bal) = raw_bal {
2996            let num_hash = executed.recovered_block().num_hash();
2997            if let Err(err) = self.provider.bal_store().insert(num_hash, raw_bal) {
2998                warn!(
2999                    target: "engine::tree",
3000                    ?num_hash,
3001                    %err,
3002                    "Failed to store validated block access list"
3003                );
3004            }
3005        }
3006
3007        // Emit slow block event immediately after execution so it appears even when
3008        // persistence hasn't completed yet (e.g. blocks arriving faster than persistence).
3009        if let Some(stats) = timing_stats {
3010            if let Some(threshold) = self.config.slow_block_threshold() {
3011                let total_duration = stats.execution_duration + stats.state_hash_duration;
3012                if total_duration > threshold {
3013                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
3014                        stats: stats.clone(),
3015                        commit_duration: None,
3016                        total_duration,
3017                    }));
3018                }
3019            }
3020            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
3021        }
3022
3023        // if the parent is the canonical head, we can insert the block as the pending block
3024        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
3025        {
3026            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
3027            self.canonical_in_memory_state.set_pending_block(executed.clone());
3028        }
3029
3030        self.state.tree_state.insert_executed(executed.clone());
3031        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
3032
3033        // emit insert event
3034        let elapsed = start.elapsed();
3035        let engine_event = if is_fork {
3036            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
3037        } else {
3038            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
3039        };
3040        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
3041
3042        self.metrics
3043            .engine
3044            .block_insert_total_duration
3045            .record(block_insert_start.elapsed().as_secs_f64());
3046        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
3047        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
3048    }
3049
3050    /// Handles an error that occurred while inserting a block.
3051    ///
3052    /// If this is a validation error this will mark the block as invalid.
3053    ///
3054    /// Returns the proper payload status response if the block is invalid.
3055    fn on_insert_block_error(
3056        &mut self,
3057        error: InsertBlockError<N::Block>,
3058    ) -> Result<PayloadStatus, InsertBlockFatalError> {
3059        let (block, error) = error.split();
3060
3061        // if invalid block, we check the validation error. Otherwise return the fatal
3062        // error.
3063        let validation_err = error.ensure_validation_error()?;
3064
3065        // If the error was due to an invalid payload, the payload is added to the
3066        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
3067        // returned.
3068        warn!(
3069            target: "engine::tree",
3070            invalid_hash=%block.hash(),
3071            invalid_number=block.number(),
3072            %validation_err,
3073            "Invalid block error on new payload",
3074        );
3075        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3076
3077        // keep track of the invalid header unless the consensus impl considers it transient
3078        let is_transient = match &validation_err {
3079            InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
3080            _ => false,
3081        };
3082        if is_transient {
3083            warn!(
3084                target: "engine::tree",
3085                invalid_hash=%block.hash(),
3086                invalid_number=block.number(),
3087                %validation_err,
3088                "Skipping invalid header cache insert for transient validation error",
3089            );
3090        } else {
3091            self.state.invalid_headers.insert(block.block_with_parent());
3092        }
3093        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock {
3094            block: Box::new(block),
3095            error: validation_err.to_string(),
3096        }));
3097
3098        Ok(PayloadStatus::new(
3099            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3100            latest_valid_hash,
3101        ))
3102    }
3103
3104    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
3105    fn on_new_payload_error(
3106        &mut self,
3107        error: NewPayloadError,
3108        payload_num_hash: NumHash,
3109        parent_hash: B256,
3110    ) -> ProviderResult<PayloadStatus> {
3111        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3112        // we need to convert the error to a payload status (response to the CL)
3113
3114        let latest_valid_hash =
3115            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3116                // Engine-API rules:
3117                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
3118                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
3119                None
3120            } else {
3121                self.latest_valid_hash_for_invalid_payload(parent_hash)?
3122            };
3123
3124        let status = PayloadStatusEnum::from(error);
3125        Ok(PayloadStatus::new(status, latest_valid_hash))
3126    }
3127
3128    /// Attempts to find the header for the given block hash if it is canonical.
3129    pub fn find_canonical_header(
3130        &self,
3131        hash: B256,
3132    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3133        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3134
3135        if canonical.is_none() {
3136            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3137        }
3138
3139        Ok(canonical)
3140    }
3141
3142    /// Updates the tracked finalized block if we have it.
3143    fn update_finalized_block(
3144        &self,
3145        finalized_block_hash: B256,
3146    ) -> Result<(), OnForkChoiceUpdated> {
3147        if finalized_block_hash.is_zero() {
3148            return Ok(())
3149        }
3150
3151        match self.find_canonical_header(finalized_block_hash) {
3152            Ok(None) => {
3153                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3154                // if the finalized block is not known, we can't update the finalized block
3155                return Err(OnForkChoiceUpdated::invalid_state())
3156            }
3157            Ok(Some(finalized)) => {
3158                if Some(finalized.num_hash()) !=
3159                    self.canonical_in_memory_state.get_finalized_num_hash()
3160                {
3161                    // we're also persisting the finalized block on disk so we can reload it on
3162                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3163                    let _ = self.persistence.save_finalized_block_number(finalized.number());
3164                    self.canonical_in_memory_state.set_finalized(finalized.clone());
3165                    // Update finalized block height metric
3166                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3167                }
3168            }
3169            Err(err) => {
3170                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3171            }
3172        }
3173
3174        Ok(())
3175    }
3176
3177    /// Updates the tracked safe block if we have it
3178    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3179        if safe_block_hash.is_zero() {
3180            return Ok(())
3181        }
3182
3183        match self.find_canonical_header(safe_block_hash) {
3184            Ok(None) => {
3185                debug!(target: "engine::tree", "Safe block not found in canonical chain");
3186                // if the safe block is not known, we can't update the safe block
3187                return Err(OnForkChoiceUpdated::invalid_state())
3188            }
3189            Ok(Some(safe)) => {
3190                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3191                    // we're also persisting the safe block on disk so we can reload it on
3192                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3193                    let _ = self.persistence.save_safe_block_number(safe.number());
3194                    self.canonical_in_memory_state.set_safe(safe.clone());
3195                    // Update safe block height metric
3196                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
3197                }
3198            }
3199            Err(err) => {
3200                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3201            }
3202        }
3203
3204        Ok(())
3205    }
3206
3207    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
3208    /// made canonical.
3209    ///
3210    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
3211    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
3212    ///
3213    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
3214    /// are consistent with the head block.
3215    fn ensure_consistent_forkchoice_state(
3216        &self,
3217        state: ForkchoiceState,
3218    ) -> Result<(), OnForkChoiceUpdated> {
3219        // Ensure that the finalized block, if not zero, is known and in the canonical chain
3220        // after the head block is canonicalized.
3221        //
3222        // This ensures that the finalized block is consistent with the head block, i.e. the
3223        // finalized block is an ancestor of the head block.
3224        self.update_finalized_block(state.finalized_block_hash)?;
3225
3226        // Also ensure that the safe block, if not zero, is known and in the canonical chain
3227        // after the head block is canonicalized.
3228        //
3229        // This ensures that the safe block is consistent with the head block, i.e. the safe
3230        // block is an ancestor of the head block.
3231        self.update_safe_block(state.safe_block_hash)
3232    }
3233
3234    /// Validates the payload attributes with respect to the header and fork choice state.
3235    ///
3236    /// This is called during `engine_forkchoiceUpdated` when the CL provides payload attributes,
3237    /// indicating it wants the EL to start building a new block.
3238    ///
3239    /// Runs [`PayloadValidator::validate_payload_attributes_against_header`](reth_engine_primitives::PayloadValidator::validate_payload_attributes_against_header) to ensure
3240    /// `payloadAttributes.timestamp > headBlock.timestamp` per the Engine API spec.
3241    ///
3242    /// If validation passes, sends the attributes to the payload builder to start a new
3243    /// payload job. If it fails, returns `INVALID_PAYLOAD_ATTRIBUTES` without rolling back
3244    /// the forkchoice update.
3245    ///
3246    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
3247    /// return an error if the payload attributes are invalid.
3248    fn process_payload_attributes(
3249        &self,
3250        attributes: T::PayloadAttributes,
3251        head: &N::BlockHeader,
3252        state: ForkchoiceState,
3253    ) -> OnForkChoiceUpdated {
3254        if let Err(err) =
3255            self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3256        {
3257            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3258            return OnForkChoiceUpdated::invalid_payload_attributes()
3259        }
3260
3261        // 8. Client software MUST begin a payload build process building on top of
3262        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
3263        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
3264        //    The build process is specified in the Payload building section.
3265
3266        let cache = if self.config.share_execution_cache_with_payload_builder() {
3267            self.payload_validator.cache_for(state.head_block_hash)
3268        } else {
3269            None
3270        };
3271
3272        let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3273            self.payload_validator.sparse_trie_handle_for(
3274                state.head_block_hash,
3275                head.state_root(),
3276                &self.state,
3277            )
3278        } else {
3279            None
3280        };
3281
3282        // send the payload to the builder and return the receiver for the pending payload
3283        // id, initiating payload job is handled asynchronously
3284        let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3285            parent_hash: state.head_block_hash,
3286            attributes,
3287            cache,
3288            trie_handle,
3289        });
3290
3291        // Client software MUST respond to this method call in the following way:
3292        // {
3293        //      payloadStatus: {
3294        //          status: VALID,
3295        //          latestValidHash: forkchoiceState.headBlockHash,
3296        //          validationError: null
3297        //      },
3298        //      payloadId: buildProcessId
3299        // }
3300        //
3301        // if the payload is deemed VALID and the build process has begun.
3302        OnForkChoiceUpdated::updated_with_pending_payload_id(
3303            PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3304            pending_payload_id,
3305        )
3306    }
3307
3308    /// Remove all blocks up to __and including__ the given block number.
3309    ///
3310    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
3311    /// those which have a fork point at or below the finalized hash.
3312    ///
3313    /// Canonical blocks below the upper bound will still be removed.
3314    pub(crate) fn remove_before(
3315        &mut self,
3316        upper_bound: BlockNumHash,
3317        finalized_hash: Option<B256>,
3318    ) -> ProviderResult<()> {
3319        // first fetch the finalized block number and then call the remove_before method on
3320        // tree_state
3321        let num = if let Some(hash) = finalized_hash {
3322            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3323        } else {
3324            None
3325        };
3326
3327        self.state.tree_state.remove_until(
3328            upper_bound,
3329            self.persistence_state.last_persisted_block.hash,
3330            num,
3331        );
3332        Ok(())
3333    }
3334
3335    /// Returns a builder for creating state providers for the given hash.
3336    ///
3337    /// This is an optimization for parallel execution contexts where we want to avoid
3338    /// creating state providers in the critical path.
3339    pub fn state_provider_builder(
3340        &self,
3341        hash: B256,
3342    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3343    where
3344        P: BlockReader + StateProviderFactory + StateReader + Clone,
3345    {
3346        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3347            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3348            // the block leads back to the canonical chain
3349            return Ok(Some(StateProviderBuilder::new(
3350                self.provider.clone(),
3351                historical,
3352                Some(blocks),
3353            )))
3354        }
3355
3356        // Check if the block is persisted
3357        if let Some(header) = self.provider.header(hash)? {
3358            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3359            // For persisted blocks, we create a builder that will fetch state directly from the
3360            // database
3361            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3362        }
3363
3364        debug!(target: "engine::tree", %hash, "no canonical state found for block");
3365        Ok(None)
3366    }
3367}
3368
3369/// Events received in the main engine loop.
3370#[derive(Debug)]
3371enum LoopEvent<T, N>
3372where
3373    N: NodePrimitives,
3374    T: PayloadTypes,
3375{
3376    /// An engine API message was received.
3377    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
3378    /// A persistence task completed.
3379    PersistenceComplete {
3380        /// The unified result of the persistence operation.
3381        result: PersistenceResult,
3382        /// When the persistence operation started.
3383        start_time: Instant,
3384    },
3385    /// A channel was disconnected.
3386    Disconnected,
3387}
3388
3389/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
3390/// variant.
3391///
3392/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
3393/// is valid or not.
3394#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3395pub enum BlockStatus {
3396    /// The block is valid and block extends canonical chain.
3397    Valid,
3398    /// The block may be valid and has an unknown missing ancestor.
3399    Disconnected {
3400        /// Current canonical head.
3401        head: BlockNumHash,
3402        /// The lowest ancestor block that is not connected to the canonical chain.
3403        missing_ancestor: BlockNumHash,
3404    },
3405}
3406
3407/// How a payload was inserted if it was valid.
3408///
3409/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
3410/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
3411#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3412pub enum InsertPayloadOk {
3413    /// The payload was valid, but we have already seen it.
3414    AlreadySeen(BlockStatus),
3415    /// The payload was valid and inserted into the tree.
3416    Inserted(BlockStatus),
3417}
3418
/// Selects how far block persistence should advance.
#[derive(Clone, Copy, Debug)]
enum PersistTarget {
    /// Persist blocks up to `canonical_head - memory_block_buffer_target`.
    Threshold,
    /// Persist every block up to and including the canonical head.
    Head,
}
3427
/// Time spent waiting for the caches to become available, reported per cache.
#[derive(Clone, Copy, Debug, Default)]
pub struct CacheWaitDurations {
    /// How long the wait for the execution cache lock took.
    pub execution_cache: Duration,
    /// How long the wait for the sparse trie lock took.
    pub sparse_trie: Duration,
}
3436
3437/// Trait for types that can wait for caches to become available.
3438///
3439/// This is used by `reth_newPayload` endpoint to ensure that payload processing
3440/// waits for any ongoing operations to complete before starting.
3441pub trait WaitForCaches {
3442    /// Waits for cache updates to complete.
3443    ///
3444    /// Returns the time spent waiting for each cache separately.
3445    fn wait_for_caches(&self) -> CacheWaitDurations;
3446}