Skip to main content

reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::{map::B256Map, B256};
11use alloy_rpc_types_engine::{
12    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError, InsertBlockValidationError};
15use reth_chain_state::{
16    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17    MemoryOverlayStateProvider, NewCanonicalChain, StateTrieOverlayManager,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29    FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32    BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33    DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34    StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35    StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority, WorkerPool};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48    oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64pub mod types;
65
66use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
67pub use block_buffer::BlockBuffer;
68pub use invalid_headers::InvalidHeaderCache;
69pub use metrics::EngineApiMetrics;
70pub use payload_processor::*;
71pub use payload_validator::{BasicEngineValidator, EngineValidator};
72pub use persistence_state::PersistenceState;
73pub use reth_engine_primitives::TreeConfig;
74pub use reth_execution_cache::{
75    CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider,
76    ExecutionCache, PayloadExecutionCache, SavedCache,
77};
78pub use types::{ValidationOutcome, ValidationOutput};
79
80pub mod state;
81
82/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
83///
84/// This is the default threshold, and represents the distance (gap) from the local head to a
85/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
86/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
87///
88/// E.g.: Local head `block.number` is 100 and the forkchoice head `block.number` is 133 (more than
89/// an epoch has slots), then this exceeds the threshold at which the pipeline should be used to
90/// backfill this gap.
91pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
92
93/// The minimum number of blocks to retain in the changeset cache after eviction.
94///
95/// This ensures that recent trie changesets are kept in memory for potential reorgs,
96/// even when the finalized block is not set (e.g., on L2s like Optimism).
97const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
98
99/// A builder for creating state providers that can be used across threads.
100#[derive(Clone, Debug)]
101pub struct StateProviderBuilder<N: NodePrimitives, P> {
102    /// The provider factory used to create providers.
103    provider_factory: P,
104    /// The historical block hash to fetch state from.
105    historical: B256,
106    /// The blocks that form the chain from historical to target and are in memory.
107    overlay: Option<Vec<ExecutedBlock<N>>>,
108}
109
110impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
111    /// Creates a new state provider from the provider factory, historical block hash and optional
112    /// overlaid blocks.
113    pub const fn new(
114        provider_factory: P,
115        historical: B256,
116        overlay: Option<Vec<ExecutedBlock<N>>>,
117    ) -> Self {
118        Self { provider_factory, historical, overlay }
119    }
120}
121
122impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
123where
124    P: BlockReader + StateProviderFactory + StateReader + Clone,
125{
126    /// Creates a new state provider from this builder.
127    pub fn build(&self) -> ProviderResult<StateProviderBox> {
128        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
129        if let Some(overlay) = self.overlay.clone() {
130            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
131        }
132        Ok(provider)
133    }
134}
135
136/// Tracks the state of the engine api internals.
137///
138/// This type is not shareable.
139#[derive(Debug)]
140pub struct EngineApiTreeState<N: NodePrimitives> {
141    /// Tracks the state of the blockchain tree.
142    tree_state: TreeState<N>,
143    /// Tracks the forkchoice state updates received by the CL.
144    forkchoice_state_tracker: ForkchoiceStateTracker,
145    /// Buffer of detached blocks.
146    buffer: BlockBuffer<N::Block>,
147    /// Tracks the header of invalid payloads that were rejected by the engine because they're
148    /// invalid.
149    invalid_headers: InvalidHeaderCache,
150}
151
152impl<N: NodePrimitives> EngineApiTreeState<N> {
153    fn new(
154        block_buffer_limit: u32,
155        max_invalid_header_cache_length: u32,
156        invalid_header_hit_eviction_threshold: u8,
157        canonical_block: BlockNumHash,
158        engine_kind: EngineApiKind,
159        state_trie_overlay_worker_pool: Arc<WorkerPool>,
160    ) -> Self {
161        Self {
162            invalid_headers: InvalidHeaderCache::new(
163                max_invalid_header_cache_length,
164                invalid_header_hit_eviction_threshold,
165            ),
166            buffer: BlockBuffer::new(block_buffer_limit),
167            tree_state: TreeState::new(
168                canonical_block,
169                engine_kind,
170                StateTrieOverlayManager::new(state_trie_overlay_worker_pool),
171            ),
172            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
173        }
174    }
175
176    /// Returns a reference to the tree state.
177    pub const fn tree_state(&self) -> &TreeState<N> {
178        &self.tree_state
179    }
180
181    /// Returns true if the block has been marked as invalid.
182    pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
183        self.invalid_headers.get(hash).is_some()
184    }
185}
186
187/// The outcome of a tree operation.
188#[derive(Debug)]
189pub struct TreeOutcome<T> {
190    /// The outcome of the operation.
191    pub outcome: T,
192    /// An optional event to tell the caller to do something.
193    pub event: Option<TreeEvent>,
194    /// Whether the block was already seen, meaning no real execution happened during this
195    /// `newPayload` call.
196    pub already_seen: bool,
197}
198
199impl<T> TreeOutcome<T> {
200    /// Create new tree outcome.
201    pub const fn new(outcome: T) -> Self {
202        Self { outcome, event: None, already_seen: false }
203    }
204
205    /// Set event on the outcome.
206    pub fn with_event(mut self, event: TreeEvent) -> Self {
207        self.event = Some(event);
208        self
209    }
210
211    /// Set the `already_seen` flag on the outcome.
212    pub const fn with_already_seen(mut self, value: bool) -> Self {
213        self.already_seen = value;
214        self
215    }
216}
217
218/// Result of trying to insert a new payload in [`EngineApiTreeHandler`].
219#[derive(Debug)]
220pub struct TryInsertPayloadResult {
221    /// - `Valid`: Payload successfully validated and inserted
222    /// - `Syncing`: Parent missing, payload buffered for later
223    /// - Error status: Payload is invalid
224    pub status: PayloadStatus,
225    /// Whether the block was already seen
226    pub already_seen: bool,
227}
228
229impl TryInsertPayloadResult {
230    /// Convert the result into a [`TreeOutcome`].
231    #[inline]
232    pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
233        TreeOutcome::new(self.status).with_already_seen(self.already_seen)
234    }
235}
236
237/// Events that are triggered by Tree Chain
238#[derive(Debug)]
239pub enum TreeEvent {
240    /// Tree action is needed.
241    TreeAction(TreeAction),
242    /// Backfill action is needed.
243    BackfillAction(BackfillAction),
244    /// Block download is needed.
245    Download(DownloadRequest),
246}
247
248impl TreeEvent {
249    /// Returns true if the event is a backfill action.
250    const fn is_backfill_action(&self) -> bool {
251        matches!(self, Self::BackfillAction(_))
252    }
253}
254
255/// The actions that can be performed on the tree.
256#[derive(Debug)]
257pub enum TreeAction {
258    /// Make target canonical.
259    MakeCanonical {
260        /// The sync target head hash
261        sync_target_head: B256,
262    },
263}
264
265/// The engine API tree handler implementation.
266///
267/// This type is responsible for processing engine API requests, maintaining the canonical state and
268/// emitting events.
269pub struct EngineApiTreeHandler<N, P, T, V, C>
270where
271    N: NodePrimitives,
272    T: PayloadTypes,
273    C: ConfigureEvm<Primitives = N> + 'static,
274{
275    provider: P,
276    consensus: Arc<dyn FullConsensus<N>>,
277    payload_validator: V,
278    /// Keeps track of internals such as executed and buffered blocks.
279    state: EngineApiTreeState<N>,
280    /// The half for sending messages to the engine.
281    ///
282    /// This is kept so that we can queue in messages to ourself that we can process later, for
283    /// example distributing workload across multiple messages that would otherwise take too long
284    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
285    /// them one by one so that we can handle incoming engine API in between and don't become
286    /// unresponsive. This can happen during live sync transition where we're trying to close the
287    /// gap (up to 3 epochs of blocks in the worst case).
288    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
289    /// Incoming engine API requests.
290    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
291    /// Outgoing events that are emitted to the handler.
292    outgoing: UnboundedSender<EngineApiEvent<N>>,
293    /// Channels to the persistence layer.
294    persistence: PersistenceHandle<N>,
295    /// Tracks the state changes of the persistence task.
296    persistence_state: PersistenceState,
297    /// Flag indicating the state of the node's backfill synchronization process.
298    backfill_sync_state: BackfillSyncState,
299    /// Keeps track of the state of the canonical chain that isn't persisted yet.
300    /// This is intended to be accessed from external sources, such as rpc.
301    canonical_in_memory_state: CanonicalInMemoryState<N>,
302    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
303    /// updates
304    payload_builder: PayloadBuilderHandle<T>,
305    /// Configuration settings.
306    config: TreeConfig,
307    /// Metrics for the engine api.
308    metrics: EngineApiMetrics,
309    /// The engine API variant of this handler
310    engine_kind: EngineApiKind,
311    /// The EVM configuration.
312    evm_config: C,
313    /// Changeset cache for in-memory trie changesets
314    changeset_cache: ChangesetCache,
315    /// Timing statistics for executed blocks, keyed by block hash.
316    /// Stored here (not in `ExecutedBlock`) to avoid leaking observability concerns into the block
317    /// type. Entries are removed when blocks are persisted or invalidated.
318    execution_timing_stats: B256Map<Box<ExecutionTimingStats>>,
319    /// Set when an FCU with payload attributes is received, cleared on the next FCU without.
320    /// Suppresses persistence cycles during payload building.
321    building_payload: bool,
322    /// Task runtime for spawning blocking work on named, reusable threads.
323    runtime: reth_tasks::Runtime,
324}
325
326impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
327    for EngineApiTreeHandler<N, P, T, V, C>
328where
329    N: NodePrimitives,
330    C: Debug + ConfigureEvm<Primitives = N>,
331{
332    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333        f.debug_struct("EngineApiTreeHandler")
334            .field("provider", &self.provider)
335            .field("consensus", &self.consensus)
336            .field("payload_validator", &self.payload_validator)
337            .field("state", &self.state)
338            .field("incoming_tx", &self.incoming_tx)
339            .field("persistence", &self.persistence)
340            .field("persistence_state", &self.persistence_state)
341            .field("backfill_sync_state", &self.backfill_sync_state)
342            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
343            .field("payload_builder", &self.payload_builder)
344            .field("config", &self.config)
345            .field("metrics", &self.metrics)
346            .field("engine_kind", &self.engine_kind)
347            .field("evm_config", &self.evm_config)
348            .field("changeset_cache", &self.changeset_cache)
349            .field("execution_timing_stats", &self.execution_timing_stats.len())
350            .field("runtime", &self.runtime)
351            .finish()
352    }
353}
354
355impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
356where
357    N: NodePrimitives,
358    P: DatabaseProviderFactory
359        + BlockReader<Block = N::Block, Header = N::BlockHeader>
360        + StateProviderFactory
361        + StateReader<Receipt = N::Receipt>
362        + HashedPostStateProvider
363        + Clone
364        + 'static,
365    P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
366        + StageCheckpointReader
367        + ChangeSetReader
368        + StorageChangeSetReader
369        + StorageSettingsCache,
370    C: ConfigureEvm<Primitives = N> + 'static,
371    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
372    V: EngineValidator<T> + WaitForCaches,
373{
374    /// Creates a new [`EngineApiTreeHandler`].
375    #[expect(clippy::too_many_arguments)]
376    pub fn new(
377        provider: P,
378        consensus: Arc<dyn FullConsensus<N>>,
379        payload_validator: V,
380        outgoing: UnboundedSender<EngineApiEvent<N>>,
381        state: EngineApiTreeState<N>,
382        canonical_in_memory_state: CanonicalInMemoryState<N>,
383        persistence: PersistenceHandle<N>,
384        persistence_state: PersistenceState,
385        payload_builder: PayloadBuilderHandle<T>,
386        config: TreeConfig,
387        engine_kind: EngineApiKind,
388        evm_config: C,
389        changeset_cache: ChangesetCache,
390        runtime: reth_tasks::Runtime,
391    ) -> Self {
392        let (incoming_tx, incoming) = crossbeam_channel::unbounded();
393
394        Self {
395            provider,
396            consensus,
397            payload_validator,
398            incoming,
399            outgoing,
400            persistence,
401            persistence_state,
402            backfill_sync_state: BackfillSyncState::Idle,
403            state,
404            canonical_in_memory_state,
405            payload_builder,
406            config,
407            metrics: Default::default(),
408            incoming_tx,
409            engine_kind,
410            evm_config,
411            changeset_cache,
412            execution_timing_stats: B256Map::default(),
413            building_payload: false,
414            runtime,
415        }
416    }
417
418    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
419    /// own thread.
420    ///
421    /// Returns the sender through which incoming requests can be sent to the task and the receiver
422    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
423    #[expect(clippy::complexity)]
424    pub fn spawn_new(
425        provider: P,
426        consensus: Arc<dyn FullConsensus<N>>,
427        payload_validator: V,
428        persistence: PersistenceHandle<N>,
429        payload_builder: PayloadBuilderHandle<T>,
430        canonical_in_memory_state: CanonicalInMemoryState<N>,
431        config: TreeConfig,
432        kind: EngineApiKind,
433        evm_config: C,
434        changeset_cache: ChangesetCache,
435        runtime: reth_tasks::Runtime,
436    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
437    {
438        let best_block_number = provider.best_block_number().unwrap_or(0);
439        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
440
441        let persistence_state = PersistenceState {
442            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
443            rx: None,
444        };
445
446        let (tx, outgoing) = unbounded_channel();
447        let state = EngineApiTreeState::new(
448            config.block_buffer_limit(),
449            config.max_invalid_header_cache_length(),
450            config.invalid_header_hit_eviction_threshold(),
451            header.num_hash(),
452            kind,
453            runtime.state_trie_overlay_worker_pool(),
454        );
455
456        let task = Self::new(
457            provider,
458            consensus,
459            payload_validator,
460            tx,
461            state,
462            canonical_in_memory_state,
463            persistence,
464            persistence_state,
465            payload_builder,
466            config,
467            kind,
468            evm_config,
469            changeset_cache,
470            runtime,
471        );
472        let incoming = task.incoming_tx.clone();
473        spawn_os_thread("engine", || {
474            increase_thread_priority();
475            task.run()
476        });
477        (incoming, outgoing)
478    }
479
480    /// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
481    fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
482        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
483            PayloadStatusEnum::Valid,
484            Some(state.head_block_hash),
485        )))
486    }
487
488    /// Returns a new [`Sender`] to send messages to this type.
489    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
490        self.incoming_tx.clone()
491    }
492
493    /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means
494    /// persistence is falling behind execution.
495    const fn persistence_gap(&self) -> u64 {
496        self.state
497            .tree_state
498            .canonical_block_number()
499            .saturating_sub(self.persistence_state.last_persisted_block.number)
500    }
501
502    /// Returns `true` when the main loop should stop draining the tree input channel.
503    ///
504    /// This is the case when persistence is already running and the gap between the canonical tip
505    /// and the last persisted block has reached the configured threshold.
506    const fn should_backpressure(&self) -> bool {
507        self.persistence_state.in_progress() &&
508            self.persistence_gap() >= self.config.persistence_backpressure_threshold()
509    }
510
511    /// Run the engine API handler.
512    ///
513    /// This will block the current thread and process incoming messages.
514    pub fn run(mut self) {
515        loop {
516            // Each iteration has three phases:
517            //
518            // 1. Non-blocking poll for persistence completion. If the background flush already
519            //    landed, absorb the result now so the gap calculation below is fresh.
520            // 2. Decide how to wait for the next event. When the canonical-to-persisted gap exceeds
521            //    the backpressure threshold we only block on the persistence receiver, leaving new
522            //    engine requests sitting in the unbounded upstream channel.
523            // 3. Handle the event (engine message or persistence completion) and kick off a new
524            //    persistence cycle if the threshold is met again.
525            //
526            // The net effect: when the persistence gap exceeds the threshold, we stop
527            // processing incoming messages and let them queue in the channel. This is only a
528            // soft form of backpressure: it delays replies and, more importantly, prevents
529            // executing further blocks that would pile up in the persistence queue - where each
530            // block carries heavier state (eg. trie updates) than the raw payload sitting in the
531            // engine channel.
532            //
533            // Standard Ethereum CLs won't truly back off - the engine API has no
534            // backpressure semantics, and CLs typically timeout after ≈8s and resend - so
535            // this cannot prevent the incoming channel from growing under sustained load.
536            // But it shifts the bottleneck to the lighter-weight incoming queue rather than
537            // the costlier persistence pipeline. Other clients that respect reply latency
538            // can treat the delayed responses as a signal to chill out.
539            match self.try_poll_persistence() {
540                Ok(true) => {
541                    if let Err(err) = self.advance_persistence() {
542                        error!(target: "engine::tree", %err, "Advancing persistence failed");
543                        return
544                    }
545                    continue;
546                }
547                Ok(false) => {}
548                Err(err) => {
549                    error!(target: "engine::tree", %err, "Polling persistence failed");
550                    return
551                }
552            }
553
554            let event = if self.should_backpressure() {
555                self.metrics.engine.backpressure_active.set(1.0);
556                let stall_start = Instant::now();
557                let event = self.wait_for_persistence_event();
558                self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
559                event
560            } else {
561                self.metrics.engine.backpressure_active.set(0.0);
562                self.wait_for_event()
563            };
564
565            match event {
566                LoopEvent::EngineMessage(msg) => {
567                    debug!(target: "engine::tree", %msg, "received new engine message");
568                    match self.on_engine_message(msg) {
569                        Ok(ops::ControlFlow::Break(())) => return,
570                        Ok(ops::ControlFlow::Continue(())) => {}
571                        Err(fatal) => {
572                            error!(target: "engine::tree", %fatal, "insert block fatal error");
573                            return
574                        }
575                    }
576                }
577                LoopEvent::PersistenceComplete { result, start_time } => {
578                    if let Err(err) = self.on_persistence_complete(result, start_time) {
579                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
580                        return
581                    }
582                }
583                LoopEvent::Disconnected => {
584                    error!(target: "engine::tree", "Channel disconnected");
585                    return
586                }
587            }
588
589            // Always check if we need to trigger new persistence after any event:
590            // - After engine messages: new blocks may have been inserted that exceed the
591            //   persistence threshold
592            // - After persistence completion: we can now persist more blocks if needed
593            if let Err(err) = self.advance_persistence() {
594                error!(target: "engine::tree", %err, "Advancing persistence failed");
595                return
596            }
597        }
598    }
599
600    /// Blocks until the in-flight persistence task completes, used when we are under
601    /// backpressure.
602    ///
603    /// Unlike `wait_for_event`, this deliberately does not read from the tree input channel. Any
604    /// requests sent to the tree remain queued upstream until persistence catches up.
605    fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
606        let maybe_persistence = self.persistence_state.rx.take();
607
608        if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
609            match persistence_rx.recv() {
610                Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
611                Err(_) => LoopEvent::Disconnected,
612            }
613        } else {
614            self.wait_for_event()
615        }
616    }
617
618    /// Blocks until the next event is ready: either an incoming engine message or a persistence
619    /// completion (if one is in progress).
620    ///
621    /// Uses biased selection to prioritize persistence completion to update in-memory state and
622    /// unblock further writes.
623    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
624        // Take ownership of persistence rx if present
625        let maybe_persistence = self.persistence_state.rx.take();
626
627        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
628            // Biased select prioritizes persistence completion to update in memory state and
629            // unblock further writes
630            crossbeam_channel::select_biased! {
631                recv(persistence_rx) -> result => {
632                    // Don't put it back - consumed (oneshot-like behavior)
633                    match result {
634                        Ok(result) => LoopEvent::PersistenceComplete {
635                            result,
636                            start_time,
637                        },
638                        Err(_) => LoopEvent::Disconnected,
639                    }
640                },
641                recv(self.incoming) -> msg => {
642                    // Put the persistence rx back - we didn't consume it
643                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
644                    match msg {
645                        Ok(m) => LoopEvent::EngineMessage(m),
646                        Err(_) => LoopEvent::Disconnected,
647                    }
648                },
649            }
650        } else {
651            // No persistence in progress - just wait on incoming
652            match self.incoming.recv() {
653                Ok(m) => LoopEvent::EngineMessage(m),
654                Err(_) => LoopEvent::Disconnected,
655            }
656        }
657    }
658
659    /// Invoked when previously requested blocks were downloaded.
660    ///
661    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
662    /// will execute the first batch and send the remaining blocks back through the channel so that
663    /// block request processing isn't blocked for a long time.
664    fn on_downloaded(
665        &mut self,
666        mut blocks: Vec<SealedBlock<N::Block>>,
667    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
668        if blocks.is_empty() {
669            // nothing to execute
670            return Ok(None)
671        }
672
673        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
674        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
675        for block in blocks.drain(..batch) {
676            if let Some(event) = self.on_downloaded_block(block)? {
677                let needs_backfill = event.is_backfill_action();
678                self.on_tree_event(event)?;
679                if needs_backfill {
680                    // can exit early if backfill is needed
681                    return Ok(None)
682                }
683            }
684        }
685
686        // if we still have blocks to execute, send them as a followup request
687        if !blocks.is_empty() {
688            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
689        }
690
691        Ok(None)
692    }
693
694    /// When the Consensus layer receives a new block via the consensus gossip protocol,
695    /// the transactions in the block are sent to the execution layer in the form of a
696    /// [`PayloadTypes::ExecutionData`], for example
697    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
698    /// Execution layer executes the transactions and validates the state in the block header,
699    /// then passes validation data back to Consensus layer, that adds the block to the head of
700    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
701    /// network in the form of a "Beacon block".
702    ///
703    /// These responses should adhere to the [Engine API Spec for
704    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
705    ///
706    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
707    /// returns an error if an internal error occurred.
708    #[instrument(
709        level = "debug",
710        target = "engine::tree",
711        skip_all,
712        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
713    )]
714    fn on_new_payload(
715        &mut self,
716        payload: T::ExecutionData,
717    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
718        trace!(target: "engine::tree", "invoked new payload");
719
720        // start timing for the new payload process
721        let start = Instant::now();
722
723        // Ensures that the given payload does not violate any consensus rules that concern the
724        // block's layout, like:
725        //    - missing or invalid base fee
726        //    - invalid extra data
727        //    - invalid transactions
728        //    - incorrect hash
729        //    - the versioned hashes passed with the payload do not exactly match transaction
730        //      versioned hashes
731        //    - the block does not contain blob transactions if it is pre-cancun
732        //
733        // This validates the following engine API rule:
734        //
735        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
736        //    validation by taking the following steps:
737        //
738        //   1. Obtain the actual array by concatenating blob versioned hashes lists
739        //      (`tx.blob_versioned_hashes`) of each [blob
740        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
741        //      in the payload, respecting the order of inclusion. If the payload has no blob
742        //      transactions the expected array **MUST** be `[]`.
743        //
744        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
745        //      null}` if the expected and the actual arrays don't match.
746        //
747        // This validation **MUST** be instantly run in all cases even during active sync process.
748
749        let num_hash = payload.num_hash();
750        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
751        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
752
753        let block_hash = num_hash.hash;
754
755        // Check for invalid ancestors
756        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
757            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
758            return Ok(TreeOutcome::new(status));
759        }
760
761        // record pre-execution phase duration
762        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
763
764        let mut outcome = if self.backfill_sync_state.is_idle() {
765            self.try_insert_payload(payload)?.into_outcome()
766        } else {
767            TreeOutcome::new(self.try_buffer_payload(payload)?)
768        };
769
770        // if the block is valid and it is the current sync target head, make it canonical
771        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
772            // Only create the canonical event if this block isn't already the canonical head
773            if self.state.tree_state.canonical_block_hash() != block_hash {
774                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
775                    sync_target_head: block_hash,
776                }));
777            }
778        }
779
780        // record total newPayload duration
781        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
782
783        Ok(outcome)
784    }
785
786    /// Processes a payload during normal sync operation.
787    #[instrument(level = "debug", target = "engine::tree", skip_all)]
788    fn try_insert_payload(
789        &mut self,
790        payload: T::ExecutionData,
791    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
792        let block_hash = payload.block_hash();
793        let num_hash = payload.num_hash();
794        let parent_hash = payload.parent_hash();
795        let mut latest_valid_hash = None;
796
797        match self.insert_payload(payload) {
798            Ok(status) => {
799                let (status, already_seen) = match status {
800                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
801                        latest_valid_hash = Some(block_hash);
802                        self.try_connect_buffered_blocks(num_hash)?;
803                        (PayloadStatusEnum::Valid, false)
804                    }
805                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
806                        latest_valid_hash = Some(block_hash);
807                        (PayloadStatusEnum::Valid, true)
808                    }
809                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
810                        (PayloadStatusEnum::Syncing, false)
811                    }
812                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
813                        // not known to be invalid, but we don't know anything else
814                        (PayloadStatusEnum::Syncing, true)
815                    }
816                };
817
818                Ok(TryInsertPayloadResult {
819                    status: PayloadStatus::new(status, latest_valid_hash),
820                    already_seen,
821                })
822            }
823            Err(error) => {
824                let status = match error {
825                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
826                    InsertPayloadError::Payload(error) => {
827                        self.on_new_payload_error(error, num_hash, parent_hash)?
828                    }
829                };
830
831                Ok(TryInsertPayloadResult { status, already_seen: false })
832            }
833        }
834    }
835
836    /// Stores a payload for later processing during backfill sync.
837    ///
838    /// During backfill, the node lacks the state needed to validate payloads,
839    /// so they are buffered (stored in memory) until their parent blocks are synced.
840    ///
841    /// Returns:
842    /// - `Syncing`: Payload successfully buffered
843    /// - Error status: Payload is malformed or invalid
844    fn try_buffer_payload(
845        &mut self,
846        payload: T::ExecutionData,
847    ) -> Result<PayloadStatus, InsertBlockFatalError> {
848        let parent_hash = payload.parent_hash();
849        let num_hash = payload.num_hash();
850
851        match self.payload_validator.convert_payload_to_block(payload) {
852            // if the block is well-formed, buffer it for later
853            Ok(block) => {
854                if let Err(error) = self.buffer_block(block) {
855                    Ok(self.on_insert_block_error(error)?)
856                } else {
857                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
858                }
859            }
860            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
861        }
862    }
863
864    /// Returns the new chain for the given head.
865    ///
866    /// This also handles reorgs.
867    ///
868    /// Note: This does not update the tracked state and instead returns the new chain based on the
869    /// given head.
870    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
871        // get the executed new head block
872        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
873            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
874            self.metrics.engine.executed_new_block_cache_miss.increment(1);
875            return Ok(None)
876        };
877
878        let new_head_number = new_head_block.recovered_block().number();
879        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
880
881        let mut new_chain = vec![new_head_block.clone()];
882        let mut current_hash = new_head_block.recovered_block().parent_hash();
883        let mut current_number = new_head_number - 1;
884
885        // Walk back the new chain until we reach a block we know about
886        //
887        // This is only done for in-memory blocks, because we should not have persisted any blocks
888        // that are _above_ the current canonical head.
889        while current_number > current_canonical_number {
890            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
891            {
892                current_hash = block.recovered_block().parent_hash();
893                current_number -= 1;
894                new_chain.push(block);
895            } else {
896                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
897                // This should never happen as we're walking back a chain that should connect to
898                // the canonical chain
899                return Ok(None)
900            }
901        }
902
903        // If we have reached the current canonical head by walking back from the target, then we
904        // know this represents an extension of the canonical chain.
905        if current_hash == self.state.tree_state.current_canonical_head.hash {
906            new_chain.reverse();
907
908            // Simple extension of the current chain
909            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
910        }
911
912        // We have a reorg. Walk back both chains to find the fork point.
913        let mut old_chain = Vec::new();
914        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
915
916        // If the canonical chain is ahead of the new chain,
917        // gather all blocks until new head number.
918        while current_canonical_number > current_number {
919            let block = self.canonical_block_by_hash(old_hash)?;
920            old_hash = block.recovered_block().parent_hash();
921            old_chain.push(block);
922            current_canonical_number -= 1;
923        }
924
925        // Both new and old chain pointers are now at the same height.
926        debug_assert_eq!(current_number, current_canonical_number);
927
928        // Walk both chains from specified hashes at same height until
929        // a common ancestor (fork block) is reached.
930        while old_hash != current_hash {
931            let block = self.canonical_block_by_hash(old_hash)?;
932            old_hash = block.recovered_block().parent_hash();
933            old_chain.push(block);
934
935            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
936            {
937                current_hash = block.recovered_block().parent_hash();
938                new_chain.push(block);
939            } else {
940                // This shouldn't happen as we've already walked this path
941                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
942                return Ok(None)
943            }
944        }
945        new_chain.reverse();
946        old_chain.reverse();
947
948        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
949    }
950
951    /// Updates the latest block state to the specified canonical ancestor.
952    ///
953    /// This method ensures that the latest block tracks the given canonical header by resetting
954    ///
955    /// # Arguments
956    /// * `canonical_header` - The canonical header to set as the new head
957    ///
958    /// # Returns
959    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
960    ///
961    /// Caution: This unwinds the canonical chain
962    fn update_latest_block_to_canonical_ancestor(
963        &mut self,
964        canonical_header: &SealedHeader<N::BlockHeader>,
965    ) -> ProviderResult<()> {
966        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
967        let current_head_number = self.state.tree_state.canonical_block_number();
968        let new_head_number = canonical_header.number();
969        let new_head_hash = canonical_header.hash();
970
971        // Update tree state with the new canonical head
972        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
973
974        // Handle the state update based on whether this is an unwind scenario
975        if new_head_number < current_head_number {
976            debug!(
977                target: "engine::tree",
978                current_head = current_head_number,
979                new_head = new_head_number,
980                new_head_hash = ?new_head_hash,
981                "FCU unwind detected: reverting to canonical ancestor"
982            );
983
984            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
985        } else {
986            debug!(
987                target: "engine::tree",
988                previous_head = current_head_number,
989                new_head = new_head_number,
990                new_head_hash = ?new_head_hash,
991                "Advancing latest block to canonical ancestor"
992            );
993            self.handle_chain_advance_or_same_height(canonical_header)
994        }
995    }
996
997    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
998    /// to the canonical header
999    fn handle_canonical_chain_unwind(
1000        &self,
1001        current_head_number: u64,
1002        canonical_header: &SealedHeader<N::BlockHeader>,
1003    ) -> ProviderResult<()> {
1004        let new_head_number = canonical_header.number();
1005        debug!(
1006            target: "engine::tree",
1007            from = current_head_number,
1008            to = new_head_number,
1009            "Handling unwind: collecting blocks to remove from in-memory state"
1010        );
1011
1012        // Collect blocks that need to be removed from memory
1013        let old_blocks =
1014            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
1015
1016        // Load and apply the canonical ancestor block
1017        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1018    }
1019
1020    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
1021    fn collect_blocks_for_canonical_unwind(
1022        &self,
1023        new_head_number: u64,
1024        current_head_number: u64,
1025    ) -> Vec<ExecutedBlock<N>> {
1026        let mut old_blocks =
1027            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1028
1029        for block_num in (new_head_number + 1)..=current_head_number {
1030            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1031                let executed_block = block_state.block_ref().clone();
1032                old_blocks.push(executed_block);
1033                debug!(
1034                    target: "engine::tree",
1035                    block_number = block_num,
1036                    "Collected block for removal from in-memory state"
1037                );
1038            }
1039        }
1040
1041        if old_blocks.is_empty() {
1042            debug!(
1043                target: "engine::tree",
1044                "No blocks found in memory to remove, will clear and reset state"
1045            );
1046        }
1047
1048        old_blocks
1049    }
1050
1051    /// Applies the canonical ancestor block via a reorg operation.
1052    fn apply_canonical_ancestor_via_reorg(
1053        &self,
1054        canonical_header: &SealedHeader<N::BlockHeader>,
1055        old_blocks: Vec<ExecutedBlock<N>>,
1056    ) -> ProviderResult<()> {
1057        let new_head_hash = canonical_header.hash();
1058        let new_head_number = canonical_header.number();
1059
1060        // Load the canonical ancestor's block
1061        let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1062        // Perform the reorg to properly handle the unwind
1063        self.canonical_in_memory_state
1064            .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1065
1066        // CRITICAL: Update the canonical head after the reorg
1067        // This ensures get_canonical_head() returns the correct block
1068        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1069
1070        debug!(
1071            target: "engine::tree",
1072            block_number = new_head_number,
1073            block_hash = ?new_head_hash,
1074            "Successfully loaded canonical ancestor into memory via reorg"
1075        );
1076
1077        Ok(())
1078    }
1079
1080    /// Handles chain advance or same height scenarios.
1081    fn handle_chain_advance_or_same_height(
1082        &self,
1083        canonical_header: &SealedHeader<N::BlockHeader>,
1084    ) -> ProviderResult<()> {
1085        // Load the block into memory if it's not already present
1086        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1087
1088        // Update the canonical head header
1089        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1090
1091        Ok(())
1092    }
1093
1094    /// Ensures a block is loaded into memory if not already present.
1095    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1096        // Check if block is already in memory
1097        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1098            return Ok(());
1099        }
1100
1101        // Load the block from storage
1102        let executed_block = self.canonical_block_by_hash(block_hash)?;
1103        self.canonical_in_memory_state
1104            .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1105
1106        debug!(
1107            target: "engine::tree",
1108            block_number,
1109            block_hash = ?block_hash,
1110            "Added canonical block to in-memory state"
1111        );
1112
1113        Ok(())
1114    }
1115
1116    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
1117    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
1118    /// chain.
1119    ///
1120    /// These responses should adhere to the [Engine API Spec for
1121    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
1122    ///
1123    /// Returns an error if an internal error occurred like a database error.
1124    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
1125    fn on_forkchoice_updated(
1126        &mut self,
1127        state: ForkchoiceState,
1128        attrs: Option<T::PayloadAttributes>,
1129    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1130        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1131
1132        self.building_payload = attrs.is_some() && self.config.suppress_persistence_during_build();
1133
1134        // Record metrics
1135        self.record_forkchoice_metrics();
1136
1137        // Pre-validation of forkchoice state
1138        if let Some(early_result) = self.validate_forkchoice_state(state)? {
1139            return Ok(TreeOutcome::new(early_result));
1140        }
1141
1142        // Return early if we are on the correct fork
1143        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
1144            return Ok(result);
1145        }
1146
1147        // Attempt to apply a chain update when the head differs from our canonical chain.
1148        // This handles reorgs and chain extensions by making the specified head canonical.
1149        if let Some(result) = self.apply_chain_update(state, &attrs)? {
1150            return Ok(result);
1151        }
1152
1153        // Fallback that ensures to catch up to the network's state.
1154        self.handle_missing_block(state)
1155    }
1156
1157    /// Records metrics for forkchoice updated calls
1158    fn record_forkchoice_metrics(&self) {
1159        self.canonical_in_memory_state.on_forkchoice_update_received();
1160    }
1161
1162    /// Pre-validates the forkchoice state and returns early if validation fails.
1163    ///
1164    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1165    /// returned. Returns `None` if validation passes and processing should continue.
1166    fn validate_forkchoice_state(
1167        &mut self,
1168        state: ForkchoiceState,
1169    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1170        if state.head_block_hash.is_zero() {
1171            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1172        }
1173
1174        // Check if the new head hash is connected to any ancestor that we previously marked as
1175        // invalid
1176        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1177        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1178            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1179        }
1180
1181        if !self.backfill_sync_state.is_idle() {
1182            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1183            // exclusive access to the database
1184            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1185            return Ok(Some(OnForkChoiceUpdated::syncing()));
1186        }
1187
1188        Ok(None)
1189    }
1190
1191    /// Handles the case where the forkchoice head is already canonical.
1192    ///
1193    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1194    /// processing is complete. Returns `None` if the head is not canonical and processing
1195    /// should continue.
1196    fn handle_canonical_head(
1197        &self,
1198        state: ForkchoiceState,
1199        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1200    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1201        // Process the forkchoice update by trying to make the head block canonical
1202        //
1203        // We can only process this forkchoice update if:
1204        // - we have the `head` block
1205        // - the head block is part of a chain that is connected to the canonical chain. This
1206        //   includes reorgs.
1207        //
1208        // Performing a FCU involves:
1209        // - marking the FCU's head block as canonical
1210        // - updating in memory state to reflect the new canonical chain
1211        // - updating canonical state trackers
1212        // - emitting a canonicalization event for the new chain (including reorg)
1213        // - if we have payload attributes, delegate them to the payload service
1214
1215        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1216            return Ok(None);
1217        }
1218
1219        trace!(target: "engine::tree", "fcu head hash is already canonical");
1220
1221        // Update the safe and finalized blocks and ensure their values are valid
1222        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1223            // safe or finalized hashes are invalid
1224            return Ok(Some(TreeOutcome::new(outcome)));
1225        }
1226
1227        // Process payload attributes if the head is already canonical
1228        if let Some(attr) = attrs {
1229            let tip = self
1230                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1231                .ok_or_else(|| {
1232                    // If we can't find the canonical block, then something is wrong and we need
1233                    // to return an error
1234                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1235                })?;
1236            // Clone only when we actually need to process the attributes
1237            let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1238            return Ok(Some(TreeOutcome::new(updated)));
1239        }
1240
1241        // The head block is already canonical
1242        Ok(Some(Self::valid_outcome(state)))
1243    }
1244
1245    /// Applies chain update for the new head block and processes payload attributes.
1246    ///
1247    /// This method handles the case where the forkchoice head differs from our current canonical
1248    /// head. It attempts to make the specified head block canonical by:
1249    /// - Checking if the head is already part of the canonical chain
1250    /// - Applying chain reorganizations (reorgs) if necessary
1251    /// - Processing payload attributes if provided
1252    /// - Returning the appropriate forkchoice update response
1253    ///
1254    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
1255    /// Returns `None` if no chain update was needed or possible.
1256    fn apply_chain_update(
1257        &mut self,
1258        state: ForkchoiceState,
1259        attrs: &Option<T::PayloadAttributes>,
1260    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1261        // Check if the head is already part of the canonical chain
1262        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1263            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1264
1265            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1266            // own chain at will, so we need to always trigger a new payload job if requested.
1267            if self.engine_kind.is_opstack() ||
1268                self.config.always_process_payload_attributes_on_canonical_head()
1269            {
1270                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
1271                // part of the canonical chain. We need to update the latest block state to reflect
1272                // the canonical ancestor. This ensures that state providers and the transaction
1273                // pool operate with the correct chain state after forkchoice update processing, and
1274                // new payloads built on the reorg'd head will be added to the tree immediately.
1275                if self.config.unwind_canonical_header() {
1276                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1277                }
1278
1279                if let Some(attr) = attrs {
1280                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1281                    // Clone only when we actually need to process the attributes
1282                    let updated =
1283                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
1284                    return Ok(Some(TreeOutcome::new(updated)));
1285                }
1286            }
1287
1288            // According to the Engine API specification, client software MAY skip an update of the
1289            // forkchoice state and MUST NOT begin a payload build process if
1290            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
1291            // of canonical chain, i.e. the ancestor passed payload validation process
1292            // and deemed `VALID`. In the case of such an event, client software MUST
1293            // return `{payloadStatus: {status: VALID, latestValidHash:
1294            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1295
1296            // The head block is already canonical and we're not processing payload attributes,
1297            // so we're not triggering a payload job and can return right away
1298            return Ok(Some(Self::valid_outcome(state)));
1299        }
1300
1301        // Ensure we can apply a new chain update for the head block
1302        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1303            let tip = chain_update.tip().clone_sealed_header();
1304            self.on_canonical_chain_update(chain_update);
1305
1306            // Update the safe and finalized blocks and ensure their values are valid
1307            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1308                // safe or finalized hashes are invalid
1309                return Ok(Some(TreeOutcome::new(outcome)));
1310            }
1311
1312            if let Some(attr) = attrs {
1313                // Clone only when we actually need to process the attributes
1314                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1315                return Ok(Some(TreeOutcome::new(updated)));
1316            }
1317
1318            return Ok(Some(Self::valid_outcome(state)));
1319        }
1320
1321        Ok(None)
1322    }
1323
1324    /// Handles the case where the head block is missing and needs to be downloaded.
1325    ///
1326    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1327    /// Returns a `TreeOutcome` with syncing status and download event.
1328    fn handle_missing_block(
1329        &self,
1330        state: ForkchoiceState,
1331    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1332        // We don't have the block to perform the forkchoice update
1333        // We assume the FCU is valid and at least the head is missing,
1334        // so we need to start syncing to it
1335        //
1336        // find the appropriate target to sync to, if we don't have the safe block hash then we
1337        // start syncing to the safe block via backfill first
1338        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1339        // check that safe block is valid and missing
1340        !state.safe_block_hash.is_zero() &&
1341        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1342        {
1343            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1344            state.safe_block_hash
1345        } else {
1346            state.head_block_hash
1347        };
1348
1349        let target = self.lowest_buffered_ancestor_or(target);
1350        trace!(target: "engine::tree", %target, "downloading missing block");
1351
1352        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1353            PayloadStatusEnum::Syncing,
1354        )))
1355        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1356    }
1357
1358    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1359    /// the current persistence action while we're removing blocks.
1360    fn remove_blocks(&mut self, new_tip_num: u64) {
1361        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1362        if new_tip_num < self.persistence_state.last_persisted_block.number {
1363            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1364            let (tx, rx) = crossbeam_channel::bounded(1);
1365            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1366            self.persistence_state.start_remove(new_tip_num, rx);
1367        }
1368    }
1369
1370    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1371    /// the current persistence action while we're saving blocks.
1372    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1373        if blocks_to_persist.is_empty() {
1374            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1375            return
1376        }
1377
1378        // NOTE: checked non-empty above
1379        let highest_num_hash = blocks_to_persist
1380            .iter()
1381            .max_by_key(|block| block.recovered_block().number())
1382            .map(|b| b.recovered_block().num_hash())
1383            .expect("Checked non-empty persisting blocks");
1384
1385        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1386        let (tx, rx) = crossbeam_channel::bounded(1);
1387        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1388
1389        self.persistence_state.start_save(highest_num_hash, rx);
1390    }
1391
1392    /// Triggers new persistence actions if no persistence task is currently in progress.
1393    ///
1394    /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
1395    /// Persistence completion is handled separately via the `wait_for_event` method.
1396    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1397        if !self.persistence_state.in_progress() {
1398            if let Some(new_tip_num) = self.find_disk_reorg()? {
1399                self.remove_blocks(new_tip_num)
1400            } else if self.should_persist() {
1401                let blocks_to_persist =
1402                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1403                self.persist_blocks(blocks_to_persist);
1404            }
1405        }
1406
1407        Ok(())
1408    }
1409
1410    /// Finishes termination by persisting all remaining blocks and signaling completion.
1411    ///
1412    /// This blocks until all persistence is complete. Always signals completion,
1413    /// even if an error occurs.
1414    fn finish_termination(
1415        &mut self,
1416        pending_termination: oneshot::Sender<()>,
1417    ) -> Result<(), AdvancePersistenceError> {
1418        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1419        let result = self.persist_until_complete();
1420        let _ = pending_termination.send(());
1421        result
1422    }
1423
1424    /// Persists all remaining blocks until none are left.
1425    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1426        loop {
1427            // Wait for any in-progress persistence to complete (blocking)
1428            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1429                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1430                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1431                self.on_persistence_complete(result, start_time)?;
1432            }
1433
1434            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1435
1436            if blocks_to_persist.is_empty() {
1437                debug!(target: "engine::tree", "persistence complete, signaling termination");
1438                return Ok(())
1439            }
1440
1441            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1442            self.persist_blocks(blocks_to_persist);
1443        }
1444    }
1445
1446    /// Tries to poll for a completed persistence task (non-blocking).
1447    ///
1448    /// Returns `true` if a persistence task was completed, `false` otherwise.
1449    fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1450        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1451            return Ok(false);
1452        };
1453
1454        match rx.try_recv() {
1455            Ok(result) => {
1456                self.on_persistence_complete(result, start_time)?;
1457                Ok(true)
1458            }
1459            Err(crossbeam_channel::TryRecvError::Empty) => {
1460                // Not ready yet, put it back
1461                self.persistence_state.rx = Some((rx, start_time, action));
1462                Ok(false)
1463            }
1464            Err(crossbeam_channel::TryRecvError::Disconnected) => {
1465                Err(AdvancePersistenceError::ChannelClosed)
1466            }
1467        }
1468    }
1469
1470    /// Handles a completed persistence task.
1471    fn on_persistence_complete(
1472        &mut self,
1473        result: PersistenceResult,
1474        start_time: Instant,
1475    ) -> Result<(), AdvancePersistenceError> {
1476        self.metrics.engine.persistence_duration.record(start_time.elapsed());
1477
1478        let commit_duration = result.commit_duration;
1479        let Some(BlockNumHash {
1480            hash: last_persisted_block_hash,
1481            number: last_persisted_block_number,
1482        }) = result.last_block
1483        else {
1484            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
1485            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1486            return Ok(())
1487        };
1488
1489        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1490        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1491
1492        // Evict trie changesets for blocks below the eviction threshold.
1493        // Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
1494        // the finalized block if set.
1495        let min_threshold =
1496            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1497        let eviction_threshold =
1498            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1499                // Use the minimum of finalized block and retention threshold to be conservative
1500                finalized.number.min(min_threshold)
1501            } else {
1502                // When finalized is not set (e.g., on L2s), use the retention threshold
1503                min_threshold
1504            };
1505        debug!(
1506            target: "engine::tree",
1507            last_persisted = last_persisted_block_number,
1508            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1509            eviction_threshold,
1510            "Evicting changesets below threshold"
1511        );
1512        self.changeset_cache.evict(eviction_threshold);
1513
1514        self.on_new_persisted_block()?;
1515
1516        self.purge_timing_stats(last_persisted_block_number, commit_duration);
1517
1518        Ok(())
1519    }
1520
1521    /// Handles a message from the engine.
1522    ///
1523    /// Returns `ControlFlow::Break(())` if the engine should terminate.
1524    fn on_engine_message(
1525        &mut self,
1526        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1527    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
1528        match msg {
1529            FromEngine::Event(event) => match event {
1530                FromOrchestrator::BackfillSyncStarted => {
1531                    debug!(target: "engine::tree", "received backfill sync started event");
1532                    self.backfill_sync_state = BackfillSyncState::Active;
1533                }
1534                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1535                    self.on_backfill_sync_finished(ctrl)?;
1536                }
1537                FromOrchestrator::Terminate { tx } => {
1538                    debug!(target: "engine::tree", "received terminate request");
1539                    if let Err(err) = self.finish_termination(tx) {
1540                        error!(target: "engine::tree", %err, "Termination failed");
1541                    }
1542                    return Ok(ops::ControlFlow::Break(()))
1543                }
1544            },
1545            FromEngine::Request(request) => {
1546                match request {
1547                    EngineApiRequest::InsertExecutedBlock(payload) => {
1548                        let block_num_hash = payload.recovered_block.num_hash();
1549                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1550                            // outdated block that can be skipped
1551                            return Ok(ops::ControlFlow::Continue(()))
1552                        }
1553
1554                        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
1555                            // block already known to the tree (e.g. delivered via newPayload first)
1556                            return Ok(ops::ControlFlow::Continue(()))
1557                        }
1558
1559                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1560                        let now = Instant::now();
1561
1562                        let block = match self
1563                            .payload_validator
1564                            .on_inserted_executed_block(payload, &self.state)
1565                        {
1566                            Ok(block) => block,
1567                            Err(err) => {
1568                                warn!(target: "engine::tree", %err, block=?block_num_hash, "Failed to insert already executed block");
1569                                return Ok(ops::ControlFlow::Continue(()))
1570                            }
1571                        };
1572
1573                        // if the parent is the canonical head, we can insert the block as the
1574                        // pending block
1575                        if self.state.tree_state.canonical_block_hash() ==
1576                            block.recovered_block().parent_hash()
1577                        {
1578                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1579                            self.canonical_in_memory_state.set_pending_block(block.clone());
1580                        }
1581
1582                        self.state.tree_state.insert_executed(block.clone());
1583                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1584                        self.emit_event(EngineApiEvent::BeaconConsensus(
1585                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1586                        ));
1587                    }
1588                    EngineApiRequest::Beacon(request) => {
1589                        match request {
1590                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
1591                                let has_attrs = payload_attrs.is_some();
1592
1593                                let start = Instant::now();
1594                                let mut output = self.on_forkchoice_updated(state, payload_attrs);
1595
1596                                if let Ok(res) = &mut output {
1597                                    // track last received forkchoice state
1598                                    self.state
1599                                        .forkchoice_state_tracker
1600                                        .set_latest(state, res.outcome.forkchoice_status());
1601
1602                                    // emit an event about the handled FCU
1603                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1604                                        state,
1605                                        res.outcome.forkchoice_status(),
1606                                    ));
1607
1608                                    // handle the event if any
1609                                    self.on_maybe_tree_event(res.event.take())?;
1610                                }
1611
1612                                if let Err(ref err) = output {
1613                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
1614                                }
1615
1616                                self.metrics.engine.forkchoice_updated.update_response_metrics(
1617                                    start,
1618                                    &mut self.metrics.engine.new_payload.latest_finish_at,
1619                                    has_attrs,
1620                                    &output,
1621                                );
1622
1623                                if let Err(err) =
1624                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1625                                {
1626                                    self.metrics
1627                                        .engine
1628                                        .failed_forkchoice_updated_response_deliveries
1629                                        .increment(1);
1630                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
1631                                }
1632                            }
1633                            BeaconEngineMessage::NewPayload { payload, tx } => {
1634                                let start = Instant::now();
1635                                let gas_used = payload.gas_used();
1636                                let num_hash = payload.num_hash();
1637                                let mut output = self.on_new_payload(payload);
1638                                self.metrics.engine.new_payload.update_response_metrics(
1639                                    start,
1640                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1641                                    &output,
1642                                    gas_used,
1643                                );
1644
1645                                let maybe_event =
1646                                    output.as_mut().ok().and_then(|out| out.event.take());
1647
1648                                // emit response
1649                                if let Err(err) =
1650                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1651                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1652                                    }))
1653                                {
1654                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
1655                                    self.metrics
1656                                        .engine
1657                                        .failed_new_payload_response_deliveries
1658                                        .increment(1);
1659                                }
1660
1661                                // handle the event if any
1662                                self.on_maybe_tree_event(maybe_event)?;
1663                            }
1664                            BeaconEngineMessage::RethNewPayload {
1665                                payload,
1666                                wait_for_persistence,
1667                                wait_for_caches,
1668                                tx,
1669                                enqueued_at,
1670                            } => {
1671                                debug!(
1672                                    target: "engine::tree",
1673                                    wait_for_persistence,
1674                                    wait_for_caches,
1675                                    "Processing reth_newPayload"
1676                                );
1677
1678                                let backpressure_wait = enqueued_at.elapsed();
1679
1680                                let explicit_persistence_wait = if wait_for_persistence {
1681                                    let pending_persistence = self.persistence_state.rx.take();
1682                                    if let Some((rx, start_time, _action)) = pending_persistence {
1683                                        let (persistence_tx, persistence_rx) =
1684                                            std::sync::mpsc::channel();
1685                                        self.runtime.spawn_blocking_named(
1686                                            "wait-persist",
1687                                            move || {
1688                                                let start = Instant::now();
1689                                                let result = rx
1690                                                    .recv()
1691                                                    .expect("persistence state channel closed");
1692                                                let _ = persistence_tx.send((
1693                                                    result,
1694                                                    start_time,
1695                                                    start.elapsed(),
1696                                                ));
1697                                            },
1698                                        );
1699                                        let (result, start_time, wait_duration) = persistence_rx
1700                                            .recv()
1701                                            .expect("persistence result channel closed");
1702                                        let _ = self.on_persistence_complete(result, start_time);
1703                                        wait_duration
1704                                    } else {
1705                                        Duration::ZERO
1706                                    }
1707                                } else {
1708                                    Duration::ZERO
1709                                };
1710
1711                                let cache_wait = wait_for_caches
1712                                    .then(|| self.payload_validator.wait_for_caches());
1713
1714                                let start = Instant::now();
1715                                let gas_used = payload.gas_used();
1716                                let num_hash = payload.num_hash();
1717                                let mut output = self.on_new_payload(payload);
1718                                let latency = start.elapsed();
1719                                self.metrics.engine.new_payload.update_response_metrics(
1720                                    start,
1721                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1722                                    &output,
1723                                    gas_used,
1724                                );
1725
1726                                let maybe_event =
1727                                    output.as_mut().ok().and_then(|out| out.event.take());
1728
1729                                let timings = NewPayloadTimings {
1730                                    latency,
1731                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
1732                                    execution_cache_wait: cache_wait
1733                                        .map(|wait| wait.execution_cache),
1734                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
1735                                };
1736                                if let Err(err) =
1737                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
1738                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1739                                    }))
1740                                {
1741                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1742                                    self.metrics
1743                                        .engine
1744                                        .failed_new_payload_response_deliveries
1745                                        .increment(1);
1746                                }
1747
1748                                self.on_maybe_tree_event(maybe_event)?;
1749                            }
1750                        }
1751                    }
1752                }
1753            }
1754            FromEngine::DownloadedBlocks(blocks) => {
1755                if let Some(event) = self.on_downloaded(blocks)? {
1756                    self.on_tree_event(event)?;
1757                }
1758            }
1759        }
1760        Ok(ops::ControlFlow::Continue(()))
1761    }
1762
1763    /// Invoked if the backfill sync has finished to target.
1764    ///
1765    /// At this point we consider the block synced to the backfill target.
1766    ///
1767    /// Checks the tracked finalized block against the block on disk and requests another backfill
1768    /// run if the distance to the tip exceeds the threshold for another backfill run.
1769    ///
1770    /// This will also do the necessary housekeeping of the tree state, this includes:
1771    ///  - removing all blocks below the backfill height
1772    ///  - resetting the canonical in-memory state
1773    ///
1774    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1775    /// target block.
1776    fn on_backfill_sync_finished(
1777        &mut self,
1778        ctrl: ControlFlow,
1779    ) -> Result<(), InsertBlockFatalError> {
1780        debug!(target: "engine::tree", "received backfill sync finished event");
1781        self.backfill_sync_state = BackfillSyncState::Idle;
1782
1783        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1784        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1785            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1786            // update the `invalid_headers` cache with the new invalid header
1787            self.state.invalid_headers.insert(**bad_block);
1788
1789            // if this was an unwind then the target is the new height
1790            Some(*target)
1791        } else {
1792            // backfill height is the block number that the backfill finished at
1793            ctrl.block_number()
1794        };
1795
1796        // backfill height is the block number that the backfill finished at
1797        let Some(backfill_height) = backfill_height else { return Ok(()) };
1798
1799        // state house keeping after backfill sync
1800        // remove all executed blocks below the backfill height
1801        //
1802        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1803        // before that
1804        let Some(backfill_num_hash) = self
1805            .provider
1806            .block_hash(backfill_height)?
1807            .map(|hash| BlockNumHash { hash, number: backfill_height })
1808        else {
1809            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1810            return Ok(())
1811        };
1812
1813        if ctrl.is_unwind() {
1814            // the node reset so we need to clear everything above that height so that backfill
1815            // height is the new canonical block.
1816            self.state.tree_state.reset(backfill_num_hash)
1817        } else {
1818            self.state.tree_state.remove_until(
1819                backfill_num_hash,
1820                self.persistence_state.last_persisted_block.hash,
1821                Some(backfill_num_hash),
1822            );
1823        }
1824
1825        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1826        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1827
1828        // remove all buffered blocks below the backfill height
1829        self.state.buffer.remove_old_blocks(backfill_height);
1830        self.purge_timing_stats(backfill_height, None);
1831        // we remove all entries because now we're synced to the backfill target and consider this
1832        // the canonical chain
1833        self.canonical_in_memory_state.clear_state();
1834
1835        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1836            // update the tracked chain height, after backfill sync both the canonical height and
1837            // persisted height are the same
1838            self.state.tree_state.set_canonical_head(new_head.num_hash());
1839            self.persistence_state.finish(new_head.hash(), new_head.number());
1840
1841            // update the tracked canonical head
1842            self.canonical_in_memory_state.set_canonical_head(new_head);
1843        }
1844
1845        // check if we need to run backfill again by comparing the most recent backfill target
1846        // height to the backfill height
1847        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1848        else {
1849            return Ok(())
1850        };
1851        if !self.engine_kind.is_opstack() && sync_target_state.finalized_block_hash.is_zero() {
1852            // no finalized block, can't check distance on non-OP Stack chains
1853            return Ok(())
1854        }
1855        let target_hash = self.backfill_target_hash(sync_target_state);
1856        if target_hash.is_zero() {
1857            return Ok(())
1858        }
1859        // get the block number of the backfill target block, if we have it buffered
1860        let newest_target = self.state.buffer.block(&target_hash).map(|block| block.number());
1861
1862        // The block number that the backfill finished at - if the progress or newest target is
1863        // None then we can't check the distance anyways.
1864        //
1865        // If both are Some, we perform another distance check and return the desired backfill
1866        // target
1867        if let Some(backfill_target) =
1868            ctrl.block_number().zip(newest_target).and_then(|(progress, target_number)| {
1869                // Determines whether or not we should run backfill again, in case
1870                // the new gap is still large enough and requires running backfill again
1871                self.backfill_sync_target(progress, target_number, None)
1872            })
1873        {
1874            // request another backfill run
1875            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1876                backfill_target.into(),
1877            )));
1878            return Ok(())
1879        };
1880
1881        // Check if there are more blocks to sync between current head and FCU target
1882        if let Some(lowest_buffered) =
1883            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1884        {
1885            let current_head_num = self.state.tree_state.current_canonical_head.number;
1886            let target_head_num = lowest_buffered.number();
1887
1888            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1889            {
1890                // There are blocks between current head and FCU target, download them
1891                debug!(
1892                    target: "engine::tree",
1893                    %current_head_num,
1894                    %target_head_num,
1895                    %distance,
1896                    "Backfill complete, downloading remaining blocks to reach FCU target"
1897                );
1898
1899                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1900                    lowest_buffered.parent_hash(),
1901                    distance,
1902                )));
1903                return Ok(());
1904            }
1905        } else {
1906            // We don't have the head block or any of its ancestors buffered. Request
1907            // a download for the head block which will then trigger further sync.
1908            debug!(
1909                target: "engine::tree",
1910                head_hash = %sync_target_state.head_block_hash,
1911                "Backfill complete but head block not buffered, requesting download"
1912            );
1913            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
1914                sync_target_state.head_block_hash,
1915            )));
1916            return Ok(());
1917        }
1918
1919        // try to close the gap by executing buffered blocks that are child blocks of the new head
1920        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1921    }
1922
1923    /// Attempts to make the given target canonical.
1924    ///
1925    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1926    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1927        if let Some(chain_update) = self.on_new_head(target)? {
1928            self.on_canonical_chain_update(chain_update);
1929        }
1930
1931        self.on_canonicalized_sync_target(target);
1932
1933        Ok(())
1934    }
1935
1936    /// Applies the tracked forkchoice state once its sync target head becomes canonical.
1937    fn on_canonicalized_sync_target(&mut self, target: B256) {
1938        let Some(sync_target_state) = self
1939            .state
1940            .forkchoice_state_tracker
1941            .sync_target_state()
1942            .filter(|state| state.head_block_hash == target)
1943        else {
1944            return;
1945        };
1946
1947        if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
1948            debug!(
1949                target: "engine::tree",
1950                head = %sync_target_state.head_block_hash,
1951                safe = %sync_target_state.safe_block_hash,
1952                finalized = %sync_target_state.finalized_block_hash,
1953                ?outcome,
1954                "Canonicalized sync target head before safe/finalized could be applied"
1955            );
1956            return;
1957        }
1958
1959        self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
1960    }
1961
1962    /// Convenience function to handle an optional tree event.
1963    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1964        if let Some(event) = event {
1965            self.on_tree_event(event)?;
1966        }
1967
1968        Ok(())
1969    }
1970
1971    /// Handles a tree event.
1972    ///
1973    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1974    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1975        match event {
1976            TreeEvent::TreeAction(action) => match action {
1977                TreeAction::MakeCanonical { sync_target_head } => {
1978                    self.make_canonical(sync_target_head)?;
1979                }
1980            },
1981            TreeEvent::BackfillAction(action) => {
1982                self.emit_event(EngineApiEvent::BackfillAction(action));
1983            }
1984            TreeEvent::Download(action) => {
1985                self.emit_event(EngineApiEvent::Download(action));
1986            }
1987        }
1988
1989        Ok(())
1990    }
1991
1992    /// Removes timing stats for blocks at or below `below_number`.
1993    ///
1994    /// No-op when detailed block logging is disabled (no stats are recorded in that case).
1995    /// When `commit_duration` is provided and a slow block threshold is configured, checks
1996    /// each removed block against the threshold and emits a [`ConsensusEngineEvent::SlowBlock`]
1997    /// event for blocks that exceed it.
1998    fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
1999        let threshold = self.config.slow_block_threshold();
2000        let check_slow = commit_duration.is_some() && threshold.is_some();
2001
2002        // Two-pass: collect keys first because emit_event borrows &mut self.
2003        let keys_to_remove: Vec<B256> = self
2004            .execution_timing_stats
2005            .iter()
2006            .filter(|(_, stats)| stats.block_number <= below_number)
2007            .map(|(k, _)| *k)
2008            .collect();
2009
2010        for key in keys_to_remove {
2011            let stats = self.execution_timing_stats.remove(&key).expect("key just found");
2012            if check_slow {
2013                let commit_dur = commit_duration.expect("checked above");
2014                // state_read_duration is already included in execution_duration
2015                let total_duration =
2016                    stats.execution_duration + stats.state_hash_duration + commit_dur;
2017
2018                if total_duration > threshold.expect("checked above") {
2019                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2020                        stats,
2021                        commit_duration: Some(commit_dur),
2022                        total_duration,
2023                    }));
2024                }
2025            }
2026        }
2027    }
2028
2029    /// Emits an outgoing event to the engine.
2030    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
2031        let event = event.into();
2032
2033        if event.is_backfill_action() {
2034            debug_assert_eq!(
2035                self.backfill_sync_state,
2036                BackfillSyncState::Idle,
2037                "backfill action should only be emitted when backfill is idle"
2038            );
2039
2040            if self.persistence_state.in_progress() {
2041                // backfill sync and persisting data are mutually exclusive, so we can't start
2042                // backfill while we're still persisting
2043                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
2044                return
2045            }
2046
2047            self.backfill_sync_state = BackfillSyncState::Pending;
2048            self.metrics.engine.pipeline_runs.increment(1);
2049            debug!(target: "engine::tree", "emitting backfill action event");
2050        }
2051
2052        let _ = self.outgoing.send(event).inspect_err(
2053            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2054        );
2055    }
2056
2057    /// Returns true if the canonical chain length minus the last persisted
2058    /// block is greater than or equal to the persistence threshold,
2059    /// backfill is not running, and no payload is currently being built.
2060    pub const fn should_persist(&self) -> bool {
2061        if self.building_payload {
2062            return false
2063        }
2064
2065        if !self.backfill_sync_state.is_idle() {
2066            // can't persist if backfill is running
2067            return false
2068        }
2069
2070        let min_block = self.persistence_state.last_persisted_block.number;
2071        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2072            self.config.persistence_threshold()
2073    }
2074
2075    /// Returns a batch of consecutive canonical blocks to persist in the range
2076    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
2077    fn get_canonical_blocks_to_persist(
2078        &self,
2079        target: PersistTarget,
2080    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2081        // We will calculate the state root using the database, so we need to be sure there are no
2082        // changes
2083        debug_assert!(!self.persistence_state.in_progress());
2084
2085        let mut blocks_to_persist = Vec::new();
2086        let mut current_hash = self.state.tree_state.canonical_block_hash();
2087        let last_persisted_number = self.persistence_state.last_persisted_block.number;
2088        let canonical_head_number = self.state.tree_state.canonical_block_number();
2089
2090        let target_number = match target {
2091            PersistTarget::Head => canonical_head_number,
2092            PersistTarget::Threshold => {
2093                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2094            }
2095        };
2096
2097        debug!(
2098            target: "engine::tree",
2099            ?current_hash,
2100            ?last_persisted_number,
2101            ?canonical_head_number,
2102            ?target_number,
2103            "Returning canonical blocks to persist"
2104        );
2105        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
2106            if block.recovered_block().number() <= last_persisted_number {
2107                break;
2108            }
2109
2110            if block.recovered_block().number() <= target_number {
2111                blocks_to_persist.push(block.clone());
2112            }
2113
2114            current_hash = block.recovered_block().parent_hash();
2115        }
2116
2117        // Reverse the order so that the oldest block comes first
2118        blocks_to_persist.reverse();
2119
2120        Ok(blocks_to_persist)
2121    }
2122
2123    /// This clears the blocks from the in-memory tree state that have been persisted to the
2124    /// database.
2125    ///
2126    /// This also updates the canonical in-memory state to reflect the newest persisted block
2127    /// height.
2128    ///
2129    /// Assumes that `finish` has been called on the `persistence_state` at least once
2130    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2131        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
2132        // state.
2133        if let Some(remove_above) = self.find_disk_reorg()? {
2134            self.remove_blocks(remove_above);
2135            return Ok(())
2136        }
2137
2138        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2139        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2140        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2141            number: self.persistence_state.last_persisted_block.number,
2142            hash: self.persistence_state.last_persisted_block.hash,
2143        });
2144        Ok(())
2145    }
2146
2147    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
2148    ///
2149    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
2150    /// or the database. If the required historical data (such as trie change sets) has been
2151    /// pruned for a given block, this operation will return an error. On archive nodes, it
2152    /// can retrieve any block.
2153    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2154    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
2155        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
2156        // check memory first
2157        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
2158            return Ok(block.clone())
2159        }
2160
2161        let (block, senders) = self
2162            .provider
2163            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
2164            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
2165            .split_sealed();
2166        let mut execution_output = self
2167            .provider
2168            .get_state(block.header().number())?
2169            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
2170        let hashed_state = self.provider.hashed_post_state(execution_output.state());
2171
2172        debug!(
2173            target: "engine::tree",
2174            number = ?block.number(),
2175            "computing block trie updates",
2176        );
2177        let db_provider = self.provider.database_provider_ro()?;
2178        let trie_updates = reth_trie_db::compute_block_trie_updates(
2179            &self.changeset_cache,
2180            &db_provider,
2181            block.number(),
2182        )?;
2183
2184        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
2185        let sorted_trie_updates = Arc::new(trie_updates);
2186        let trie_data = ComputedTrieData::new(sorted_hashed_state, sorted_trie_updates);
2187
2188        let execution_output = Arc::new(BlockExecutionOutput {
2189            state: execution_output.bundle,
2190            result: BlockExecutionResult {
2191                receipts: execution_output.receipts.pop().unwrap_or_default(),
2192                requests: execution_output.requests.pop().unwrap_or_default(),
2193                gas_used: block.gas_used(),
2194                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
2195            },
2196        });
2197
2198        Ok(ExecutedBlock::new(
2199            Arc::new(RecoveredBlock::new_sealed(block, senders)),
2200            execution_output,
2201            trie_data,
2202        ))
2203    }
2204
2205    /// Returns `true` if a block with the given hash is known, either in memory or in the
2206    /// database. This is a lightweight existence check that avoids constructing a full
2207    /// [`SealedHeader`].
2208    fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2209        if self.state.tree_state.contains_hash(&hash) {
2210            Ok(true)
2211        } else {
2212            self.provider.is_known(hash)
2213        }
2214    }
2215
2216    /// Return sealed block header from in-memory state or database by hash.
2217    fn sealed_header_by_hash(
2218        &self,
2219        hash: B256,
2220    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2221        // check memory first
2222        let header = self.state.tree_state.sealed_header_by_hash(&hash);
2223
2224        if header.is_some() {
2225            Ok(header)
2226        } else {
2227            self.provider.sealed_header_by_hash(hash)
2228        }
2229    }
2230
2231    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
2232    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
2233    /// not exist in the buffer, this returns the hash that is passed in.
2234    ///
2235    /// Returns the parent hash of the block itself if the block is buffered and has no other
2236    /// buffered ancestors.
2237    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2238        self.state
2239            .buffer
2240            .lowest_ancestor(&hash)
2241            .map(|block| block.parent_hash())
2242            .unwrap_or_else(|| hash)
2243    }
2244
2245    /// If validation fails, the response MUST contain the latest valid hash:
2246    ///
2247    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
2248    ///     conditions:
2249    ///     - It is fully validated and deemed VALID
2250    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
2251    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
2252    ///     conditions are satisfied by a `PoW` block.
2253    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
2254    ///     the above conditions.
2255    fn latest_valid_hash_for_invalid_payload(
2256        &mut self,
2257        parent_hash: B256,
2258    ) -> ProviderResult<Option<B256>> {
2259        // Check if parent exists in side chain or in canonical chain.
2260        if self.has_block_by_hash(parent_hash)? {
2261            return Ok(Some(parent_hash))
2262        }
2263
2264        // iterate over ancestors in the invalid cache
2265        // until we encounter the first valid ancestor
2266        let mut current_hash = parent_hash;
2267        let mut current_block = self.state.invalid_headers.get(&current_hash);
2268        while let Some(block_with_parent) = current_block {
2269            current_hash = block_with_parent.parent;
2270            current_block = self.state.invalid_headers.get(&current_hash);
2271
2272            // If current_header is None, then the current_hash does not have an invalid
2273            // ancestor in the cache, check its presence in blockchain tree
2274            if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2275                return Ok(Some(current_hash))
2276            }
2277        }
2278        Ok(None)
2279    }
2280
2281    /// Prepares the invalid payload response for the given hash, checking the
2282    /// database for the parent hash and populating the payload status with the latest valid hash
2283    /// according to the engine api spec.
2284    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2285        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2286            // Edge case: the `latestValid` field is the zero hash if the parent block is the
2287            // terminal PoW block, which we need to identify by looking at the parent's block
2288            // difficulty
2289            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2290            Some(_) => Some(parent_hash),
2291            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2292        };
2293
2294        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2295            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2296        })
2297        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2298    }
2299
2300    /// Returns true if the given hash is the last received sync target block.
2301    ///
2302    /// See [`ForkchoiceStateTracker::sync_target_state`]
2303    fn is_sync_target_head(&self, block_hash: B256) -> bool {
2304        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2305            return target.head_block_hash == block_hash
2306        }
2307        false
2308    }
2309
2310    /// Returns true if the given hash is part of the last received sync target fork choice update.
2311    ///
2312    /// See [`ForkchoiceStateTracker::sync_target_state`]
2313    fn is_any_sync_target(&self, block_hash: B256) -> bool {
2314        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2315            return target.contains(block_hash)
2316        }
2317        false
2318    }
2319
2320    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
2321    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
2322    ///
2323    /// Returns a payload status response according to the engine API spec if the block is known to
2324    /// be invalid.
2325    fn check_invalid_ancestor_with_head(
2326        &mut self,
2327        check: B256,
2328        head: &SealedBlock<N::Block>,
2329    ) -> ProviderResult<Option<PayloadStatus>> {
2330        // check if the check hash was previously marked as invalid
2331        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2332
2333        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2334    }
2335
2336    /// Invoked when a new payload received is invalid.
2337    fn on_invalid_new_payload(
2338        &mut self,
2339        head: SealedBlock<N::Block>,
2340        invalid: BlockWithParent,
2341    ) -> ProviderResult<PayloadStatus> {
2342        // populate the latest valid hash field
2343        let status = self.prepare_invalid_response(invalid.parent)?;
2344
2345        // insert the head block into the invalid header cache
2346        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2347        self.emit_event(ConsensusEngineEvent::InvalidBlock {
2348            block: Box::new(head),
2349            error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2350        });
2351
2352        Ok(status)
2353    }
2354
2355    /// Finds any invalid ancestor for the given payload.
2356    ///
2357    /// This function first checks if the block itself is in the invalid headers cache (to
2358    /// avoid re-executing a known-invalid block). Then it walks up the chain of buffered
2359    /// ancestors and checks if any ancestor is marked as invalid.
2360    ///
2361    /// The check works by:
2362    /// 1. Checking if the block hash itself is in the `invalid_headers` map
2363    /// 2. Finding the lowest buffered ancestor for the given block hash
2364    /// 3. If the ancestor is the same as the block hash itself, using the parent hash instead
2365    /// 4. Checking if this ancestor is in the `invalid_headers` map
2366    ///
2367    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
2368    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2369        let parent_hash = payload.parent_hash();
2370        let block_hash = payload.block_hash();
2371
2372        // Check if the block itself is already known to be invalid, avoiding re-execution
2373        if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2374            return Some(entry);
2375        }
2376
2377        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2378        if lowest_buffered_ancestor == block_hash {
2379            lowest_buffered_ancestor = parent_hash;
2380        }
2381
2382        // Check if the block has an invalid ancestor
2383        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2384    }
2385
2386    /// Handles a payload that has an invalid ancestor.
2387    ///
2388    /// This function validates the payload and processes it according to whether it's
2389    /// well-formed or malformed:
2390    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2391    ///    known-bad block, which violates consensus rules
2392    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2393    ///    validated due to its own structural issues
2394    fn handle_invalid_ancestor_payload(
2395        &mut self,
2396        payload: T::ExecutionData,
2397        invalid: BlockWithParent,
2398    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2399        let parent_hash = payload.parent_hash();
2400        let num_hash = payload.num_hash();
2401
2402        // Here we might have 2 cases
2403        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2404        //    remember it as invalid
2405        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
2406        //    an error and forget it
2407        let block = match self.payload_validator.convert_payload_to_block(payload) {
2408            Ok(block) => block,
2409            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2410        };
2411
2412        Ok(self.on_invalid_new_payload(block, invalid)?)
2413    }
2414
2415    /// Checks if the given `head` points to an invalid header, which requires a specific response
2416    /// to a forkchoice update.
2417    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2418        // check if the head was previously marked as invalid
2419        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2420
2421        // Try to prepare invalid response, but handle errors gracefully
2422        match self.prepare_invalid_response(header.parent) {
2423            Ok(status) => Ok(Some(status)),
2424            Err(err) => {
2425                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2426                // Return a basic invalid status without latest valid hash
2427                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2428                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2429                })))
2430            }
2431        }
2432    }
2433
2434    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2435    /// and block body itself.
2436    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2437        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2438            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2439            return Err(e)
2440        }
2441
2442        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2443            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2444            return Err(e)
2445        }
2446
2447        Ok(())
2448    }
2449
2450    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
2451    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2452    fn try_connect_buffered_blocks(
2453        &mut self,
2454        parent: BlockNumHash,
2455    ) -> Result<(), InsertBlockFatalError> {
2456        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2457
2458        if blocks.is_empty() {
2459            // nothing to append
2460            return Ok(())
2461        }
2462
2463        let now = Instant::now();
2464        let block_count = blocks.len();
2465        for child in blocks {
2466            let child_num_hash = child.num_hash();
2467            match self.insert_block(child) {
2468                Ok(res) => {
2469                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2470                    if self.is_any_sync_target(child_num_hash.hash) &&
2471                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2472                    {
2473                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2474                        // we just inserted a block that we know is part of the canonical chain, so
2475                        // we can make it canonical
2476                        self.make_canonical(child_num_hash.hash)?;
2477                    }
2478                }
2479                Err(err) => {
2480                    if let InsertPayloadError::Block(err) = err {
2481                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2482                        if let Err(fatal) = self.on_insert_block_error(err) {
2483                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2484                            return Err(fatal)
2485                        }
2486                    }
2487                }
2488            }
2489        }
2490
2491        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2492        Ok(())
2493    }
2494
2495    /// Pre-validates the block and inserts it into the buffer.
2496    fn buffer_block(
2497        &mut self,
2498        block: SealedBlock<N::Block>,
2499    ) -> Result<(), InsertBlockError<N::Block>> {
2500        if let Err(err) = self.validate_block(&block) {
2501            return Err(InsertBlockError::consensus_error(err, block))
2502        }
2503        self.state.buffer.insert_block(block);
2504        Ok(())
2505    }
2506
2507    /// Returns true if the distance from the local tip to the block is greater than the configured
2508    /// threshold.
2509    ///
2510    /// If the `local_tip` is greater than the `block`, then this will return false.
2511    #[inline]
2512    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2513        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2514    }
2515
2516    /// Returns how far the local tip is from the given block. If the local tip is at the same
2517    /// height or its block number is greater than the given block, this returns None.
2518    #[inline]
2519    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2520        if block > local_tip {
2521            Some(block - local_tip)
2522        } else {
2523            None
2524        }
2525    }
2526
2527    /// Returns the block hash that backfill should target.
2528    ///
2529    /// Defaults to the finalized block hash. On OP Stack, the CL finalizes in large batches and the
2530    /// finalized hash can lag the canonical tip by a wide margin, so backfill targets the head.
2531    ///
2532    /// The zero-finalized optimistic-sync fallback for non-OP Stack chains is handled by
2533    /// [`Self::backfill_sync_target`].
2534    const fn backfill_target_hash(&self, state: ForkchoiceState) -> B256 {
2535        if self.engine_kind.is_opstack() {
2536            state.head_block_hash
2537        } else {
2538            state.finalized_block_hash
2539        }
2540    }
2541
2542    /// Returns the target hash to sync to if the distance from the local tip is greater than the
2543    /// threshold and we're not yet synced to the backfill target (see
2544    /// [`Self::backfill_target_hash`]).
2545    ///
2546    /// If this is invoked after a new block has been downloaded, the downloaded block could be
2547    /// the (missing) target block.
2548    fn backfill_sync_target(
2549        &self,
2550        canonical_tip_num: u64,
2551        target_block_number: u64,
2552        downloaded_block: Option<BlockNumHash>,
2553    ) -> Option<B256> {
2554        let state = self.state.forkchoice_state_tracker.sync_target_state()?;
2555        let target_hash = self.backfill_target_hash(state);
2556
2557        // check if the downloaded block is the tracked backfill target
2558        let exceeds_backfill_threshold = match downloaded_block.as_ref() {
2559            // if we downloaded the target block we can now check how far we're off
2560            Some(downloaded_block) if downloaded_block.hash == target_hash => {
2561                self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2562            }
2563            _ => match self.state.buffer.block(&target_hash) {
2564                // if we have buffered the target block, we should check how far we're off
2565                Some(buffered_target) => {
2566                    self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_target.number())
2567                }
2568                // check if the distance exceeds the threshold for backfill sync
2569                None => self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number),
2570            },
2571        };
2572
2573        if !exceeds_backfill_threshold {
2574            return None
2575        }
2576
2577        // if we have already canonicalized the target block, we should skip backfill
2578        match self.provider.header_by_hash_or_number(target_hash.into()) {
2579            Err(err) => {
2580                warn!(target: "engine::tree", %err, "Failed to get backfill target block header");
2581                None
2582            }
2583            // we don't have the block yet and the distance exceeds the allowed threshold
2584            Ok(None) if !target_hash.is_zero() => Some(target_hash),
2585            Ok(None) => {
2586                // OPTIMISTIC SYNCING
2587                //
2588                // It can happen when the node is doing an
2589                // optimistic sync, where the CL has no knowledge of the finalized hash,
2590                // but is expecting the EL to sync as high
2591                // as possible before finalizing.
2592                //
2593                // This usually doesn't happen on ETH mainnet since CLs use the more
2594                // secure checkpoint syncing.
2595                //
2596                // However, optimism chains will do this. The risk of a reorg is however
2597                // low.
2598                debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2599                Some(state.head_block_hash)
2600            }
2601            // we're fully synced to the target block
2602            Ok(Some(_)) => None,
2603        }
2604    }
2605
2606    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
2607    /// happen if a reorg is happening while we are persisting a block.
2608    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2609        let mut canonical = self.state.tree_state.current_canonical_head;
2610        let mut persisted = self.persistence_state.last_persisted_block;
2611
2612        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2613            Ok(self
2614                .sealed_header_by_hash(num_hash.hash)?
2615                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2616                .parent_num_hash())
2617        };
2618
2619        // Happy path, canonical chain is ahead or equal to persisted chain.
2620        // Walk canonical chain back to make sure that it connects to persisted chain.
2621        while canonical.number > persisted.number {
2622            canonical = parent_num_hash(canonical)?;
2623        }
2624
2625        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2626        if canonical == persisted {
2627            return Ok(None);
2628        }
2629
2630        // At this point, we know that `persisted` block can't be reached by walking the canonical
2631        // chain back. In this case we need to truncate it to the first canonical block it connects
2632        // to.
2633
2634        // Firstly, walk back until we reach the same height as `canonical`.
2635        while persisted.number > canonical.number {
2636            persisted = parent_num_hash(persisted)?;
2637        }
2638
2639        debug_assert_eq!(persisted.number, canonical.number);
2640
2641        // Now walk both chains back until we find a common ancestor.
2642        while persisted.hash != canonical.hash {
2643            canonical = parent_num_hash(canonical)?;
2644            persisted = parent_num_hash(persisted)?;
2645        }
2646
2647        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2648
2649        Ok(Some(persisted.number))
2650    }
2651
2652    /// Invoked when we the canonical chain has been updated.
2653    ///
2654    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2655    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2656        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2657        let start = Instant::now();
2658
2659        // update the tracked canonical head
2660        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2661
2662        let tip = chain_update.tip().clone_sealed_header();
2663        let notification = chain_update.to_chain_notification();
2664
2665        // reinsert any missing reorged blocks
2666        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2667            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2668            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2669            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2670
2671            self.update_reorg_metrics(old.len(), old_first);
2672            self.reinsert_reorged_blocks(new.clone());
2673            self.reinsert_reorged_blocks(old.clone());
2674        }
2675
2676        // update the tracked in-memory state with the new chain
2677        self.canonical_in_memory_state.update_chain(chain_update);
2678        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2679
2680        // Update metrics based on new tip
2681        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2682
2683        // sends an event to all active listeners about the new canonical chain
2684        self.canonical_in_memory_state.notify_canon_state(notification);
2685
2686        // emit event
2687        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2688            Box::new(tip),
2689            start.elapsed(),
2690        ));
2691    }
2692
2693    /// This updates metrics based on the given reorg length and first reorged block number.
2694    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2695        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2696            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2697                first_reorged_block <= finalized.number
2698            {
2699                self.metrics.tree.reorgs.finalized.increment(1);
2700            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2701                first_reorged_block <= safe.number
2702            {
2703                self.metrics.tree.reorgs.safe.increment(1);
2704            } else {
2705                self.metrics.tree.reorgs.head.increment(1);
2706            }
2707        } else {
2708            debug_unreachable!("Reorged chain doesn't have any blocks");
2709        }
2710        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2711    }
2712
2713    /// This reinserts any blocks in the new chain that do not already exist in the tree
2714    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2715        for block in new_chain {
2716            if self
2717                .state
2718                .tree_state
2719                .executed_block_by_hash(block.recovered_block().hash())
2720                .is_none()
2721            {
2722                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2723                self.state.tree_state.insert_executed(block);
2724            }
2725        }
2726    }
2727
2728    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2729    ///
2730    /// This mainly compares the missing parent of the downloaded block with the current canonical
2731    /// tip, and decides whether or not backfill sync should be triggered.
2732    fn on_disconnected_downloaded_block(
2733        &self,
2734        downloaded_block: BlockNumHash,
2735        missing_parent: BlockNumHash,
2736        head: BlockNumHash,
2737    ) -> Option<TreeEvent> {
2738        // compare the missing parent with the canonical tip
2739        if let Some(target) =
2740            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2741        {
2742            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2743            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2744        }
2745
2746        // continue downloading the missing parent
2747        //
2748        // this happens if either:
2749        //  * the missing parent block num < canonical tip num
2750        //    * this case represents a missing block on a fork that is shorter than the canonical
2751        //      chain
2752        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2753        //    less than the backfill threshold
2754        //    * this case represents a potentially long range of blocks to download and execute
2755        let request = if let Some(distance) =
2756            self.distance_from_local_tip(head.number, missing_parent.number)
2757        {
2758            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2759            DownloadRequest::BlockRange(missing_parent.hash, distance)
2760        } else {
2761            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2762            // This happens when the missing parent is on an outdated
2763            // sidechain and we can only download the missing block itself
2764            DownloadRequest::single_block(missing_parent.hash)
2765        };
2766
2767        Some(TreeEvent::Download(request))
2768    }
2769
2770    /// Handles a downloaded block that was successfully inserted as valid.
2771    ///
2772    /// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
2773    /// If it matches a non-head sync target (safe or finalized), makes it canonical inline
2774    /// and triggers a download for the remaining blocks towards the actual head.
2775    /// Otherwise, tries to connect buffered blocks.
2776    fn on_valid_downloaded_block(
2777        &mut self,
2778        block_num_hash: BlockNumHash,
2779    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2780        // check if we just inserted a block that's part of sync targets,
2781        // i.e. head, safe, or finalized
2782        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2783            sync_target.contains(block_num_hash.hash)
2784        {
2785            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2786
2787            if sync_target.head_block_hash == block_num_hash.hash {
2788                // we just inserted the sync target head block, make it canonical
2789                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2790                    sync_target_head: block_num_hash.hash,
2791                })))
2792            }
2793
2794            // This block is part of the sync target (safe or finalized) but not the
2795            // head. Make it canonical and try to connect any buffered children, then
2796            // continue downloading towards the actual head if needed.
2797            self.make_canonical(block_num_hash.hash)?;
2798            self.try_connect_buffered_blocks(block_num_hash)?;
2799
2800            // Check if we've reached the sync target head after connecting buffered
2801            // blocks (e.g. the head block may have already been buffered).
2802            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2803                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2804                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2805                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2806            }
2807
2808            return Ok(None)
2809        }
2810        trace!(target: "engine::tree", "appended downloaded block");
2811        self.try_connect_buffered_blocks(block_num_hash)?;
2812        Ok(None)
2813    }
2814
2815    /// Invoked with a block downloaded from the network
2816    ///
2817    /// Returns an event with the appropriate action to take, such as:
2818    ///  - download more missing blocks
2819    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2820    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2821    fn on_downloaded_block(
2822        &mut self,
2823        block: SealedBlock<N::Block>,
2824    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2825        let block_num_hash = block.num_hash();
2826        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2827        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2828            return Ok(None)
2829        }
2830
2831        if !self.backfill_sync_state.is_idle() {
2832            return Ok(None)
2833        }
2834
2835        // try to append the block
2836        match self.insert_block(block) {
2837            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2838                return self.on_valid_downloaded_block(block_num_hash);
2839            }
2840            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2841                // block is not connected to the canonical head, we need to download
2842                // its missing branch first
2843                return Ok(self.on_disconnected_downloaded_block(
2844                    block_num_hash,
2845                    missing_ancestor,
2846                    head,
2847                ))
2848            }
2849            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2850                trace!(target: "engine::tree", "downloaded block already executed");
2851            }
2852            Err(err) => {
2853                if let InsertPayloadError::Block(err) = err {
2854                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2855                    if let Err(fatal) = self.on_insert_block_error(err) {
2856                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2857                        return Err(fatal)
2858                    }
2859                }
2860            }
2861        }
2862        Ok(None)
2863    }
2864
2865    /// Inserts a payload into the tree and executes it.
2866    ///
2867    /// This function validates the payload's basic structure, then executes it using the
2868    /// payload validator. The execution includes running all transactions in the payload
2869    /// and validating the resulting state transitions.
2870    ///
2871    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2872    /// or `InsertPayloadError` if validation or execution failed.
2873    fn insert_payload(
2874        &mut self,
2875        payload: T::ExecutionData,
2876    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2877        self.insert_block_or_payload(
2878            payload.block_with_parent(),
2879            payload,
2880            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2881            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2882        )
2883    }
2884
2885    fn insert_block(
2886        &mut self,
2887        block: SealedBlock<N::Block>,
2888    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2889        self.insert_block_or_payload(
2890            block.block_with_parent(),
2891            block,
2892            |validator, block, ctx| validator.validate_block(block, ctx),
2893            |_, block| Ok(block),
2894        )
2895    }
2896
2897    /// Inserts a block or payload into the blockchain tree with full execution.
2898    ///
2899    /// This is a generic function that handles both blocks and payloads by accepting
2900    /// a block identifier, input data, and execution/validation functions. It performs
2901    /// comprehensive checks and execution:
2902    ///
2903    /// - Validates that the block doesn't already exist in the tree
2904    /// - Ensures parent state is available, buffering if necessary
2905    /// - Executes the block/payload using the provided execute function
2906    /// - Handles both canonical and fork chain insertions
2907    /// - Updates pending block state when appropriate
2908    /// - Emits consensus engine events and records metrics
2909    ///
2910    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2911    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2912    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2913    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
2914    fn insert_block_or_payload<Input, Err>(
2915        &mut self,
2916        block_id: BlockWithParent,
2917        input: Input,
2918        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ValidationOutput<N>, Err>,
2919        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2920    ) -> Result<InsertPayloadOk, Err>
2921    where
2922        Err: From<InsertBlockError<N::Block>>,
2923    {
2924        let block_insert_start = Instant::now();
2925        let block_num_hash = block_id.block;
2926        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2927
2928        // Check if block already exists - first in memory, then DB only if it could be persisted
2929        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
2930            convert_to_block(self, input)?;
2931            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2932        }
2933
2934        // Only query DB if block could be persisted (number <= last persisted block).
2935        // New blocks from CL always have number > last persisted, so skip DB lookup for them.
2936        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
2937            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
2938                Err(err) => {
2939                    let block = convert_to_block(self, input)?;
2940                    return Err(InsertBlockError::new(block, err.into()).into());
2941                }
2942                Ok(Some(_)) => {
2943                    convert_to_block(self, input)?;
2944                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2945                }
2946                Ok(None) => {}
2947            }
2948        }
2949
2950        // Ensure that the parent state is available.
2951        match self.state_provider_builder(block_id.parent) {
2952            Err(err) => {
2953                let block = convert_to_block(self, input)?;
2954                return Err(InsertBlockError::new(block, err.into()).into());
2955            }
2956            Ok(None) => {
2957                let block = convert_to_block(self, input)?;
2958
2959                // we don't have the state required to execute this block, buffering it and find the
2960                // missing parent block
2961                let missing_ancestor = self
2962                    .state
2963                    .buffer
2964                    .lowest_ancestor(&block.parent_hash())
2965                    .map(|block| block.parent_num_hash())
2966                    .unwrap_or_else(|| block.parent_num_hash());
2967
2968                self.state.buffer.insert_block(block);
2969
2970                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2971                    head: self.state.tree_state.current_canonical_head,
2972                    missing_ancestor,
2973                }))
2974            }
2975            Ok(Some(_)) => {}
2976        }
2977
2978        // determine whether we are on a fork chain by comparing the block number with the
2979        // canonical head. This is a simple check that is sufficient for the event emission below.
2980        // A block is considered a fork if its number is less than or equal to the canonical head,
2981        // as this indicates there's already a canonical block at that height.
2982        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2983
2984        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2985
2986        let start = Instant::now();
2987
2988        let ValidationOutput { executed_block: executed, execution_timing_stats: timing_stats } =
2989            execute(&mut self.payload_validator, input, ctx)?;
2990
2991        // Emit slow block event immediately after execution so it appears even when
2992        // persistence hasn't completed yet (e.g. blocks arriving faster than persistence).
2993        if let Some(stats) = timing_stats {
2994            if let Some(threshold) = self.config.slow_block_threshold() {
2995                let total_duration = stats.execution_duration + stats.state_hash_duration;
2996                if total_duration > threshold {
2997                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2998                        stats: stats.clone(),
2999                        commit_duration: None,
3000                        total_duration,
3001                    }));
3002                }
3003            }
3004            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
3005        }
3006
3007        // if the parent is the canonical head, we can insert the block as the pending block
3008        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
3009        {
3010            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
3011            self.canonical_in_memory_state.set_pending_block(executed.clone());
3012        }
3013
3014        self.state.tree_state.insert_executed(executed.clone());
3015        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
3016
3017        // emit insert event
3018        let elapsed = start.elapsed();
3019        let engine_event = if is_fork {
3020            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
3021        } else {
3022            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
3023        };
3024        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
3025
3026        self.metrics
3027            .engine
3028            .block_insert_total_duration
3029            .record(block_insert_start.elapsed().as_secs_f64());
3030        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
3031        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
3032    }
3033
3034    /// Handles an error that occurred while inserting a block.
3035    ///
3036    /// If this is a validation error this will mark the block as invalid.
3037    ///
3038    /// Returns the proper payload status response if the block is invalid.
3039    fn on_insert_block_error(
3040        &mut self,
3041        error: InsertBlockError<N::Block>,
3042    ) -> Result<PayloadStatus, InsertBlockFatalError> {
3043        let (block, error) = error.split();
3044
3045        // if invalid block, we check the validation error. Otherwise return the fatal
3046        // error.
3047        let validation_err = error.ensure_validation_error()?;
3048
3049        // If the error was due to an invalid payload, the payload is added to the
3050        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
3051        // returned.
3052        warn!(
3053            target: "engine::tree",
3054            invalid_hash=%block.hash(),
3055            invalid_number=block.number(),
3056            %validation_err,
3057            "Invalid block error on new payload",
3058        );
3059        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3060
3061        // keep track of the invalid header unless the consensus impl considers it transient
3062        let is_transient = match &validation_err {
3063            InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
3064            _ => false,
3065        };
3066        if is_transient {
3067            warn!(
3068                target: "engine::tree",
3069                invalid_hash=%block.hash(),
3070                invalid_number=block.number(),
3071                %validation_err,
3072                "Skipping invalid header cache insert for transient validation error",
3073            );
3074        } else {
3075            self.state.invalid_headers.insert(block.block_with_parent());
3076        }
3077        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock {
3078            block: Box::new(block),
3079            error: validation_err.to_string(),
3080        }));
3081
3082        Ok(PayloadStatus::new(
3083            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3084            latest_valid_hash,
3085        ))
3086    }
3087
3088    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
3089    fn on_new_payload_error(
3090        &mut self,
3091        error: NewPayloadError,
3092        payload_num_hash: NumHash,
3093        parent_hash: B256,
3094    ) -> ProviderResult<PayloadStatus> {
3095        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3096        // we need to convert the error to a payload status (response to the CL)
3097
3098        let latest_valid_hash =
3099            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3100                // Engine-API rules:
3101                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
3102                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
3103                None
3104            } else {
3105                self.latest_valid_hash_for_invalid_payload(parent_hash)?
3106            };
3107
3108        let status = PayloadStatusEnum::from(error);
3109        Ok(PayloadStatus::new(status, latest_valid_hash))
3110    }
3111
3112    /// Attempts to find the header for the given block hash if it is canonical.
3113    pub fn find_canonical_header(
3114        &self,
3115        hash: B256,
3116    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3117        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3118
3119        if canonical.is_none() {
3120            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3121        }
3122
3123        Ok(canonical)
3124    }
3125
3126    /// Updates the tracked finalized block if we have it.
3127    fn update_finalized_block(
3128        &self,
3129        finalized_block_hash: B256,
3130    ) -> Result<(), OnForkChoiceUpdated> {
3131        if finalized_block_hash.is_zero() {
3132            return Ok(())
3133        }
3134
3135        match self.find_canonical_header(finalized_block_hash) {
3136            Ok(None) => {
3137                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3138                // if the finalized block is not known, we can't update the finalized block
3139                return Err(OnForkChoiceUpdated::invalid_state())
3140            }
3141            Ok(Some(finalized)) => {
3142                if Some(finalized.num_hash()) !=
3143                    self.canonical_in_memory_state.get_finalized_num_hash()
3144                {
3145                    // we're also persisting the finalized block on disk so we can reload it on
3146                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3147                    let _ = self.persistence.save_finalized_block_number(finalized.number());
3148                    self.canonical_in_memory_state.set_finalized(finalized.clone());
3149                    // Update finalized block height metric
3150                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3151                }
3152            }
3153            Err(err) => {
3154                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3155            }
3156        }
3157
3158        Ok(())
3159    }
3160
3161    /// Updates the tracked safe block if we have it
3162    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3163        if safe_block_hash.is_zero() {
3164            return Ok(())
3165        }
3166
3167        match self.find_canonical_header(safe_block_hash) {
3168            Ok(None) => {
3169                debug!(target: "engine::tree", "Safe block not found in canonical chain");
3170                // if the safe block is not known, we can't update the safe block
3171                return Err(OnForkChoiceUpdated::invalid_state())
3172            }
3173            Ok(Some(safe)) => {
3174                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3175                    // we're also persisting the safe block on disk so we can reload it on
3176                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
3177                    let _ = self.persistence.save_safe_block_number(safe.number());
3178                    self.canonical_in_memory_state.set_safe(safe.clone());
3179                    // Update safe block height metric
3180                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
3181                }
3182            }
3183            Err(err) => {
3184                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3185            }
3186        }
3187
3188        Ok(())
3189    }
3190
3191    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
3192    /// made canonical.
3193    ///
3194    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
3195    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
3196    ///
3197    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
3198    /// are consistent with the head block.
3199    fn ensure_consistent_forkchoice_state(
3200        &self,
3201        state: ForkchoiceState,
3202    ) -> Result<(), OnForkChoiceUpdated> {
3203        // Ensure that the finalized block, if not zero, is known and in the canonical chain
3204        // after the head block is canonicalized.
3205        //
3206        // This ensures that the finalized block is consistent with the head block, i.e. the
3207        // finalized block is an ancestor of the head block.
3208        self.update_finalized_block(state.finalized_block_hash)?;
3209
3210        // Also ensure that the safe block, if not zero, is known and in the canonical chain
3211        // after the head block is canonicalized.
3212        //
3213        // This ensures that the safe block is consistent with the head block, i.e. the safe
3214        // block is an ancestor of the head block.
3215        self.update_safe_block(state.safe_block_hash)
3216    }
3217
3218    /// Validates the payload attributes with respect to the header and fork choice state.
3219    ///
3220    /// This is called during `engine_forkchoiceUpdated` when the CL provides payload attributes,
3221    /// indicating it wants the EL to start building a new block.
3222    ///
3223    /// Runs [`PayloadValidator::validate_payload_attributes_against_header`](reth_engine_primitives::PayloadValidator::validate_payload_attributes_against_header) to ensure
3224    /// `payloadAttributes.timestamp > headBlock.timestamp` per the Engine API spec.
3225    ///
3226    /// If validation passes, sends the attributes to the payload builder to start a new
3227    /// payload job. If it fails, returns `INVALID_PAYLOAD_ATTRIBUTES` without rolling back
3228    /// the forkchoice update.
3229    ///
3230    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
3231    /// return an error if the payload attributes are invalid.
3232    fn process_payload_attributes(
3233        &self,
3234        attributes: T::PayloadAttributes,
3235        head: &N::BlockHeader,
3236        state: ForkchoiceState,
3237    ) -> OnForkChoiceUpdated {
3238        if let Err(err) =
3239            self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3240        {
3241            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3242            return OnForkChoiceUpdated::invalid_payload_attributes()
3243        }
3244
3245        // 8. Client software MUST begin a payload build process building on top of
3246        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
3247        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
3248        //    The build process is specified in the Payload building section.
3249
3250        let cache = if self.config.share_execution_cache_with_payload_builder() {
3251            self.payload_validator.cache_for(state.head_block_hash)
3252        } else {
3253            None
3254        };
3255
3256        let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3257            self.payload_validator.sparse_trie_handle_for(
3258                state.head_block_hash,
3259                head.state_root(),
3260                &self.state,
3261            )
3262        } else {
3263            None
3264        };
3265
3266        // send the payload to the builder and return the receiver for the pending payload
3267        // id, initiating payload job is handled asynchronously
3268        let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3269            parent_hash: state.head_block_hash,
3270            attributes,
3271            cache,
3272            trie_handle,
3273        });
3274
3275        // Client software MUST respond to this method call in the following way:
3276        // {
3277        //      payloadStatus: {
3278        //          status: VALID,
3279        //          latestValidHash: forkchoiceState.headBlockHash,
3280        //          validationError: null
3281        //      },
3282        //      payloadId: buildProcessId
3283        // }
3284        //
3285        // if the payload is deemed VALID and the build process has begun.
3286        OnForkChoiceUpdated::updated_with_pending_payload_id(
3287            PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3288            pending_payload_id,
3289        )
3290    }
3291
3292    /// Remove all blocks up to __and including__ the given block number.
3293    ///
3294    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
3295    /// those which have a fork point at or below the finalized hash.
3296    ///
3297    /// Canonical blocks below the upper bound will still be removed.
3298    pub(crate) fn remove_before(
3299        &mut self,
3300        upper_bound: BlockNumHash,
3301        finalized_hash: Option<B256>,
3302    ) -> ProviderResult<()> {
3303        // first fetch the finalized block number and then call the remove_before method on
3304        // tree_state
3305        let num = if let Some(hash) = finalized_hash {
3306            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3307        } else {
3308            None
3309        };
3310
3311        self.state.tree_state.remove_until(
3312            upper_bound,
3313            self.persistence_state.last_persisted_block.hash,
3314            num,
3315        );
3316        Ok(())
3317    }
3318
3319    /// Returns a builder for creating state providers for the given hash.
3320    ///
3321    /// This is an optimization for parallel execution contexts where we want to avoid
3322    /// creating state providers in the critical path.
3323    pub fn state_provider_builder(
3324        &self,
3325        hash: B256,
3326    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3327    where
3328        P: BlockReader + StateProviderFactory + StateReader + Clone,
3329    {
3330        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3331            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3332            // the block leads back to the canonical chain
3333            return Ok(Some(StateProviderBuilder::new(
3334                self.provider.clone(),
3335                historical,
3336                Some(blocks),
3337            )))
3338        }
3339
3340        // Check if the block is persisted
3341        if let Some(header) = self.provider.header(hash)? {
3342            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3343            // For persisted blocks, we create a builder that will fetch state directly from the
3344            // database
3345            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3346        }
3347
3348        debug!(target: "engine::tree", %hash, "no canonical state found for block");
3349        Ok(None)
3350    }
3351}
3352
3353/// Events received in the main engine loop.
3354#[derive(Debug)]
3355enum LoopEvent<T, N>
3356where
3357    N: NodePrimitives,
3358    T: PayloadTypes,
3359{
3360    /// An engine API message was received.
3361    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
3362    /// A persistence task completed.
3363    PersistenceComplete {
3364        /// The unified result of the persistence operation.
3365        result: PersistenceResult,
3366        /// When the persistence operation started.
3367        start_time: Instant,
3368    },
3369    /// A channel was disconnected.
3370    Disconnected,
3371}
3372
3373/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
3374/// variant.
3375///
3376/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
3377/// is valid or not.
3378#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3379pub enum BlockStatus {
3380    /// The block is valid and block extends canonical chain.
3381    Valid,
3382    /// The block may be valid and has an unknown missing ancestor.
3383    Disconnected {
3384        /// Current canonical head.
3385        head: BlockNumHash,
3386        /// The lowest ancestor block that is not connected to the canonical chain.
3387        missing_ancestor: BlockNumHash,
3388    },
3389}
3390
3391/// How a payload was inserted if it was valid.
3392///
3393/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
3394/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
3395#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3396pub enum InsertPayloadOk {
3397    /// The payload was valid, but we have already seen it.
3398    AlreadySeen(BlockStatus),
3399    /// The payload was valid and inserted into the tree.
3400    Inserted(BlockStatus),
3401}
3402
3403/// Target for block persistence.
3404#[derive(Debug, Clone, Copy)]
3405enum PersistTarget {
3406    /// Persist up to `canonical_head - memory_block_buffer_target`.
3407    Threshold,
3408    /// Persist all blocks up to and including the canonical head.
3409    Head,
3410}
3411
3412/// Result of waiting for caches to become available.
3413#[derive(Debug, Clone, Copy, Default)]
3414pub struct CacheWaitDurations {
3415    /// Time spent waiting for the execution cache lock.
3416    pub execution_cache: Duration,
3417    /// Time spent waiting for the sparse trie lock.
3418    pub sparse_trie: Duration,
3419}
3420
3421/// Trait for types that can wait for caches to become available.
3422///
3423/// This is used by `reth_newPayload` endpoint to ensure that payload processing
3424/// waits for any ongoing operations to complete before starting.
3425pub trait WaitForCaches {
3426    /// Waits for cache updates to complete.
3427    ///
3428    /// Returns the time spent waiting for each cache separately.
3429    fn wait_for_caches(&self) -> CacheWaitDurations;
3430}