reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use reth_chain_state::{
17    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
18    NewCanonicalChain,
19};
20use reth_consensus::{Consensus, FullConsensus};
21use reth_engine_primitives::{
22    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
23    ForkchoiceStateTracker, OnForkChoiceUpdated,
24};
25use reth_errors::{ConsensusError, ProviderResult};
26use reth_evm::{ConfigureEvm, OnStateHook};
27use reth_payload_builder::PayloadBuilderHandle;
28use reth_payload_primitives::{
29    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
30};
31use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
32use reth_provider::{
33    BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
34    StateProviderFactory, StateReader, TransactionVariant, TrieReader,
35};
36use reth_revm::database::StateProviderDatabase;
37use reth_stages_api::ControlFlow;
38use revm::state::EvmState;
39use state::TreeState;
40use std::{
41    fmt::Debug,
42    sync::{
43        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
44        Arc,
45    },
46    time::Instant,
47};
48use tokio::sync::{
49    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
50    oneshot::{self, error::TryRecvError},
51};
52use tracing::*;
53
54mod block_buffer;
55mod cached_state;
56pub mod error;
57pub mod instrumented_state;
58mod invalid_headers;
59mod metrics;
60mod payload_processor;
61pub mod payload_validator;
62mod persistence_state;
63pub mod precompile_cache;
64#[cfg(test)]
65mod tests;
66// TODO(alexey): compare trie updates in `insert_block_inner`
67#[expect(unused)]
68mod trie_updates;
69
70use crate::tree::error::AdvancePersistenceError;
71pub use block_buffer::BlockBuffer;
72pub use invalid_headers::InvalidHeaderCache;
73pub use payload_processor::*;
74pub use payload_validator::{BasicEngineValidator, EngineValidator};
75pub use persistence_state::PersistenceState;
76pub use reth_engine_primitives::TreeConfig;
77
78pub mod state;
79
80/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
81///
82/// This is the default threshold, and represents the distance (gap) from the local head to a
83/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
84/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
85///
86/// E.g.: Local head `block.number` is 100 and the forkchoice head `block.number` is 133 (more than
87/// an epoch has slots), then this exceeds the threshold at which the pipeline should be used to
88/// backfill this gap.
89pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
90
91/// A builder for creating state providers that can be used across threads.
92#[derive(Clone, Debug)]
93pub struct StateProviderBuilder<N: NodePrimitives, P> {
94    /// The provider factory used to create providers.
95    provider_factory: P,
96    /// The historical block hash to fetch state from.
97    historical: B256,
98    /// The blocks that form the chain from historical to target and are in memory.
99    overlay: Option<Vec<ExecutedBlock<N>>>,
100}
101
102impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
103    /// Creates a new state provider from the provider factory, historical block hash and optional
104    /// overlaid blocks.
105    pub const fn new(
106        provider_factory: P,
107        historical: B256,
108        overlay: Option<Vec<ExecutedBlock<N>>>,
109    ) -> Self {
110        Self { provider_factory, historical, overlay }
111    }
112}
113
114impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
115where
116    P: BlockReader + StateProviderFactory + StateReader + Clone,
117{
118    /// Creates a new state provider from this builder.
119    pub fn build(&self) -> ProviderResult<StateProviderBox> {
120        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
121        if let Some(overlay) = self.overlay.clone() {
122            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
123        }
124        Ok(provider)
125    }
126}
127
128/// Tracks the state of the engine api internals.
129///
130/// This type is not shareable.
131#[derive(Debug)]
132pub struct EngineApiTreeState<N: NodePrimitives> {
133    /// Tracks the state of the blockchain tree.
134    tree_state: TreeState<N>,
135    /// Tracks the forkchoice state updates received by the CL.
136    forkchoice_state_tracker: ForkchoiceStateTracker,
137    /// Buffer of detached blocks.
138    buffer: BlockBuffer<N::Block>,
139    /// Tracks the header of invalid payloads that were rejected by the engine because they're
140    /// invalid.
141    invalid_headers: InvalidHeaderCache,
142}
143
144impl<N: NodePrimitives> EngineApiTreeState<N> {
145    fn new(
146        block_buffer_limit: u32,
147        max_invalid_header_cache_length: u32,
148        canonical_block: BlockNumHash,
149        engine_kind: EngineApiKind,
150    ) -> Self {
151        Self {
152            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
153            buffer: BlockBuffer::new(block_buffer_limit),
154            tree_state: TreeState::new(canonical_block, engine_kind),
155            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
156        }
157    }
158}
159
160/// The outcome of a tree operation.
161#[derive(Debug)]
162pub struct TreeOutcome<T> {
163    /// The outcome of the operation.
164    pub outcome: T,
165    /// An optional event to tell the caller to do something.
166    pub event: Option<TreeEvent>,
167}
168
169impl<T> TreeOutcome<T> {
170    /// Create new tree outcome.
171    pub const fn new(outcome: T) -> Self {
172        Self { outcome, event: None }
173    }
174
175    /// Set event on the outcome.
176    pub fn with_event(mut self, event: TreeEvent) -> Self {
177        self.event = Some(event);
178        self
179    }
180}
181
182/// Events that are triggered by Tree Chain
183#[derive(Debug)]
184pub enum TreeEvent {
185    /// Tree action is needed.
186    TreeAction(TreeAction),
187    /// Backfill action is needed.
188    BackfillAction(BackfillAction),
189    /// Block download is needed.
190    Download(DownloadRequest),
191}
192
193impl TreeEvent {
194    /// Returns true if the event is a backfill action.
195    const fn is_backfill_action(&self) -> bool {
196        matches!(self, Self::BackfillAction(_))
197    }
198}
199
200/// The actions that can be performed on the tree.
201#[derive(Debug)]
202pub enum TreeAction {
203    /// Make target canonical.
204    MakeCanonical {
205        /// The sync target head hash
206        sync_target_head: B256,
207    },
208}
209
210/// Wrapper struct that combines metrics and state hook
211struct MeteredStateHook {
212    metrics: reth_evm::metrics::ExecutorMetrics,
213    inner_hook: Box<dyn OnStateHook>,
214}
215
216impl OnStateHook for MeteredStateHook {
217    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
218        // Update the metrics for the number of accounts, storage slots and bytecodes loaded
219        let accounts = state.keys().len();
220        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
221        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
222
223        self.metrics.accounts_loaded_histogram.record(accounts as f64);
224        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
225        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
226
227        // Call the original state hook
228        self.inner_hook.on_state(source, state);
229    }
230}
231
232/// The engine API tree handler implementation.
233///
234/// This type is responsible for processing engine API requests, maintaining the canonical state and
235/// emitting events.
236pub struct EngineApiTreeHandler<N, P, T, V, C>
237where
238    N: NodePrimitives,
239    T: PayloadTypes,
240    C: ConfigureEvm<Primitives = N> + 'static,
241{
242    provider: P,
243    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
244    payload_validator: V,
245    /// Keeps track of internals such as executed and buffered blocks.
246    state: EngineApiTreeState<N>,
247    /// The half for sending messages to the engine.
248    ///
249    /// This is kept so that we can queue in messages to ourself that we can process later, for
250    /// example distributing workload across multiple messages that would otherwise take too long
251    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
252    /// them one by one so that we can handle incoming engine API in between and don't become
253    /// unresponsive. This can happen during live sync transition where we're trying to close the
254    /// gap (up to 3 epochs of blocks in the worst case).
255    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
256    /// Incoming engine API requests.
257    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
258    /// Outgoing events that are emitted to the handler.
259    outgoing: UnboundedSender<EngineApiEvent<N>>,
260    /// Channels to the persistence layer.
261    persistence: PersistenceHandle<N>,
262    /// Tracks the state changes of the persistence task.
263    persistence_state: PersistenceState,
264    /// Flag indicating the state of the node's backfill synchronization process.
265    backfill_sync_state: BackfillSyncState,
266    /// Keeps track of the state of the canonical chain that isn't persisted yet.
267    /// This is intended to be accessed from external sources, such as rpc.
268    canonical_in_memory_state: CanonicalInMemoryState<N>,
269    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
270    /// updates
271    payload_builder: PayloadBuilderHandle<T>,
272    /// Configuration settings.
273    config: TreeConfig,
274    /// Metrics for the engine api.
275    metrics: EngineApiMetrics,
276    /// The engine API variant of this handler
277    engine_kind: EngineApiKind,
278    /// The EVM configuration.
279    evm_config: C,
280}
281
282impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
283    for EngineApiTreeHandler<N, P, T, V, C>
284where
285    N: NodePrimitives,
286    C: Debug + ConfigureEvm<Primitives = N>,
287{
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        f.debug_struct("EngineApiTreeHandler")
290            .field("provider", &self.provider)
291            .field("consensus", &self.consensus)
292            .field("payload_validator", &self.payload_validator)
293            .field("state", &self.state)
294            .field("incoming_tx", &self.incoming_tx)
295            .field("persistence", &self.persistence)
296            .field("persistence_state", &self.persistence_state)
297            .field("backfill_sync_state", &self.backfill_sync_state)
298            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
299            .field("payload_builder", &self.payload_builder)
300            .field("config", &self.config)
301            .field("metrics", &self.metrics)
302            .field("engine_kind", &self.engine_kind)
303            .field("evm_config", &self.evm_config)
304            .finish()
305    }
306}
307
308impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
309where
310    N: NodePrimitives,
311    P: DatabaseProviderFactory
312        + BlockReader<Block = N::Block, Header = N::BlockHeader>
313        + StateProviderFactory
314        + StateReader<Receipt = N::Receipt>
315        + HashedPostStateProvider
316        + TrieReader
317        + Clone
318        + 'static,
319    <P as DatabaseProviderFactory>::Provider:
320        BlockReader<Block = N::Block, Header = N::BlockHeader>,
321    C: ConfigureEvm<Primitives = N> + 'static,
322    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
323    V: EngineValidator<T>,
324{
325    /// Creates a new [`EngineApiTreeHandler`].
326    #[expect(clippy::too_many_arguments)]
327    pub fn new(
328        provider: P,
329        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
330        payload_validator: V,
331        outgoing: UnboundedSender<EngineApiEvent<N>>,
332        state: EngineApiTreeState<N>,
333        canonical_in_memory_state: CanonicalInMemoryState<N>,
334        persistence: PersistenceHandle<N>,
335        persistence_state: PersistenceState,
336        payload_builder: PayloadBuilderHandle<T>,
337        config: TreeConfig,
338        engine_kind: EngineApiKind,
339        evm_config: C,
340    ) -> Self {
341        let (incoming_tx, incoming) = std::sync::mpsc::channel();
342
343        Self {
344            provider,
345            consensus,
346            payload_validator,
347            incoming,
348            outgoing,
349            persistence,
350            persistence_state,
351            backfill_sync_state: BackfillSyncState::Idle,
352            state,
353            canonical_in_memory_state,
354            payload_builder,
355            config,
356            metrics: Default::default(),
357            incoming_tx,
358            engine_kind,
359            evm_config,
360        }
361    }
362
363    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
364    /// own thread.
365    ///
366    /// Returns the sender through which incoming requests can be sent to the task and the receiver
367    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
368    #[expect(clippy::complexity)]
369    pub fn spawn_new(
370        provider: P,
371        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
372        payload_validator: V,
373        persistence: PersistenceHandle<N>,
374        payload_builder: PayloadBuilderHandle<T>,
375        canonical_in_memory_state: CanonicalInMemoryState<N>,
376        config: TreeConfig,
377        kind: EngineApiKind,
378        evm_config: C,
379    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
380    {
381        let best_block_number = provider.best_block_number().unwrap_or(0);
382        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
383
384        let persistence_state = PersistenceState {
385            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
386            rx: None,
387        };
388
389        let (tx, outgoing) = unbounded_channel();
390        let state = EngineApiTreeState::new(
391            config.block_buffer_limit(),
392            config.max_invalid_header_cache_length(),
393            header.num_hash(),
394            kind,
395        );
396
397        let task = Self::new(
398            provider,
399            consensus,
400            payload_validator,
401            tx,
402            state,
403            canonical_in_memory_state,
404            persistence,
405            persistence_state,
406            payload_builder,
407            config,
408            kind,
409            evm_config,
410        );
411        let incoming = task.incoming_tx.clone();
412        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
413        (incoming, outgoing)
414    }
415
416    /// Returns a new [`Sender`] to send messages to this type.
417    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
418        self.incoming_tx.clone()
419    }
420
421    /// Run the engine API handler.
422    ///
423    /// This will block the current thread and process incoming messages.
424    pub fn run(mut self) {
425        loop {
426            match self.try_recv_engine_message() {
427                Ok(Some(msg)) => {
428                    debug!(target: "engine::tree", %msg, "received new engine message");
429                    if let Err(fatal) = self.on_engine_message(msg) {
430                        error!(target: "engine::tree", %fatal, "insert block fatal error");
431                        return
432                    }
433                }
434                Ok(None) => {
435                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
436                }
437                Err(_err) => {
438                    error!(target: "engine::tree", "Engine channel disconnected");
439                    return
440                }
441            }
442
443            if let Err(err) = self.advance_persistence() {
444                error!(target: "engine::tree", %err, "Advancing persistence failed");
445                return
446            }
447        }
448    }
449
450    /// Invoked when previously requested blocks were downloaded.
451    ///
452    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
453    /// will execute the first batch and send the remaining blocks back through the channel so that
454    /// block request processing isn't blocked for a long time.
455    fn on_downloaded(
456        &mut self,
457        mut blocks: Vec<SealedBlock<N::Block>>,
458    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
459        if blocks.is_empty() {
460            // nothing to execute
461            return Ok(None)
462        }
463
464        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
465        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
466        for block in blocks.drain(..batch) {
467            if let Some(event) = self.on_downloaded_block(block)? {
468                let needs_backfill = event.is_backfill_action();
469                self.on_tree_event(event)?;
470                if needs_backfill {
471                    // can exit early if backfill is needed
472                    return Ok(None)
473                }
474            }
475        }
476
477        // if we still have blocks to execute, send them as a followup request
478        if !blocks.is_empty() {
479            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
480        }
481
482        Ok(None)
483    }
484
485    /// When the Consensus layer receives a new block via the consensus gossip protocol,
486    /// the transactions in the block are sent to the execution layer in the form of a
487    /// [`PayloadTypes::ExecutionData`], for example
488    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
489    /// Execution layer executes the transactions and validates the state in the block header,
490    /// then passes validation data back to Consensus layer, that adds the block to the head of
491    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
492    /// network in the form of a "Beacon block".
493    ///
494    /// These responses should adhere to the [Engine API Spec for
495    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
496    ///
497    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
498    /// returns an error if an internal error occurred.
499    #[instrument(
500        level = "debug",
501        target = "engine::tree",
502        skip_all,
503        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
504    )]
505    fn on_new_payload(
506        &mut self,
507        payload: T::ExecutionData,
508    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
509        trace!(target: "engine::tree", "invoked new payload");
510
511        // start timing for the new payload process
512        let start = Instant::now();
513
514        // Ensures that the given payload does not violate any consensus rules that concern the
515        // block's layout, like:
516        //    - missing or invalid base fee
517        //    - invalid extra data
518        //    - invalid transactions
519        //    - incorrect hash
520        //    - the versioned hashes passed with the payload do not exactly match transaction
521        //      versioned hashes
522        //    - the block does not contain blob transactions if it is pre-cancun
523        //
524        // This validates the following engine API rule:
525        //
526        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
527        //    validation by taking the following steps:
528        //
529        //   1. Obtain the actual array by concatenating blob versioned hashes lists
530        //      (`tx.blob_versioned_hashes`) of each [blob
531        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
532        //      in the payload, respecting the order of inclusion. If the payload has no blob
533        //      transactions the expected array **MUST** be `[]`.
534        //
535        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
536        //      null}` if the expected and the actual arrays don't match.
537        //
538        // This validation **MUST** be instantly run in all cases even during active sync process.
539
540        let num_hash = payload.num_hash();
541        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
542        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
543
544        let block_hash = num_hash.hash;
545
546        // Check for invalid ancestors
547        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
548            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
549            return Ok(TreeOutcome::new(status));
550        }
551
552        // record pre-execution phase duration
553        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
554
555        let status = if self.backfill_sync_state.is_idle() {
556            self.try_insert_payload(payload)?
557        } else {
558            self.try_buffer_payload(payload)?
559        };
560
561        let mut outcome = TreeOutcome::new(status);
562        // if the block is valid and it is the current sync target head, make it canonical
563        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
564            // Only create the canonical event if this block isn't already the canonical head
565            if self.state.tree_state.canonical_block_hash() != block_hash {
566                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
567                    sync_target_head: block_hash,
568                }));
569            }
570        }
571
572        // record total newPayload duration
573        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
574
575        Ok(outcome)
576    }
577
578    /// Processes a payload during normal sync operation.
579    ///
580    /// Returns:
581    /// - `Valid`: Payload successfully validated and inserted
582    /// - `Syncing`: Parent missing, payload buffered for later
583    /// - Error status: Payload is invalid
584    #[instrument(level = "debug", target = "engine::tree", skip_all)]
585    fn try_insert_payload(
586        &mut self,
587        payload: T::ExecutionData,
588    ) -> Result<PayloadStatus, InsertBlockFatalError> {
589        let block_hash = payload.block_hash();
590        let num_hash = payload.num_hash();
591        let parent_hash = payload.parent_hash();
592        let mut latest_valid_hash = None;
593
594        match self.insert_payload(payload) {
595            Ok(status) => {
596                let status = match status {
597                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
598                        latest_valid_hash = Some(block_hash);
599                        self.try_connect_buffered_blocks(num_hash)?;
600                        PayloadStatusEnum::Valid
601                    }
602                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
603                        latest_valid_hash = Some(block_hash);
604                        PayloadStatusEnum::Valid
605                    }
606                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
607                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
608                        // not known to be invalid, but we don't know anything else
609                        PayloadStatusEnum::Syncing
610                    }
611                };
612
613                Ok(PayloadStatus::new(status, latest_valid_hash))
614            }
615            Err(error) => match error {
616                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
617                InsertPayloadError::Payload(error) => {
618                    Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
619                }
620            },
621        }
622    }
623
624    /// Stores a payload for later processing during backfill sync.
625    ///
626    /// During backfill, the node lacks the state needed to validate payloads,
627    /// so they are buffered (stored in memory) until their parent blocks are synced.
628    ///
629    /// Returns:
630    /// - `Syncing`: Payload successfully buffered
631    /// - Error status: Payload is malformed or invalid
632    fn try_buffer_payload(
633        &mut self,
634        payload: T::ExecutionData,
635    ) -> Result<PayloadStatus, InsertBlockFatalError> {
636        let parent_hash = payload.parent_hash();
637        let num_hash = payload.num_hash();
638
639        match self.payload_validator.convert_payload_to_block(payload) {
640            // if the block is well-formed, buffer it for later
641            Ok(block) => {
642                if let Err(error) = self.buffer_block(block) {
643                    Ok(self.on_insert_block_error(error)?)
644                } else {
645                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
646                }
647            }
648            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
649        }
650    }
651
652    /// Returns the new chain for the given head.
653    ///
654    /// This also handles reorgs.
655    ///
656    /// Note: This does not update the tracked state and instead returns the new chain based on the
657    /// given head.
658    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
659        // get the executed new head block
660        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
661            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
662            self.metrics.engine.executed_new_block_cache_miss.increment(1);
663            return Ok(None)
664        };
665
666        let new_head_number = new_head_block.recovered_block().number();
667        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
668
669        let mut new_chain = vec![new_head_block.clone()];
670        let mut current_hash = new_head_block.recovered_block().parent_hash();
671        let mut current_number = new_head_number - 1;
672
673        // Walk back the new chain until we reach a block we know about
674        //
675        // This is only done for in-memory blocks, because we should not have persisted any blocks
676        // that are _above_ the current canonical head.
677        while current_number > current_canonical_number {
678            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
679            {
680                current_hash = block.recovered_block().parent_hash();
681                current_number -= 1;
682                new_chain.push(block);
683            } else {
684                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
685                // This should never happen as we're walking back a chain that should connect to
686                // the canonical chain
687                return Ok(None)
688            }
689        }
690
691        // If we have reached the current canonical head by walking back from the target, then we
692        // know this represents an extension of the canonical chain.
693        if current_hash == self.state.tree_state.current_canonical_head.hash {
694            new_chain.reverse();
695
696            // Simple extension of the current chain
697            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
698        }
699
700        // We have a reorg. Walk back both chains to find the fork point.
701        let mut old_chain = Vec::new();
702        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
703
704        // If the canonical chain is ahead of the new chain,
705        // gather all blocks until new head number.
706        while current_canonical_number > current_number {
707            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
708                old_hash = block.recovered_block().parent_hash();
709                old_chain.push(block);
710                current_canonical_number -= 1;
711            } else {
712                // This shouldn't happen as we're walking back the canonical chain
713                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
714                return Ok(None)
715            }
716        }
717
718        // Both new and old chain pointers are now at the same height.
719        debug_assert_eq!(current_number, current_canonical_number);
720
721        // Walk both chains from specified hashes at same height until
722        // a common ancestor (fork block) is reached.
723        while old_hash != current_hash {
724            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
725                old_hash = block.recovered_block().parent_hash();
726                old_chain.push(block);
727            } else {
728                // This shouldn't happen as we're walking back the canonical chain
729                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
730                return Ok(None)
731            }
732
733            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
734            {
735                current_hash = block.recovered_block().parent_hash();
736                new_chain.push(block);
737            } else {
738                // This shouldn't happen as we've already walked this path
739                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
740                return Ok(None)
741            }
742        }
743        new_chain.reverse();
744        old_chain.reverse();
745
746        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
747    }
748
749    /// Updates the latest block state to the specified canonical ancestor.
750    ///
751    /// This method ensures that the latest block tracks the given canonical header by resetting
752    ///
753    /// # Arguments
754    /// * `canonical_header` - The canonical header to set as the new head
755    ///
756    /// # Returns
757    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
758    ///
759    /// Caution: This unwinds the canonical chain
760    fn update_latest_block_to_canonical_ancestor(
761        &mut self,
762        canonical_header: &SealedHeader<N::BlockHeader>,
763    ) -> ProviderResult<()> {
764        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
765        let current_head_number = self.state.tree_state.canonical_block_number();
766        let new_head_number = canonical_header.number();
767        let new_head_hash = canonical_header.hash();
768
769        // Update tree state with the new canonical head
770        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
771
772        // Handle the state update based on whether this is an unwind scenario
773        if new_head_number < current_head_number {
774            debug!(
775                target: "engine::tree",
776                current_head = current_head_number,
777                new_head = new_head_number,
778                new_head_hash = ?new_head_hash,
779                "FCU unwind detected: reverting to canonical ancestor"
780            );
781
782            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
783        } else {
784            debug!(
785                target: "engine::tree",
786                previous_head = current_head_number,
787                new_head = new_head_number,
788                new_head_hash = ?new_head_hash,
789                "Advancing latest block to canonical ancestor"
790            );
791            self.handle_chain_advance_or_same_height(canonical_header)
792        }
793    }
794
795    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
796    /// to the canonical header
797    fn handle_canonical_chain_unwind(
798        &self,
799        current_head_number: u64,
800        canonical_header: &SealedHeader<N::BlockHeader>,
801    ) -> ProviderResult<()> {
802        let new_head_number = canonical_header.number();
803        debug!(
804            target: "engine::tree",
805            from = current_head_number,
806            to = new_head_number,
807            "Handling unwind: collecting blocks to remove from in-memory state"
808        );
809
810        // Collect blocks that need to be removed from memory
811        let old_blocks =
812            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
813
814        // Load and apply the canonical ancestor block
815        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
816    }
817
818    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
819    fn collect_blocks_for_canonical_unwind(
820        &self,
821        new_head_number: u64,
822        current_head_number: u64,
823    ) -> Vec<ExecutedBlock<N>> {
824        let mut old_blocks = Vec::new();
825
826        for block_num in (new_head_number + 1)..=current_head_number {
827            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
828                let executed_block = block_state.block_ref().clone();
829                old_blocks.push(executed_block);
830                debug!(
831                    target: "engine::tree",
832                    block_number = block_num,
833                    "Collected block for removal from in-memory state"
834                );
835            }
836        }
837
838        if old_blocks.is_empty() {
839            debug!(
840                target: "engine::tree",
841                "No blocks found in memory to remove, will clear and reset state"
842            );
843        }
844
845        old_blocks
846    }
847
848    /// Applies the canonical ancestor block via a reorg operation.
849    fn apply_canonical_ancestor_via_reorg(
850        &self,
851        canonical_header: &SealedHeader<N::BlockHeader>,
852        old_blocks: Vec<ExecutedBlock<N>>,
853    ) -> ProviderResult<()> {
854        let new_head_hash = canonical_header.hash();
855        let new_head_number = canonical_header.number();
856
857        // Try to load the canonical ancestor's block
858        match self.canonical_block_by_hash(new_head_hash)? {
859            Some(executed_block) => {
860                // Perform the reorg to properly handle the unwind
861                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
862                    new: vec![executed_block],
863                    old: old_blocks,
864                });
865
866                // CRITICAL: Update the canonical head after the reorg
867                // This ensures get_canonical_head() returns the correct block
868                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
869
870                debug!(
871                    target: "engine::tree",
872                    block_number = new_head_number,
873                    block_hash = ?new_head_hash,
874                    "Successfully loaded canonical ancestor into memory via reorg"
875                );
876            }
877            None => {
878                // Fallback: update header only if block cannot be found
879                warn!(
880                    target: "engine::tree",
881                    block_hash = ?new_head_hash,
882                    "Could not find canonical ancestor block, updating header only"
883                );
884                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
885            }
886        }
887
888        Ok(())
889    }
890
891    /// Handles chain advance or same height scenarios.
892    fn handle_chain_advance_or_same_height(
893        &self,
894        canonical_header: &SealedHeader<N::BlockHeader>,
895    ) -> ProviderResult<()> {
896        let new_head_number = canonical_header.number();
897        let new_head_hash = canonical_header.hash();
898
899        // Update the canonical head header
900        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
901
902        // Load the block into memory if it's not already present
903        self.ensure_block_in_memory(new_head_number, new_head_hash)
904    }
905
906    /// Ensures a block is loaded into memory if not already present.
907    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
908        // Check if block is already in memory
909        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
910            return Ok(());
911        }
912
913        // Try to load the block from storage
914        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
915            self.canonical_in_memory_state
916                .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
917
918            debug!(
919                target: "engine::tree",
920                block_number,
921                block_hash = ?block_hash,
922                "Added canonical block to in-memory state"
923            );
924        }
925
926        Ok(())
927    }
928
929    /// Determines if the given block is part of a fork by checking that these
930    /// conditions are true:
931    /// * walking back from the target hash to verify that the target hash is not part of an
932    ///   extension of the canonical chain.
933    /// * walking back from the current head to verify that the target hash is not already part of
934    ///   the canonical chain.
935    ///
936    /// The header is required as an arg, because we might be checking that the header is a fork
937    /// block before it's in the tree state and before it's in the database.
938    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
939        let target_hash = target.block.hash;
940        // verify that the given hash is not part of an extension of the canon chain.
941        let canonical_head = self.state.tree_state.canonical_head();
942        let mut current_hash;
943        let mut current_block = target;
944        loop {
945            if current_block.block.hash == canonical_head.hash {
946                return Ok(false)
947            }
948            // We already passed the canonical head
949            if current_block.block.number <= canonical_head.number {
950                break
951            }
952            current_hash = current_block.parent;
953
954            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
955            current_block = next_block.block_with_parent();
956        }
957
958        // verify that the given hash is not already part of canonical chain stored in memory
959        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
960            return Ok(false)
961        }
962
963        // verify that the given hash is not already part of persisted canonical chain
964        if self.provider.block_number(target_hash)?.is_some() {
965            return Ok(false)
966        }
967
968        Ok(true)
969    }
970
971    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
972    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
973    /// chain.
974    ///
975    /// These responses should adhere to the [Engine API Spec for
976    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
977    ///
978    /// Returns an error if an internal error occurred like a database error.
979    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
980    fn on_forkchoice_updated(
981        &mut self,
982        state: ForkchoiceState,
983        attrs: Option<T::PayloadAttributes>,
984        version: EngineApiMessageVersion,
985    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
986        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
987
988        // Record metrics
989        self.record_forkchoice_metrics();
990
991        // Pre-validation of forkchoice state
992        if let Some(early_result) = self.validate_forkchoice_state(state)? {
993            return Ok(TreeOutcome::new(early_result));
994        }
995
996        // Return early if we are on the correct fork
997        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
998            return Ok(result);
999        }
1000
1001        // Attempt to apply a chain update when the head differs from our canonical chain.
1002        // This handles reorgs and chain extensions by making the specified head canonical.
1003        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1004            return Ok(result);
1005        }
1006
1007        // Fallback that ensures to catch up to the network's state.
1008        self.handle_missing_block(state)
1009    }
1010
1011    /// Records metrics for forkchoice updated calls
1012    fn record_forkchoice_metrics(&self) {
1013        self.canonical_in_memory_state.on_forkchoice_update_received();
1014    }
1015
1016    /// Pre-validates the forkchoice state and returns early if validation fails.
1017    ///
1018    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1019    /// returned. Returns `None` if validation passes and processing should continue.
1020    fn validate_forkchoice_state(
1021        &mut self,
1022        state: ForkchoiceState,
1023    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1024        if state.head_block_hash.is_zero() {
1025            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1026        }
1027
1028        // Check if the new head hash is connected to any ancestor that we previously marked as
1029        // invalid
1030        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1031        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1032            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1033        }
1034
1035        if !self.backfill_sync_state.is_idle() {
1036            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1037            // exclusive access to the database
1038            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1039            return Ok(Some(OnForkChoiceUpdated::syncing()));
1040        }
1041
1042        Ok(None)
1043    }
1044
1045    /// Handles the case where the forkchoice head is already canonical.
1046    ///
1047    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1048    /// processing is complete. Returns `None` if the head is not canonical and processing
1049    /// should continue.
1050    fn handle_canonical_head(
1051        &self,
1052        state: ForkchoiceState,
1053        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1054        version: EngineApiMessageVersion,
1055    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1056        // Process the forkchoice update by trying to make the head block canonical
1057        //
1058        // We can only process this forkchoice update if:
1059        // - we have the `head` block
1060        // - the head block is part of a chain that is connected to the canonical chain. This
1061        //   includes reorgs.
1062        //
1063        // Performing a FCU involves:
1064        // - marking the FCU's head block as canonical
1065        // - updating in memory state to reflect the new canonical chain
1066        // - updating canonical state trackers
1067        // - emitting a canonicalization event for the new chain (including reorg)
1068        // - if we have payload attributes, delegate them to the payload service
1069
1070        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1071            return Ok(None);
1072        }
1073
1074        trace!(target: "engine::tree", "fcu head hash is already canonical");
1075
1076        // Update the safe and finalized blocks and ensure their values are valid
1077        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1078            // safe or finalized hashes are invalid
1079            return Ok(Some(TreeOutcome::new(outcome)));
1080        }
1081
1082        // Process payload attributes if the head is already canonical
1083        if let Some(attr) = attrs {
1084            let tip = self
1085                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1086                .ok_or_else(|| {
1087                    // If we can't find the canonical block, then something is wrong and we need
1088                    // to return an error
1089                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1090                })?;
1091            // Clone only when we actually need to process the attributes
1092            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1093            return Ok(Some(TreeOutcome::new(updated)));
1094        }
1095
1096        // The head block is already canonical
1097        let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1098            PayloadStatusEnum::Valid,
1099            Some(state.head_block_hash),
1100        )));
1101        Ok(Some(outcome))
1102    }
1103
1104    /// Applies chain update for the new head block and processes payload attributes.
1105    ///
1106    /// This method handles the case where the forkchoice head differs from our current canonical
1107    /// head. It attempts to make the specified head block canonical by:
1108    /// - Checking if the head is already part of the canonical chain
1109    /// - Applying chain reorganizations (reorgs) if necessary
1110    /// - Processing payload attributes if provided
1111    /// - Returning the appropriate forkchoice update response
1112    ///
1113    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
1114    /// Returns `None` if no chain update was needed or possible.
1115    fn apply_chain_update(
1116        &mut self,
1117        state: ForkchoiceState,
1118        attrs: &Option<T::PayloadAttributes>,
1119        version: EngineApiMessageVersion,
1120    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1121        // Check if the head is already part of the canonical chain
1122        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1123            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1124
1125            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1126            // own chain at will, so we need to always trigger a new payload job if requested.
1127            if self.engine_kind.is_opstack() ||
1128                self.config.always_process_payload_attributes_on_canonical_head()
1129            {
1130                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
1131                // part of the canonical chain. We need to update the latest block state to reflect
1132                // the canonical ancestor. This ensures that state providers and the transaction
1133                // pool operate with the correct chain state after forkchoice update processing, and
1134                // new payloads built on the reorg'd head will be added to the tree immediately.
1135                if self.config.unwind_canonical_header() {
1136                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1137                }
1138
1139                if let Some(attr) = attrs {
1140                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1141                    // Clone only when we actually need to process the attributes
1142                    let updated = self.process_payload_attributes(
1143                        attr.clone(),
1144                        &canonical_header,
1145                        state,
1146                        version,
1147                    );
1148                    return Ok(Some(TreeOutcome::new(updated)));
1149                }
1150            }
1151
1152            // According to the Engine API specification, client software MAY skip an update of the
1153            // forkchoice state and MUST NOT begin a payload build process if
1154            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
1155            // of canonical chain, i.e. the ancestor passed payload validation process
1156            // and deemed `VALID`. In the case of such an event, client software MUST
1157            // return `{payloadStatus: {status: VALID, latestValidHash:
1158            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1159
1160            // The head block is already canonical and we're not processing payload attributes,
1161            // so we're not triggering a payload job and can return right away
1162
1163            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1164                PayloadStatusEnum::Valid,
1165                Some(state.head_block_hash),
1166            )));
1167            return Ok(Some(outcome));
1168        }
1169
1170        // Ensure we can apply a new chain update for the head block
1171        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1172            let tip = chain_update.tip().clone_sealed_header();
1173            self.on_canonical_chain_update(chain_update);
1174
1175            // Update the safe and finalized blocks and ensure their values are valid
1176            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1177                // safe or finalized hashes are invalid
1178                return Ok(Some(TreeOutcome::new(outcome)));
1179            }
1180
1181            if let Some(attr) = attrs {
1182                // Clone only when we actually need to process the attributes
1183                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1184                return Ok(Some(TreeOutcome::new(updated)));
1185            }
1186
1187            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1188                PayloadStatusEnum::Valid,
1189                Some(state.head_block_hash),
1190            )));
1191            return Ok(Some(outcome));
1192        }
1193
1194        Ok(None)
1195    }
1196
1197    /// Handles the case where the head block is missing and needs to be downloaded.
1198    ///
1199    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1200    /// Returns a `TreeOutcome` with syncing status and download event.
1201    fn handle_missing_block(
1202        &self,
1203        state: ForkchoiceState,
1204    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1205        // We don't have the block to perform the forkchoice update
1206        // We assume the FCU is valid and at least the head is missing,
1207        // so we need to start syncing to it
1208        //
1209        // find the appropriate target to sync to, if we don't have the safe block hash then we
1210        // start syncing to the safe block via backfill first
1211        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1212        // check that safe block is valid and missing
1213        !state.safe_block_hash.is_zero() &&
1214        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1215        {
1216            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1217            state.safe_block_hash
1218        } else {
1219            state.head_block_hash
1220        };
1221
1222        let target = self.lowest_buffered_ancestor_or(target);
1223        trace!(target: "engine::tree", %target, "downloading missing block");
1224
1225        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1226            PayloadStatusEnum::Syncing,
1227        )))
1228        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1229    }
1230
1231    /// Attempts to receive the next engine request.
1232    ///
1233    /// If there's currently no persistence action in progress, this will block until a new request
1234    /// is received. If there's a persistence action in progress, this will try to receive the
1235    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1236    /// received in time.
1237    ///
1238    /// Returns an error if the engine channel is disconnected.
1239    #[expect(clippy::type_complexity)]
1240    fn try_recv_engine_message(
1241        &self,
1242    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1243        if self.persistence_state.in_progress() {
1244            // try to receive the next request with a timeout to not block indefinitely
1245            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1246                Ok(msg) => Ok(Some(msg)),
1247                Err(err) => match err {
1248                    RecvTimeoutError::Timeout => Ok(None),
1249                    RecvTimeoutError::Disconnected => Err(RecvError),
1250                },
1251            }
1252        } else {
1253            self.incoming.recv().map(Some)
1254        }
1255    }
1256
1257    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1258    /// the current persistence action while we're removing blocks.
1259    fn remove_blocks(&mut self, new_tip_num: u64) {
1260        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1261        if new_tip_num < self.persistence_state.last_persisted_block.number {
1262            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1263            let (tx, rx) = oneshot::channel();
1264            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1265            self.persistence_state.start_remove(new_tip_num, rx);
1266        }
1267    }
1268
1269    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1270    /// the current persistence action while we're saving blocks.
1271    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1272        if blocks_to_persist.is_empty() {
1273            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1274            return
1275        }
1276
1277        // NOTE: checked non-empty above
1278        let highest_num_hash = blocks_to_persist
1279            .iter()
1280            .max_by_key(|block| block.recovered_block().number())
1281            .map(|b| b.recovered_block().num_hash())
1282            .expect("Checked non-empty persisting blocks");
1283
1284        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1285        let (tx, rx) = oneshot::channel();
1286        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1287
1288        self.persistence_state.start_save(highest_num_hash, rx);
1289    }
1290
1291    /// Attempts to advance the persistence state.
1292    ///
1293    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1294    /// or send a new persistence action if necessary.
1295    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1296        if self.persistence_state.in_progress() {
1297            let (mut rx, start_time, current_action) = self
1298                .persistence_state
1299                .rx
1300                .take()
1301                .expect("if a persistence task is in progress Receiver must be Some");
1302            // Check if persistence has complete
1303            match rx.try_recv() {
1304                Ok(last_persisted_hash_num) => {
1305                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1306                    let Some(BlockNumHash {
1307                        hash: last_persisted_block_hash,
1308                        number: last_persisted_block_number,
1309                    }) = last_persisted_hash_num
1310                    else {
1311                        // if this happened, then we persisted no blocks because we sent an
1312                        // empty vec of blocks
1313                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1314                        return Ok(())
1315                    };
1316
1317                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1318                    self.persistence_state
1319                        .finish(last_persisted_block_hash, last_persisted_block_number);
1320                    self.on_new_persisted_block()?;
1321                }
1322                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1323                Err(TryRecvError::Empty) => {
1324                    self.persistence_state.rx = Some((rx, start_time, current_action))
1325                }
1326            }
1327        }
1328
1329        if !self.persistence_state.in_progress() {
1330            if let Some(new_tip_num) = self.find_disk_reorg()? {
1331                self.remove_blocks(new_tip_num)
1332            } else if self.should_persist() {
1333                let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1334                self.persist_blocks(blocks_to_persist);
1335            }
1336        }
1337
1338        Ok(())
1339    }
1340
1341    /// Handles a message from the engine.
1342    fn on_engine_message(
1343        &mut self,
1344        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1345    ) -> Result<(), InsertBlockFatalError> {
1346        match msg {
1347            FromEngine::Event(event) => match event {
1348                FromOrchestrator::BackfillSyncStarted => {
1349                    debug!(target: "engine::tree", "received backfill sync started event");
1350                    self.backfill_sync_state = BackfillSyncState::Active;
1351                }
1352                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1353                    self.on_backfill_sync_finished(ctrl)?;
1354                }
1355            },
1356            FromEngine::Request(request) => {
1357                match request {
1358                    EngineApiRequest::InsertExecutedBlock(block) => {
1359                        let block_num_hash = block.recovered_block().num_hash();
1360                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1361                            // outdated block that can be skipped
1362                            return Ok(())
1363                        }
1364
1365                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1366                        let now = Instant::now();
1367
1368                        // if the parent is the canonical head, we can insert the block as the
1369                        // pending block
1370                        if self.state.tree_state.canonical_block_hash() ==
1371                            block.recovered_block().parent_hash()
1372                        {
1373                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1374                            self.canonical_in_memory_state.set_pending_block(block.clone());
1375                        }
1376
1377                        self.state.tree_state.insert_executed(block.clone());
1378                        self.payload_validator.on_inserted_executed_block(block.clone());
1379                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1380                        self.emit_event(EngineApiEvent::BeaconConsensus(
1381                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1382                        ));
1383                    }
1384                    EngineApiRequest::Beacon(request) => {
1385                        match request {
1386                            BeaconEngineMessage::ForkchoiceUpdated {
1387                                state,
1388                                payload_attrs,
1389                                tx,
1390                                version,
1391                            } => {
1392                                let has_attrs = payload_attrs.is_some();
1393
1394                                let start = Instant::now();
1395                                let mut output =
1396                                    self.on_forkchoice_updated(state, payload_attrs, version);
1397
1398                                if let Ok(res) = &mut output {
1399                                    // track last received forkchoice state
1400                                    self.state
1401                                        .forkchoice_state_tracker
1402                                        .set_latest(state, res.outcome.forkchoice_status());
1403
1404                                    // emit an event about the handled FCU
1405                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1406                                        state,
1407                                        res.outcome.forkchoice_status(),
1408                                    ));
1409
1410                                    // handle the event if any
1411                                    self.on_maybe_tree_event(res.event.take())?;
1412                                }
1413
1414                                self.metrics.engine.forkchoice_updated.update_response_metrics(
1415                                    start,
1416                                    &mut self.metrics.engine.new_payload.latest_at,
1417                                    has_attrs,
1418                                    &output,
1419                                );
1420
1421                                if let Err(err) =
1422                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1423                                {
1424                                    self.metrics
1425                                        .engine
1426                                        .failed_forkchoice_updated_response_deliveries
1427                                        .increment(1);
1428                                    error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1429                                }
1430                            }
1431                            BeaconEngineMessage::NewPayload { payload, tx } => {
1432                                let start = Instant::now();
1433                                let gas_used = payload.gas_used();
1434                                let num_hash = payload.num_hash();
1435                                let mut output = self.on_new_payload(payload);
1436                                self.metrics
1437                                    .engine
1438                                    .new_payload
1439                                    .update_response_metrics(start, &output, gas_used);
1440
1441                                let maybe_event =
1442                                    output.as_mut().ok().and_then(|out| out.event.take());
1443
1444                                // emit response
1445                                if let Err(err) =
1446                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1447                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1448                                    }))
1449                                {
1450                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1451                                    self.metrics
1452                                        .engine
1453                                        .failed_new_payload_response_deliveries
1454                                        .increment(1);
1455                                }
1456
1457                                // handle the event if any
1458                                self.on_maybe_tree_event(maybe_event)?;
1459                            }
1460                        }
1461                    }
1462                }
1463            }
1464            FromEngine::DownloadedBlocks(blocks) => {
1465                if let Some(event) = self.on_downloaded(blocks)? {
1466                    self.on_tree_event(event)?;
1467                }
1468            }
1469        }
1470        Ok(())
1471    }
1472
1473    /// Invoked if the backfill sync has finished to target.
1474    ///
1475    /// At this point we consider the block synced to the backfill target.
1476    ///
1477    /// Checks the tracked finalized block against the block on disk and requests another backfill
1478    /// run if the distance to the tip exceeds the threshold for another backfill run.
1479    ///
1480    /// This will also do the necessary housekeeping of the tree state, this includes:
1481    ///  - removing all blocks below the backfill height
1482    ///  - resetting the canonical in-memory state
1483    ///
1484    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1485    /// target block.
1486    fn on_backfill_sync_finished(
1487        &mut self,
1488        ctrl: ControlFlow,
1489    ) -> Result<(), InsertBlockFatalError> {
1490        debug!(target: "engine::tree", "received backfill sync finished event");
1491        self.backfill_sync_state = BackfillSyncState::Idle;
1492
1493        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1494        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1495            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1496            // update the `invalid_headers` cache with the new invalid header
1497            self.state.invalid_headers.insert(**bad_block);
1498
1499            // if this was an unwind then the target is the new height
1500            Some(*target)
1501        } else {
1502            // backfill height is the block number that the backfill finished at
1503            ctrl.block_number()
1504        };
1505
1506        // backfill height is the block number that the backfill finished at
1507        let Some(backfill_height) = backfill_height else { return Ok(()) };
1508
1509        // state house keeping after backfill sync
1510        // remove all executed blocks below the backfill height
1511        //
1512        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1513        // before that
1514        let Some(backfill_num_hash) = self
1515            .provider
1516            .block_hash(backfill_height)?
1517            .map(|hash| BlockNumHash { hash, number: backfill_height })
1518        else {
1519            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1520            return Ok(())
1521        };
1522
1523        if ctrl.is_unwind() {
1524            // the node reset so we need to clear everything above that height so that backfill
1525            // height is the new canonical block.
1526            self.state.tree_state.reset(backfill_num_hash)
1527        } else {
1528            self.state.tree_state.remove_until(
1529                backfill_num_hash,
1530                self.persistence_state.last_persisted_block.hash,
1531                Some(backfill_num_hash),
1532            );
1533        }
1534
1535        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1536        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1537
1538        // remove all buffered blocks below the backfill height
1539        self.state.buffer.remove_old_blocks(backfill_height);
1540        // we remove all entries because now we're synced to the backfill target and consider this
1541        // the canonical chain
1542        self.canonical_in_memory_state.clear_state();
1543
1544        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1545            // update the tracked chain height, after backfill sync both the canonical height and
1546            // persisted height are the same
1547            self.state.tree_state.set_canonical_head(new_head.num_hash());
1548            self.persistence_state.finish(new_head.hash(), new_head.number());
1549
1550            // update the tracked canonical head
1551            self.canonical_in_memory_state.set_canonical_head(new_head);
1552        }
1553
1554        // check if we need to run backfill again by comparing the most recent finalized height to
1555        // the backfill height
1556        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1557        else {
1558            return Ok(())
1559        };
1560        if sync_target_state.finalized_block_hash.is_zero() {
1561            // no finalized block, can't check distance
1562            return Ok(())
1563        }
1564        // get the block number of the finalized block, if we have it
1565        let newest_finalized = self
1566            .state
1567            .buffer
1568            .block(&sync_target_state.finalized_block_hash)
1569            .map(|block| block.number());
1570
1571        // The block number that the backfill finished at - if the progress or newest
1572        // finalized is None then we can't check the distance anyways.
1573        //
1574        // If both are Some, we perform another distance check and return the desired
1575        // backfill target
1576        if let Some(backfill_target) =
1577            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1578                // Determines whether or not we should run backfill again, in case
1579                // the new gap is still large enough and requires running backfill again
1580                self.backfill_sync_target(progress, finalized_number, None)
1581            })
1582        {
1583            // request another backfill run
1584            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1585                backfill_target.into(),
1586            )));
1587            return Ok(())
1588        };
1589
1590        // Check if there are more blocks to sync between current head and FCU target
1591        if let Some(lowest_buffered) =
1592            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1593        {
1594            let current_head_num = self.state.tree_state.current_canonical_head.number;
1595            let target_head_num = lowest_buffered.number();
1596
1597            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1598            {
1599                // There are blocks between current head and FCU target, download them
1600                debug!(
1601                    target: "engine::tree",
1602                    %current_head_num,
1603                    %target_head_num,
1604                    %distance,
1605                    "Backfill complete, downloading remaining blocks to reach FCU target"
1606                );
1607
1608                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1609                    lowest_buffered.parent_hash(),
1610                    distance,
1611                )));
1612                return Ok(());
1613            }
1614        }
1615
1616        // try to close the gap by executing buffered blocks that are child blocks of the new head
1617        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1618    }
1619
1620    /// Attempts to make the given target canonical.
1621    ///
1622    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1623    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1624        if let Some(chain_update) = self.on_new_head(target)? {
1625            self.on_canonical_chain_update(chain_update);
1626        }
1627
1628        Ok(())
1629    }
1630
1631    /// Convenience function to handle an optional tree event.
1632    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1633        if let Some(event) = event {
1634            self.on_tree_event(event)?;
1635        }
1636
1637        Ok(())
1638    }
1639
1640    /// Handles a tree event.
1641    ///
1642    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1643    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1644        match event {
1645            TreeEvent::TreeAction(action) => match action {
1646                TreeAction::MakeCanonical { sync_target_head } => {
1647                    self.make_canonical(sync_target_head)?;
1648                }
1649            },
1650            TreeEvent::BackfillAction(action) => {
1651                self.emit_event(EngineApiEvent::BackfillAction(action));
1652            }
1653            TreeEvent::Download(action) => {
1654                self.emit_event(EngineApiEvent::Download(action));
1655            }
1656        }
1657
1658        Ok(())
1659    }
1660
1661    /// Emits an outgoing event to the engine.
1662    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1663        let event = event.into();
1664
1665        if event.is_backfill_action() {
1666            debug_assert_eq!(
1667                self.backfill_sync_state,
1668                BackfillSyncState::Idle,
1669                "backfill action should only be emitted when backfill is idle"
1670            );
1671
1672            if self.persistence_state.in_progress() {
1673                // backfill sync and persisting data are mutually exclusive, so we can't start
1674                // backfill while we're still persisting
1675                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1676                return
1677            }
1678
1679            self.backfill_sync_state = BackfillSyncState::Pending;
1680            self.metrics.engine.pipeline_runs.increment(1);
1681            debug!(target: "engine::tree", "emitting backfill action event");
1682        }
1683
1684        let _ = self.outgoing.send(event).inspect_err(
1685            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1686        );
1687    }
1688
1689    /// Returns true if the canonical chain length minus the last persisted
1690    /// block is greater than or equal to the persistence threshold and
1691    /// backfill is not running.
1692    pub const fn should_persist(&self) -> bool {
1693        if !self.backfill_sync_state.is_idle() {
1694            // can't persist if backfill is running
1695            return false
1696        }
1697
1698        let min_block = self.persistence_state.last_persisted_block.number;
1699        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1700            self.config.persistence_threshold()
1701    }
1702
1703    /// Returns a batch of consecutive canonical blocks to persist in the range
1704    /// `(last_persisted_number .. canonical_head - threshold]`. The expected
1705    /// order is oldest -> newest.
1706    fn get_canonical_blocks_to_persist(
1707        &self,
1708    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1709        // We will calculate the state root using the database, so we need to be sure there are no
1710        // changes
1711        debug_assert!(!self.persistence_state.in_progress());
1712
1713        let mut blocks_to_persist = Vec::new();
1714        let mut current_hash = self.state.tree_state.canonical_block_hash();
1715        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1716        let canonical_head_number = self.state.tree_state.canonical_block_number();
1717
1718        // Persist only up to block buffer target
1719        let target_number =
1720            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1721
1722        debug!(
1723            target: "engine::tree",
1724            ?current_hash,
1725            ?last_persisted_number,
1726            ?canonical_head_number,
1727            ?target_number,
1728            "Returning canonical blocks to persist"
1729        );
1730        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1731            if block.recovered_block().number() <= last_persisted_number {
1732                break;
1733            }
1734
1735            if block.recovered_block().number() <= target_number {
1736                blocks_to_persist.push(block.clone());
1737            }
1738
1739            current_hash = block.recovered_block().parent_hash();
1740        }
1741
1742        // Reverse the order so that the oldest block comes first
1743        blocks_to_persist.reverse();
1744
1745        Ok(blocks_to_persist)
1746    }
1747
1748    /// This clears the blocks from the in-memory tree state that have been persisted to the
1749    /// database.
1750    ///
1751    /// This also updates the canonical in-memory state to reflect the newest persisted block
1752    /// height.
1753    ///
1754    /// Assumes that `finish` has been called on the `persistence_state` at least once
1755    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1756        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1757        // state.
1758        if let Some(remove_above) = self.find_disk_reorg()? {
1759            self.remove_blocks(remove_above);
1760            return Ok(())
1761        }
1762
1763        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1764        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1765        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1766            number: self.persistence_state.last_persisted_block.number,
1767            hash: self.persistence_state.last_persisted_block.hash,
1768        });
1769        Ok(())
1770    }
1771
1772    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1773    ///
1774    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
1775    /// or the database. If the required historical data (such as trie change sets) has been
1776    /// pruned for a given block, this operation will return an error. On archive nodes, it
1777    /// can retrieve any block.
1778    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1779        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1780        // check memory first
1781        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1782            return Ok(Some(block.clone()))
1783        }
1784
1785        let (block, senders) = self
1786            .provider
1787            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1788            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1789            .split_sealed();
1790        let execution_output = self
1791            .provider
1792            .get_state(block.header().number())?
1793            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1794        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1795        let trie_updates = self.provider.get_block_trie_updates(block.number())?;
1796
1797        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
1798        let sorted_trie_updates = Arc::new(trie_updates);
1799        // Skip building trie input and anchor for DB-loaded blocks.
1800        let trie_data =
1801            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
1802
1803        Ok(Some(ExecutedBlock::new(
1804            Arc::new(RecoveredBlock::new_sealed(block, senders)),
1805            Arc::new(execution_output),
1806            trie_data,
1807        )))
1808    }
1809
1810    /// Return sealed block header from in-memory state or database by hash.
1811    fn sealed_header_by_hash(
1812        &self,
1813        hash: B256,
1814    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1815        // check memory first
1816        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1817
1818        if header.is_some() {
1819            Ok(header)
1820        } else {
1821            self.provider.sealed_header_by_hash(hash)
1822        }
1823    }
1824
1825    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1826    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1827    /// not exist in the buffer, this returns the hash that is passed in.
1828    ///
1829    /// Returns the parent hash of the block itself if the block is buffered and has no other
1830    /// buffered ancestors.
1831    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1832        self.state
1833            .buffer
1834            .lowest_ancestor(&hash)
1835            .map(|block| block.parent_hash())
1836            .unwrap_or_else(|| hash)
1837    }
1838
1839    /// If validation fails, the response MUST contain the latest valid hash:
1840    ///
1841    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1842    ///     conditions:
1843    ///     - It is fully validated and deemed VALID
1844    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1845    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1846    ///     conditions are satisfied by a `PoW` block.
1847    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1848    ///     the above conditions.
1849    fn latest_valid_hash_for_invalid_payload(
1850        &mut self,
1851        parent_hash: B256,
1852    ) -> ProviderResult<Option<B256>> {
1853        // Check if parent exists in side chain or in canonical chain.
1854        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1855            return Ok(Some(parent_hash))
1856        }
1857
1858        // iterate over ancestors in the invalid cache
1859        // until we encounter the first valid ancestor
1860        let mut current_hash = parent_hash;
1861        let mut current_block = self.state.invalid_headers.get(&current_hash);
1862        while let Some(block_with_parent) = current_block {
1863            current_hash = block_with_parent.parent;
1864            current_block = self.state.invalid_headers.get(&current_hash);
1865
1866            // If current_header is None, then the current_hash does not have an invalid
1867            // ancestor in the cache, check its presence in blockchain tree
1868            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1869                return Ok(Some(current_hash))
1870            }
1871        }
1872        Ok(None)
1873    }
1874
1875    /// Prepares the invalid payload response for the given hash, checking the
1876    /// database for the parent hash and populating the payload status with the latest valid hash
1877    /// according to the engine api spec.
1878    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1879        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1880        // PoW block, which we need to identify by looking at the parent's block difficulty
1881        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1882            !parent.difficulty().is_zero()
1883        {
1884            parent_hash = B256::ZERO;
1885        }
1886
1887        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1888        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1889            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1890        })
1891        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1892    }
1893
1894    /// Returns true if the given hash is the last received sync target block.
1895    ///
1896    /// See [`ForkchoiceStateTracker::sync_target_state`]
1897    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1898        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1899            return target.head_block_hash == block_hash
1900        }
1901        false
1902    }
1903
1904    /// Returns true if the given hash is part of the last received sync target fork choice update.
1905    ///
1906    /// See [`ForkchoiceStateTracker::sync_target_state`]
1907    fn is_any_sync_target(&self, block_hash: B256) -> bool {
1908        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1909            return target.contains(block_hash)
1910        }
1911        false
1912    }
1913
1914    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1915    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1916    ///
1917    /// Returns a payload status response according to the engine API spec if the block is known to
1918    /// be invalid.
1919    fn check_invalid_ancestor_with_head(
1920        &mut self,
1921        check: B256,
1922        head: &SealedBlock<N::Block>,
1923    ) -> ProviderResult<Option<PayloadStatus>> {
1924        // check if the check hash was previously marked as invalid
1925        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1926
1927        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1928    }
1929
1930    /// Invoked when a new payload received is invalid.
1931    fn on_invalid_new_payload(
1932        &mut self,
1933        head: SealedBlock<N::Block>,
1934        invalid: BlockWithParent,
1935    ) -> ProviderResult<PayloadStatus> {
1936        // populate the latest valid hash field
1937        let status = self.prepare_invalid_response(invalid.parent)?;
1938
1939        // insert the head block into the invalid header cache
1940        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1941        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1942
1943        Ok(status)
1944    }
1945
1946    /// Finds any invalid ancestor for the given payload.
1947    ///
1948    /// This function walks up the chain of buffered ancestors from the payload's block
1949    /// hash and checks if any ancestor is marked as invalid in the tree state.
1950    ///
1951    /// The check works by:
1952    /// 1. Finding the lowest buffered ancestor for the given block hash
1953    /// 2. If the ancestor is the same as the block hash itself, using the parent hash instead
1954    /// 3. Checking if this ancestor is in the `invalid_headers` map
1955    ///
1956    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
1957    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1958        let parent_hash = payload.parent_hash();
1959        let block_hash = payload.block_hash();
1960        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1961        if lowest_buffered_ancestor == block_hash {
1962            lowest_buffered_ancestor = parent_hash;
1963        }
1964
1965        // Check if the block has an invalid ancestor
1966        self.state.invalid_headers.get(&lowest_buffered_ancestor)
1967    }
1968
1969    /// Handles a payload that has an invalid ancestor.
1970    ///
1971    /// This function validates the payload and processes it according to whether it's
1972    /// well-formed or malformed:
1973    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
1974    ///    known-bad block, which violates consensus rules
1975    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
1976    ///    validated due to its own structural issues
1977    fn handle_invalid_ancestor_payload(
1978        &mut self,
1979        payload: T::ExecutionData,
1980        invalid: BlockWithParent,
1981    ) -> Result<PayloadStatus, InsertBlockFatalError> {
1982        let parent_hash = payload.parent_hash();
1983        let num_hash = payload.num_hash();
1984
1985        // Here we might have 2 cases
1986        // 1. the block is well formed and indeed links to an invalid header, meaning we should
1987        //    remember it as invalid
1988        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
1989        //    an error and forget it
1990        let block = match self.payload_validator.convert_payload_to_block(payload) {
1991            Ok(block) => block,
1992            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
1993        };
1994
1995        Ok(self.on_invalid_new_payload(block, invalid)?)
1996    }
1997
1998    /// Checks if the given `head` points to an invalid header, which requires a specific response
1999    /// to a forkchoice update.
2000    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2001        // check if the head was previously marked as invalid
2002        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2003
2004        // Try to prepare invalid response, but handle errors gracefully
2005        match self.prepare_invalid_response(header.parent) {
2006            Ok(status) => Ok(Some(status)),
2007            Err(err) => {
2008                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2009                // Return a basic invalid status without latest valid hash
2010                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2011                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2012                })))
2013            }
2014        }
2015    }
2016
2017    /// Validate if block is correct and satisfies all the consensus rules that concern the header
2018    /// and block body itself.
2019    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2020        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2021            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2022            return Err(e)
2023        }
2024
2025        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2026            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2027            return Err(e)
2028        }
2029
2030        Ok(())
2031    }
2032
2033    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
2034    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2035    fn try_connect_buffered_blocks(
2036        &mut self,
2037        parent: BlockNumHash,
2038    ) -> Result<(), InsertBlockFatalError> {
2039        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2040
2041        if blocks.is_empty() {
2042            // nothing to append
2043            return Ok(())
2044        }
2045
2046        let now = Instant::now();
2047        let block_count = blocks.len();
2048        for child in blocks {
2049            let child_num_hash = child.num_hash();
2050            match self.insert_block(child) {
2051                Ok(res) => {
2052                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2053                    if self.is_any_sync_target(child_num_hash.hash) &&
2054                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2055                    {
2056                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2057                        // we just inserted a block that we know is part of the canonical chain, so
2058                        // we can make it canonical
2059                        self.make_canonical(child_num_hash.hash)?;
2060                    }
2061                }
2062                Err(err) => {
2063                    if let InsertPayloadError::Block(err) = err {
2064                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2065                        if let Err(fatal) = self.on_insert_block_error(err) {
2066                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2067                            return Err(fatal)
2068                        }
2069                    }
2070                }
2071            }
2072        }
2073
2074        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2075        Ok(())
2076    }
2077
2078    /// Pre-validates the block and inserts it into the buffer.
2079    fn buffer_block(
2080        &mut self,
2081        block: SealedBlock<N::Block>,
2082    ) -> Result<(), InsertBlockError<N::Block>> {
2083        if let Err(err) = self.validate_block(&block) {
2084            return Err(InsertBlockError::consensus_error(err, block))
2085        }
2086        self.state.buffer.insert_block(block);
2087        Ok(())
2088    }
2089
2090    /// Returns true if the distance from the local tip to the block is greater than the configured
2091    /// threshold.
2092    ///
2093    /// If the `local_tip` is greater than the `block`, then this will return false.
2094    #[inline]
2095    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2096        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2097    }
2098
2099    /// Returns how far the local tip is from the given block. If the local tip is at the same
2100    /// height or its block number is greater than the given block, this returns None.
2101    #[inline]
2102    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2103        if block > local_tip {
2104            Some(block - local_tip)
2105        } else {
2106            None
2107        }
2108    }
2109
2110    /// Returns the target hash to sync to if the distance from the local tip to the block is
2111    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2112    /// that block already).
2113    ///
2114    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2115    /// (missing) finalized block.
2116    fn backfill_sync_target(
2117        &self,
2118        canonical_tip_num: u64,
2119        target_block_number: u64,
2120        downloaded_block: Option<BlockNumHash>,
2121    ) -> Option<B256> {
2122        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2123
2124        // check if the downloaded block is the tracked finalized block
2125        let exceeds_backfill_threshold =
2126            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2127                // if we downloaded the finalized block we can now check how far we're off
2128                (Some(downloaded_block), Some(state))
2129                    if downloaded_block.hash == state.finalized_block_hash =>
2130                {
2131                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2132                }
2133                _ => match sync_target_state
2134                    .as_ref()
2135                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2136                {
2137                    Some(buffered_finalized) => {
2138                        // if we have buffered the finalized block, we should check how far we're
2139                        // off
2140                        self.exceeds_backfill_run_threshold(
2141                            canonical_tip_num,
2142                            buffered_finalized.number(),
2143                        )
2144                    }
2145                    None => {
2146                        // check if the distance exceeds the threshold for backfill sync
2147                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2148                    }
2149                },
2150            };
2151
2152        // if the number of missing blocks is greater than the max, trigger backfill
2153        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2154            // if we have already canonicalized the finalized block, we should skip backfill
2155            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2156                Err(err) => {
2157                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2158                }
2159                Ok(None) => {
2160                    // ensure the finalized block is known (not the zero hash)
2161                    if !state.finalized_block_hash.is_zero() {
2162                        // we don't have the block yet and the distance exceeds the allowed
2163                        // threshold
2164                        return Some(state.finalized_block_hash)
2165                    }
2166
2167                    // OPTIMISTIC SYNCING
2168                    //
2169                    // It can happen when the node is doing an
2170                    // optimistic sync, where the CL has no knowledge of the finalized hash,
2171                    // but is expecting the EL to sync as high
2172                    // as possible before finalizing.
2173                    //
2174                    // This usually doesn't happen on ETH mainnet since CLs use the more
2175                    // secure checkpoint syncing.
2176                    //
2177                    // However, optimism chains will do this. The risk of a reorg is however
2178                    // low.
2179                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2180                    return Some(state.head_block_hash)
2181                }
2182                Ok(Some(_)) => {
2183                    // we're fully synced to the finalized block
2184                }
2185            }
2186        }
2187
2188        None
2189    }
2190
2191    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
2192    /// happen if a reorg is happening while we are persisting a block.
2193    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2194        let mut canonical = self.state.tree_state.current_canonical_head;
2195        let mut persisted = self.persistence_state.last_persisted_block;
2196
2197        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2198            Ok(self
2199                .sealed_header_by_hash(num_hash.hash)?
2200                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2201                .parent_num_hash())
2202        };
2203
2204        // Happy path, canonical chain is ahead or equal to persisted chain.
2205        // Walk canonical chain back to make sure that it connects to persisted chain.
2206        while canonical.number > persisted.number {
2207            canonical = parent_num_hash(canonical)?;
2208        }
2209
2210        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2211        if canonical == persisted {
2212            return Ok(None);
2213        }
2214
2215        // At this point, we know that `persisted` block can't be reached by walking the canonical
2216        // chain back. In this case we need to truncate it to the first canonical block it connects
2217        // to.
2218
2219        // Firstly, walk back until we reach the same height as `canonical`.
2220        while persisted.number > canonical.number {
2221            persisted = parent_num_hash(persisted)?;
2222        }
2223
2224        debug_assert_eq!(persisted.number, canonical.number);
2225
2226        // Now walk both chains back until we find a common ancestor.
2227        while persisted.hash != canonical.hash {
2228            canonical = parent_num_hash(canonical)?;
2229            persisted = parent_num_hash(persisted)?;
2230        }
2231
2232        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2233
2234        Ok(Some(persisted.number))
2235    }
2236
2237    /// Invoked when we the canonical chain has been updated.
2238    ///
2239    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2240    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2241        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2242        let start = Instant::now();
2243
2244        // update the tracked canonical head
2245        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2246
2247        let tip = chain_update.tip().clone_sealed_header();
2248        let notification = chain_update.to_chain_notification();
2249
2250        // reinsert any missing reorged blocks
2251        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2252            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2253            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2254            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2255
2256            self.update_reorg_metrics(old.len());
2257            self.reinsert_reorged_blocks(new.clone());
2258            self.reinsert_reorged_blocks(old.clone());
2259        }
2260
2261        // update the tracked in-memory state with the new chain
2262        self.canonical_in_memory_state.update_chain(chain_update);
2263        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2264
2265        // Update metrics based on new tip
2266        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2267
2268        // sends an event to all active listeners about the new canonical chain
2269        self.canonical_in_memory_state.notify_canon_state(notification);
2270
2271        // emit event
2272        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2273            Box::new(tip),
2274            start.elapsed(),
2275        ));
2276    }
2277
2278    /// This updates metrics based on the given reorg length.
2279    fn update_reorg_metrics(&self, old_chain_length: usize) {
2280        self.metrics.tree.reorgs.increment(1);
2281        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2282    }
2283
2284    /// This reinserts any blocks in the new chain that do not already exist in the tree
2285    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2286        for block in new_chain {
2287            if self
2288                .state
2289                .tree_state
2290                .executed_block_by_hash(block.recovered_block().hash())
2291                .is_none()
2292            {
2293                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2294                self.state.tree_state.insert_executed(block);
2295            }
2296        }
2297    }
2298
2299    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2300    ///
2301    /// This mainly compares the missing parent of the downloaded block with the current canonical
2302    /// tip, and decides whether or not backfill sync should be triggered.
2303    fn on_disconnected_downloaded_block(
2304        &self,
2305        downloaded_block: BlockNumHash,
2306        missing_parent: BlockNumHash,
2307        head: BlockNumHash,
2308    ) -> Option<TreeEvent> {
2309        // compare the missing parent with the canonical tip
2310        if let Some(target) =
2311            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2312        {
2313            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2314            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2315        }
2316
2317        // continue downloading the missing parent
2318        //
2319        // this happens if either:
2320        //  * the missing parent block num < canonical tip num
2321        //    * this case represents a missing block on a fork that is shorter than the canonical
2322        //      chain
2323        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2324        //    less than the backfill threshold
2325        //    * this case represents a potentially long range of blocks to download and execute
2326        let request = if let Some(distance) =
2327            self.distance_from_local_tip(head.number, missing_parent.number)
2328        {
2329            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2330            DownloadRequest::BlockRange(missing_parent.hash, distance)
2331        } else {
2332            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2333            // This happens when the missing parent is on an outdated
2334            // sidechain and we can only download the missing block itself
2335            DownloadRequest::single_block(missing_parent.hash)
2336        };
2337
2338        Some(TreeEvent::Download(request))
2339    }
2340
2341    /// Invoked with a block downloaded from the network
2342    ///
2343    /// Returns an event with the appropriate action to take, such as:
2344    ///  - download more missing blocks
2345    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2346    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2347    fn on_downloaded_block(
2348        &mut self,
2349        block: SealedBlock<N::Block>,
2350    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2351        let block_num_hash = block.num_hash();
2352        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2353        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2354            return Ok(None)
2355        }
2356
2357        if !self.backfill_sync_state.is_idle() {
2358            return Ok(None)
2359        }
2360
2361        // try to append the block
2362        match self.insert_block(block) {
2363            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2364                // check if we just inserted a block that's part of sync targets,
2365                // i.e. head, safe, or finalized
2366                if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2367                    sync_target.contains(block_num_hash.hash)
2368                {
2369                    debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2370
2371                    // we just inserted a block that we know is part of the canonical chain, so we
2372                    // can make it canonical
2373                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2374                        sync_target_head: block_num_hash.hash,
2375                    })))
2376                }
2377                trace!(target: "engine::tree", "appended downloaded block");
2378                self.try_connect_buffered_blocks(block_num_hash)?;
2379            }
2380            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2381                // block is not connected to the canonical head, we need to download
2382                // its missing branch first
2383                return Ok(self.on_disconnected_downloaded_block(
2384                    block_num_hash,
2385                    missing_ancestor,
2386                    head,
2387                ))
2388            }
2389            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2390                trace!(target: "engine::tree", "downloaded block already executed");
2391            }
2392            Err(err) => {
2393                if let InsertPayloadError::Block(err) = err {
2394                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2395                    if let Err(fatal) = self.on_insert_block_error(err) {
2396                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2397                        return Err(fatal)
2398                    }
2399                }
2400            }
2401        }
2402        Ok(None)
2403    }
2404
2405    /// Inserts a payload into the tree and executes it.
2406    ///
2407    /// This function validates the payload's basic structure, then executes it using the
2408    /// payload validator. The execution includes running all transactions in the payload
2409    /// and validating the resulting state transitions.
2410    ///
2411    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2412    /// or `InsertPayloadError` if validation or execution failed.
2413    fn insert_payload(
2414        &mut self,
2415        payload: T::ExecutionData,
2416    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2417        self.insert_block_or_payload(
2418            payload.block_with_parent(),
2419            payload,
2420            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2421            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2422        )
2423    }
2424
2425    fn insert_block(
2426        &mut self,
2427        block: SealedBlock<N::Block>,
2428    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2429        self.insert_block_or_payload(
2430            block.block_with_parent(),
2431            block,
2432            |validator, block, ctx| validator.validate_block(block, ctx),
2433            |_, block| Ok(block),
2434        )
2435    }
2436
2437    /// Inserts a block or payload into the blockchain tree with full execution.
2438    ///
2439    /// This is a generic function that handles both blocks and payloads by accepting
2440    /// a block identifier, input data, and execution/validation functions. It performs
2441    /// comprehensive checks and execution:
2442    ///
2443    /// - Validates that the block doesn't already exist in the tree
2444    /// - Ensures parent state is available, buffering if necessary
2445    /// - Executes the block/payload using the provided execute function
2446    /// - Handles both canonical and fork chain insertions
2447    /// - Updates pending block state when appropriate
2448    /// - Emits consensus engine events and records metrics
2449    ///
2450    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2451    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2452    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2453    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2454    fn insert_block_or_payload<Input, Err>(
2455        &mut self,
2456        block_id: BlockWithParent,
2457        input: Input,
2458        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2459        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2460    ) -> Result<InsertPayloadOk, Err>
2461    where
2462        Err: From<InsertBlockError<N::Block>>,
2463    {
2464        let block_insert_start = Instant::now();
2465        let block_num_hash = block_id.block;
2466        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2467
2468        match self.sealed_header_by_hash(block_num_hash.hash) {
2469            Err(err) => {
2470                let block = convert_to_block(self, input)?;
2471                return Err(InsertBlockError::new(block, err.into()).into());
2472            }
2473            Ok(Some(_)) => {
2474                // We now assume that we already have this block in the tree. However, we need to
2475                // run the conversion to ensure that the block hash is valid.
2476                convert_to_block(self, input)?;
2477                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2478            }
2479            _ => {}
2480        };
2481
2482        // Ensure that the parent state is available.
2483        match self.state_provider_builder(block_id.parent) {
2484            Err(err) => {
2485                let block = convert_to_block(self, input)?;
2486                return Err(InsertBlockError::new(block, err.into()).into());
2487            }
2488            Ok(None) => {
2489                let block = convert_to_block(self, input)?;
2490
2491                // we don't have the state required to execute this block, buffering it and find the
2492                // missing parent block
2493                let missing_ancestor = self
2494                    .state
2495                    .buffer
2496                    .lowest_ancestor(&block.parent_hash())
2497                    .map(|block| block.parent_num_hash())
2498                    .unwrap_or_else(|| block.parent_num_hash());
2499
2500                self.state.buffer.insert_block(block);
2501
2502                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2503                    head: self.state.tree_state.current_canonical_head,
2504                    missing_ancestor,
2505                }))
2506            }
2507            Ok(Some(_)) => {}
2508        }
2509
2510        // determine whether we are on a fork chain
2511        let is_fork = match self.is_fork(block_id) {
2512            Err(err) => {
2513                let block = convert_to_block(self, input)?;
2514                return Err(InsertBlockError::new(block, err.into()).into());
2515            }
2516            Ok(is_fork) => is_fork,
2517        };
2518
2519        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2520
2521        let start = Instant::now();
2522
2523        let executed = execute(&mut self.payload_validator, input, ctx)?;
2524
2525        // if the parent is the canonical head, we can insert the block as the pending block
2526        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2527        {
2528            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2529            self.canonical_in_memory_state.set_pending_block(executed.clone());
2530        }
2531
2532        self.state.tree_state.insert_executed(executed.clone());
2533        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2534
2535        // emit insert event
2536        let elapsed = start.elapsed();
2537        let engine_event = if is_fork {
2538            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2539        } else {
2540            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2541        };
2542        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2543
2544        self.metrics
2545            .engine
2546            .block_insert_total_duration
2547            .record(block_insert_start.elapsed().as_secs_f64());
2548        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2549        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2550    }
2551
2552    /// Handles an error that occurred while inserting a block.
2553    ///
2554    /// If this is a validation error this will mark the block as invalid.
2555    ///
2556    /// Returns the proper payload status response if the block is invalid.
2557    fn on_insert_block_error(
2558        &mut self,
2559        error: InsertBlockError<N::Block>,
2560    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2561        let (block, error) = error.split();
2562
2563        // if invalid block, we check the validation error. Otherwise return the fatal
2564        // error.
2565        let validation_err = error.ensure_validation_error()?;
2566
2567        // If the error was due to an invalid payload, the payload is added to the
2568        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2569        // returned.
2570        warn!(
2571            target: "engine::tree",
2572            invalid_hash=%block.hash(),
2573            invalid_number=block.number(),
2574            %validation_err,
2575            "Invalid block error on new payload",
2576        );
2577        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2578
2579        // keep track of the invalid header
2580        self.state.invalid_headers.insert(block.block_with_parent());
2581        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2582            Box::new(block),
2583        )));
2584
2585        Ok(PayloadStatus::new(
2586            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2587            latest_valid_hash,
2588        ))
2589    }
2590
2591    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2592    fn on_new_payload_error(
2593        &mut self,
2594        error: NewPayloadError,
2595        payload_num_hash: NumHash,
2596        parent_hash: B256,
2597    ) -> ProviderResult<PayloadStatus> {
2598        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2599        // we need to convert the error to a payload status (response to the CL)
2600
2601        let latest_valid_hash =
2602            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2603                // Engine-API rules:
2604                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2605                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2606                None
2607            } else {
2608                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2609            };
2610
2611        let status = PayloadStatusEnum::from(error);
2612        Ok(PayloadStatus::new(status, latest_valid_hash))
2613    }
2614
2615    /// Attempts to find the header for the given block hash if it is canonical.
2616    pub fn find_canonical_header(
2617        &self,
2618        hash: B256,
2619    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2620        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2621
2622        if canonical.is_none() {
2623            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2624        }
2625
2626        Ok(canonical)
2627    }
2628
2629    /// Updates the tracked finalized block if we have it.
2630    fn update_finalized_block(
2631        &self,
2632        finalized_block_hash: B256,
2633    ) -> Result<(), OnForkChoiceUpdated> {
2634        if finalized_block_hash.is_zero() {
2635            return Ok(())
2636        }
2637
2638        match self.find_canonical_header(finalized_block_hash) {
2639            Ok(None) => {
2640                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2641                // if the finalized block is not known, we can't update the finalized block
2642                return Err(OnForkChoiceUpdated::invalid_state())
2643            }
2644            Ok(Some(finalized)) => {
2645                if Some(finalized.num_hash()) !=
2646                    self.canonical_in_memory_state.get_finalized_num_hash()
2647                {
2648                    // we're also persisting the finalized block on disk so we can reload it on
2649                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2650                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2651                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2652                    // Update finalized block height metric
2653                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2654                }
2655            }
2656            Err(err) => {
2657                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2658            }
2659        }
2660
2661        Ok(())
2662    }
2663
2664    /// Updates the tracked safe block if we have it
2665    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2666        if safe_block_hash.is_zero() {
2667            return Ok(())
2668        }
2669
2670        match self.find_canonical_header(safe_block_hash) {
2671            Ok(None) => {
2672                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2673                // if the safe block is not known, we can't update the safe block
2674                return Err(OnForkChoiceUpdated::invalid_state())
2675            }
2676            Ok(Some(safe)) => {
2677                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2678                    // we're also persisting the safe block on disk so we can reload it on
2679                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2680                    let _ = self.persistence.save_safe_block_number(safe.number());
2681                    self.canonical_in_memory_state.set_safe(safe.clone());
2682                    // Update safe block height metric
2683                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
2684                }
2685            }
2686            Err(err) => {
2687                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2688            }
2689        }
2690
2691        Ok(())
2692    }
2693
2694    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2695    /// made canonical.
2696    ///
2697    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2698    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2699    ///
2700    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2701    /// are consistent with the head block.
2702    fn ensure_consistent_forkchoice_state(
2703        &self,
2704        state: ForkchoiceState,
2705    ) -> Result<(), OnForkChoiceUpdated> {
2706        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2707        // after the head block is canonicalized.
2708        //
2709        // This ensures that the finalized block is consistent with the head block, i.e. the
2710        // finalized block is an ancestor of the head block.
2711        self.update_finalized_block(state.finalized_block_hash)?;
2712
2713        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2714        // after the head block is canonicalized.
2715        //
2716        // This ensures that the safe block is consistent with the head block, i.e. the safe
2717        // block is an ancestor of the head block.
2718        self.update_safe_block(state.safe_block_hash)
2719    }
2720
2721    /// Validates the payload attributes with respect to the header and fork choice state.
2722    ///
2723    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
2724    /// return an error if the payload attributes are invalid.
2725    fn process_payload_attributes(
2726        &self,
2727        attrs: T::PayloadAttributes,
2728        head: &N::BlockHeader,
2729        state: ForkchoiceState,
2730        version: EngineApiMessageVersion,
2731    ) -> OnForkChoiceUpdated {
2732        if let Err(err) =
2733            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2734        {
2735            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2736            return OnForkChoiceUpdated::invalid_payload_attributes()
2737        }
2738
2739        // 8. Client software MUST begin a payload build process building on top of
2740        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2741        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2742        //    The build process is specified in the Payload building section.
2743        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2744            state.head_block_hash,
2745            attrs,
2746            version as u8,
2747        ) {
2748            Ok(attributes) => {
2749                // send the payload to the builder and return the receiver for the pending payload
2750                // id, initiating payload job is handled asynchronously
2751                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2752
2753                // Client software MUST respond to this method call in the following way:
2754                // {
2755                //      payloadStatus: {
2756                //          status: VALID,
2757                //          latestValidHash: forkchoiceState.headBlockHash,
2758                //          validationError: null
2759                //      },
2760                //      payloadId: buildProcessId
2761                // }
2762                //
2763                // if the payload is deemed VALID and the build process has begun.
2764                OnForkChoiceUpdated::updated_with_pending_payload_id(
2765                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2766                    pending_payload_id,
2767                )
2768            }
2769            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2770        }
2771    }
2772
2773    /// Remove all blocks up to __and including__ the given block number.
2774    ///
2775    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2776    /// those which have a fork point at or below the finalized hash.
2777    ///
2778    /// Canonical blocks below the upper bound will still be removed.
2779    pub(crate) fn remove_before(
2780        &mut self,
2781        upper_bound: BlockNumHash,
2782        finalized_hash: Option<B256>,
2783    ) -> ProviderResult<()> {
2784        // first fetch the finalized block number and then call the remove_before method on
2785        // tree_state
2786        let num = if let Some(hash) = finalized_hash {
2787            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2788        } else {
2789            None
2790        };
2791
2792        self.state.tree_state.remove_until(
2793            upper_bound,
2794            self.persistence_state.last_persisted_block.hash,
2795            num,
2796        );
2797        Ok(())
2798    }
2799
2800    /// Returns a builder for creating state providers for the given hash.
2801    ///
2802    /// This is an optimization for parallel execution contexts where we want to avoid
2803    /// creating state providers in the critical path.
2804    pub fn state_provider_builder(
2805        &self,
2806        hash: B256,
2807    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2808    where
2809        P: BlockReader + StateProviderFactory + StateReader + Clone,
2810    {
2811        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2812            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2813            // the block leads back to the canonical chain
2814            return Ok(Some(StateProviderBuilder::new(
2815                self.provider.clone(),
2816                historical,
2817                Some(blocks),
2818            )))
2819        }
2820
2821        // Check if the block is persisted
2822        if let Some(header) = self.provider.header(hash)? {
2823            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2824            // For persisted blocks, we create a builder that will fetch state directly from the
2825            // database
2826            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2827        }
2828
2829        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2830        Ok(None)
2831    }
2832}
2833
2834/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
2835/// variant.
2836///
2837/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
2838/// is valid or not.
2839#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2840pub enum BlockStatus {
2841    /// The block is valid and block extends canonical chain.
2842    Valid,
2843    /// The block may be valid and has an unknown missing ancestor.
2844    Disconnected {
2845        /// Current canonical head.
2846        head: BlockNumHash,
2847        /// The lowest ancestor block that is not connected to the canonical chain.
2848        missing_ancestor: BlockNumHash,
2849    },
2850}
2851
2852/// How a payload was inserted if it was valid.
2853///
2854/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
2855/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
2856#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2857pub enum InsertPayloadOk {
2858    /// The payload was valid, but we have already seen it.
2859    AlreadySeen(BlockStatus),
2860    /// The payload was valid and inserted into the tree.
2861    Inserted(BlockStatus),
2862}