reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use reth_chain_state::{
17    CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, OnForkChoiceUpdated,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::{ConfigureEvm, OnStateHook};
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{
28    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
29};
30use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
31use reth_provider::{
32    BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
33    StateProviderFactory, StateReader, TransactionVariant, TrieReader,
34};
35use reth_revm::database::StateProviderDatabase;
36use reth_stages_api::ControlFlow;
37use revm::state::EvmState;
38use state::TreeState;
39use std::{
40    fmt::Debug,
41    sync::{
42        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
43        Arc,
44    },
45    time::Instant,
46};
47use tokio::sync::{
48    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
49    oneshot::{self, error::TryRecvError},
50};
51use tracing::*;
52
53mod block_buffer;
54mod cached_state;
55pub mod error;
56mod instrumented_state;
57mod invalid_headers;
58mod metrics;
59mod payload_processor;
60pub mod payload_validator;
61mod persistence_state;
62pub mod precompile_cache;
63#[cfg(test)]
64mod tests;
65// TODO(alexey): compare trie updates in `insert_block_inner`
66#[expect(unused)]
67mod trie_updates;
68
69use crate::tree::error::AdvancePersistenceError;
70pub use block_buffer::BlockBuffer;
71pub use invalid_headers::InvalidHeaderCache;
72pub use payload_processor::*;
73pub use payload_validator::{BasicEngineValidator, EngineValidator};
74pub use persistence_state::PersistenceState;
75pub use reth_engine_primitives::TreeConfig;
76
77pub mod state;
78
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: if the local head `block.number` is 100 and the forkchoice head `block.number` is 133,
/// the gap of 33 blocks is larger than the number of slots in an epoch, so it exceeds the
/// threshold at which the pipeline should be used to backfill this gap.
///
/// The value is [`EPOCH_SLOTS`], i.e. exactly one epoch's worth of slots.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
89
90/// A builder for creating state providers that can be used across threads.
91#[derive(Clone, Debug)]
92pub struct StateProviderBuilder<N: NodePrimitives, P> {
93    /// The provider factory used to create providers.
94    provider_factory: P,
95    /// The historical block hash to fetch state from.
96    historical: B256,
97    /// The blocks that form the chain from historical to target and are in memory.
98    overlay: Option<Vec<ExecutedBlock<N>>>,
99}
100
101impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
102    /// Creates a new state provider from the provider factory, historical block hash and optional
103    /// overlaid blocks.
104    pub const fn new(
105        provider_factory: P,
106        historical: B256,
107        overlay: Option<Vec<ExecutedBlock<N>>>,
108    ) -> Self {
109        Self { provider_factory, historical, overlay }
110    }
111}
112
113impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
114where
115    P: BlockReader + StateProviderFactory + StateReader + Clone,
116{
117    /// Creates a new state provider from this builder.
118    pub fn build(&self) -> ProviderResult<StateProviderBox> {
119        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
120        if let Some(overlay) = self.overlay.clone() {
121            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
122        }
123        Ok(provider)
124    }
125}
126
127/// Tracks the state of the engine api internals.
128///
129/// This type is not shareable.
130#[derive(Debug)]
131pub struct EngineApiTreeState<N: NodePrimitives> {
132    /// Tracks the state of the blockchain tree.
133    tree_state: TreeState<N>,
134    /// Tracks the forkchoice state updates received by the CL.
135    forkchoice_state_tracker: ForkchoiceStateTracker,
136    /// Buffer of detached blocks.
137    buffer: BlockBuffer<N::Block>,
138    /// Tracks the header of invalid payloads that were rejected by the engine because they're
139    /// invalid.
140    invalid_headers: InvalidHeaderCache,
141}
142
143impl<N: NodePrimitives> EngineApiTreeState<N> {
144    fn new(
145        block_buffer_limit: u32,
146        max_invalid_header_cache_length: u32,
147        canonical_block: BlockNumHash,
148        engine_kind: EngineApiKind,
149    ) -> Self {
150        Self {
151            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
152            buffer: BlockBuffer::new(block_buffer_limit),
153            tree_state: TreeState::new(canonical_block, engine_kind),
154            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
155        }
156    }
157}
158
159/// The outcome of a tree operation.
160#[derive(Debug)]
161pub struct TreeOutcome<T> {
162    /// The outcome of the operation.
163    pub outcome: T,
164    /// An optional event to tell the caller to do something.
165    pub event: Option<TreeEvent>,
166}
167
168impl<T> TreeOutcome<T> {
169    /// Create new tree outcome.
170    pub const fn new(outcome: T) -> Self {
171        Self { outcome, event: None }
172    }
173
174    /// Set event on the outcome.
175    pub fn with_event(mut self, event: TreeEvent) -> Self {
176        self.event = Some(event);
177        self
178    }
179}
180
181/// Events that are triggered by Tree Chain
182#[derive(Debug)]
183pub enum TreeEvent {
184    /// Tree action is needed.
185    TreeAction(TreeAction),
186    /// Backfill action is needed.
187    BackfillAction(BackfillAction),
188    /// Block download is needed.
189    Download(DownloadRequest),
190}
191
192impl TreeEvent {
193    /// Returns true if the event is a backfill action.
194    const fn is_backfill_action(&self) -> bool {
195        matches!(self, Self::BackfillAction(_))
196    }
197}
198
/// The actions that can be performed on the tree.
///
/// Values of this type are carried by [`TreeEvent::TreeAction`].
#[derive(Debug)]
pub enum TreeAction {
    /// Make the given target block canonical.
    MakeCanonical {
        /// The hash of the sync target head block that should be made canonical.
        sync_target_head: B256,
    },
}
208
209/// Wrapper struct that combines metrics and state hook
210struct MeteredStateHook {
211    metrics: reth_evm::metrics::ExecutorMetrics,
212    inner_hook: Box<dyn OnStateHook>,
213}
214
215impl OnStateHook for MeteredStateHook {
216    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
217        // Update the metrics for the number of accounts, storage slots and bytecodes loaded
218        let accounts = state.keys().len();
219        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
220        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
221
222        self.metrics.accounts_loaded_histogram.record(accounts as f64);
223        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
224        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
225
226        // Call the original state hook
227        self.inner_hook.on_state(source, state);
228    }
229}
230
231/// The engine API tree handler implementation.
232///
233/// This type is responsible for processing engine API requests, maintaining the canonical state and
234/// emitting events.
235pub struct EngineApiTreeHandler<N, P, T, V, C>
236where
237    N: NodePrimitives,
238    T: PayloadTypes,
239    C: ConfigureEvm<Primitives = N> + 'static,
240{
241    provider: P,
242    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
243    payload_validator: V,
244    /// Keeps track of internals such as executed and buffered blocks.
245    state: EngineApiTreeState<N>,
246    /// The half for sending messages to the engine.
247    ///
248    /// This is kept so that we can queue in messages to ourself that we can process later, for
249    /// example distributing workload across multiple messages that would otherwise take too long
250    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
251    /// them one by one so that we can handle incoming engine API in between and don't become
252    /// unresponsive. This can happen during live sync transition where we're trying to close the
253    /// gap (up to 3 epochs of blocks in the worst case).
254    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
255    /// Incoming engine API requests.
256    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
257    /// Outgoing events that are emitted to the handler.
258    outgoing: UnboundedSender<EngineApiEvent<N>>,
259    /// Channels to the persistence layer.
260    persistence: PersistenceHandle<N>,
261    /// Tracks the state changes of the persistence task.
262    persistence_state: PersistenceState,
263    /// Flag indicating the state of the node's backfill synchronization process.
264    backfill_sync_state: BackfillSyncState,
265    /// Keeps track of the state of the canonical chain that isn't persisted yet.
266    /// This is intended to be accessed from external sources, such as rpc.
267    canonical_in_memory_state: CanonicalInMemoryState<N>,
268    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
269    /// updates
270    payload_builder: PayloadBuilderHandle<T>,
271    /// Configuration settings.
272    config: TreeConfig,
273    /// Metrics for the engine api.
274    metrics: EngineApiMetrics,
275    /// The engine API variant of this handler
276    engine_kind: EngineApiKind,
277    /// The EVM configuration.
278    evm_config: C,
279}
280
281impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
282    for EngineApiTreeHandler<N, P, T, V, C>
283where
284    N: NodePrimitives,
285    C: Debug + ConfigureEvm<Primitives = N>,
286{
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("EngineApiTreeHandler")
289            .field("provider", &self.provider)
290            .field("consensus", &self.consensus)
291            .field("payload_validator", &self.payload_validator)
292            .field("state", &self.state)
293            .field("incoming_tx", &self.incoming_tx)
294            .field("persistence", &self.persistence)
295            .field("persistence_state", &self.persistence_state)
296            .field("backfill_sync_state", &self.backfill_sync_state)
297            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
298            .field("payload_builder", &self.payload_builder)
299            .field("config", &self.config)
300            .field("metrics", &self.metrics)
301            .field("engine_kind", &self.engine_kind)
302            .field("evm_config", &self.evm_config)
303            .finish()
304    }
305}
306
307impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
308where
309    N: NodePrimitives,
310    P: DatabaseProviderFactory
311        + BlockReader<Block = N::Block, Header = N::BlockHeader>
312        + StateProviderFactory
313        + StateReader<Receipt = N::Receipt>
314        + HashedPostStateProvider
315        + TrieReader
316        + Clone
317        + 'static,
318    <P as DatabaseProviderFactory>::Provider:
319        BlockReader<Block = N::Block, Header = N::BlockHeader>,
320    C: ConfigureEvm<Primitives = N> + 'static,
321    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
322    V: EngineValidator<T>,
323{
324    /// Creates a new [`EngineApiTreeHandler`].
325    #[expect(clippy::too_many_arguments)]
326    pub fn new(
327        provider: P,
328        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
329        payload_validator: V,
330        outgoing: UnboundedSender<EngineApiEvent<N>>,
331        state: EngineApiTreeState<N>,
332        canonical_in_memory_state: CanonicalInMemoryState<N>,
333        persistence: PersistenceHandle<N>,
334        persistence_state: PersistenceState,
335        payload_builder: PayloadBuilderHandle<T>,
336        config: TreeConfig,
337        engine_kind: EngineApiKind,
338        evm_config: C,
339    ) -> Self {
340        let (incoming_tx, incoming) = std::sync::mpsc::channel();
341
342        Self {
343            provider,
344            consensus,
345            payload_validator,
346            incoming,
347            outgoing,
348            persistence,
349            persistence_state,
350            backfill_sync_state: BackfillSyncState::Idle,
351            state,
352            canonical_in_memory_state,
353            payload_builder,
354            config,
355            metrics: Default::default(),
356            incoming_tx,
357            engine_kind,
358            evm_config,
359        }
360    }
361
362    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
363    /// own thread.
364    ///
365    /// Returns the sender through which incoming requests can be sent to the task and the receiver
366    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
367    #[expect(clippy::complexity)]
368    pub fn spawn_new(
369        provider: P,
370        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
371        payload_validator: V,
372        persistence: PersistenceHandle<N>,
373        payload_builder: PayloadBuilderHandle<T>,
374        canonical_in_memory_state: CanonicalInMemoryState<N>,
375        config: TreeConfig,
376        kind: EngineApiKind,
377        evm_config: C,
378    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
379    {
380        let best_block_number = provider.best_block_number().unwrap_or(0);
381        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
382
383        let persistence_state = PersistenceState {
384            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
385            rx: None,
386        };
387
388        let (tx, outgoing) = unbounded_channel();
389        let state = EngineApiTreeState::new(
390            config.block_buffer_limit(),
391            config.max_invalid_header_cache_length(),
392            header.num_hash(),
393            kind,
394        );
395
396        let task = Self::new(
397            provider,
398            consensus,
399            payload_validator,
400            tx,
401            state,
402            canonical_in_memory_state,
403            persistence,
404            persistence_state,
405            payload_builder,
406            config,
407            kind,
408            evm_config,
409        );
410        let incoming = task.incoming_tx.clone();
411        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
412        (incoming, outgoing)
413    }
414
415    /// Returns a new [`Sender`] to send messages to this type.
416    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
417        self.incoming_tx.clone()
418    }
419
420    /// Run the engine API handler.
421    ///
422    /// This will block the current thread and process incoming messages.
423    pub fn run(mut self) {
424        loop {
425            match self.try_recv_engine_message() {
426                Ok(Some(msg)) => {
427                    debug!(target: "engine::tree", %msg, "received new engine message");
428                    if let Err(fatal) = self.on_engine_message(msg) {
429                        error!(target: "engine::tree", %fatal, "insert block fatal error");
430                        return
431                    }
432                }
433                Ok(None) => {
434                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
435                }
436                Err(_err) => {
437                    error!(target: "engine::tree", "Engine channel disconnected");
438                    return
439                }
440            }
441
442            if let Err(err) = self.advance_persistence() {
443                error!(target: "engine::tree", %err, "Advancing persistence failed");
444                return
445            }
446        }
447    }
448
449    /// Invoked when previously requested blocks were downloaded.
450    ///
451    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
452    /// will execute the first batch and send the remaining blocks back through the channel so that
453    /// block request processing isn't blocked for a long time.
454    fn on_downloaded(
455        &mut self,
456        mut blocks: Vec<RecoveredBlock<N::Block>>,
457    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
458        if blocks.is_empty() {
459            // nothing to execute
460            return Ok(None)
461        }
462
463        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
464        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
465        for block in blocks.drain(..batch) {
466            if let Some(event) = self.on_downloaded_block(block)? {
467                let needs_backfill = event.is_backfill_action();
468                self.on_tree_event(event)?;
469                if needs_backfill {
470                    // can exit early if backfill is needed
471                    return Ok(None)
472                }
473            }
474        }
475
476        // if we still have blocks to execute, send them as a followup request
477        if !blocks.is_empty() {
478            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
479        }
480
481        Ok(None)
482    }
483
484    /// When the Consensus layer receives a new block via the consensus gossip protocol,
485    /// the transactions in the block are sent to the execution layer in the form of a
486    /// [`PayloadTypes::ExecutionData`], for example
487    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
488    /// Execution layer executes the transactions and validates the state in the block header,
489    /// then passes validation data back to Consensus layer, that adds the block to the head of
490    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
491    /// network in the form of a "Beacon block".
492    ///
493    /// These responses should adhere to the [Engine API Spec for
494    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
495    ///
496    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
497    /// returns an error if an internal error occurred.
498    #[instrument(
499        level = "debug",
500        target = "engine::tree",
501        skip_all,
502        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
503    )]
504    fn on_new_payload(
505        &mut self,
506        payload: T::ExecutionData,
507    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
508        trace!(target: "engine::tree", "invoked new payload");
509        self.metrics.engine.new_payload_messages.increment(1);
510
511        // start timing for the new payload process
512        let start = Instant::now();
513
514        // Ensures that the given payload does not violate any consensus rules that concern the
515        // block's layout, like:
516        //    - missing or invalid base fee
517        //    - invalid extra data
518        //    - invalid transactions
519        //    - incorrect hash
520        //    - the versioned hashes passed with the payload do not exactly match transaction
521        //      versioned hashes
522        //    - the block does not contain blob transactions if it is pre-cancun
523        //
524        // This validates the following engine API rule:
525        //
526        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
527        //    validation by taking the following steps:
528        //
529        //   1. Obtain the actual array by concatenating blob versioned hashes lists
530        //      (`tx.blob_versioned_hashes`) of each [blob
531        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
532        //      in the payload, respecting the order of inclusion. If the payload has no blob
533        //      transactions the expected array **MUST** be `[]`.
534        //
535        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
536        //      null}` if the expected and the actual arrays don't match.
537        //
538        // This validation **MUST** be instantly run in all cases even during active sync process.
539
540        let num_hash = payload.num_hash();
541        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
542        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
543
544        let block_hash = num_hash.hash;
545
546        // Check for invalid ancestors
547        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
548            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
549            return Ok(TreeOutcome::new(status));
550        }
551
552        // record pre-execution phase duration
553        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
554
555        let status = if self.backfill_sync_state.is_idle() {
556            self.try_insert_payload(payload)?
557        } else {
558            self.try_buffer_payload(payload)?
559        };
560
561        let mut outcome = TreeOutcome::new(status);
562        // if the block is valid and it is the current sync target head, make it canonical
563        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
564            // Only create the canonical event if this block isn't already the canonical head
565            if self.state.tree_state.canonical_block_hash() != block_hash {
566                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
567                    sync_target_head: block_hash,
568                }));
569            }
570        }
571
572        // record total newPayload duration
573        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
574
575        Ok(outcome)
576    }
577
578    /// Processes a payload during normal sync operation.
579    ///
580    /// Returns:
581    /// - `Valid`: Payload successfully validated and inserted
582    /// - `Syncing`: Parent missing, payload buffered for later
583    /// - Error status: Payload is invalid
584    #[instrument(level = "debug", target = "engine::tree", skip_all)]
585    fn try_insert_payload(
586        &mut self,
587        payload: T::ExecutionData,
588    ) -> Result<PayloadStatus, InsertBlockFatalError> {
589        let block_hash = payload.block_hash();
590        let num_hash = payload.num_hash();
591        let parent_hash = payload.parent_hash();
592        let mut latest_valid_hash = None;
593
594        match self.insert_payload(payload) {
595            Ok(status) => {
596                let status = match status {
597                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
598                        latest_valid_hash = Some(block_hash);
599                        self.try_connect_buffered_blocks(num_hash)?;
600                        PayloadStatusEnum::Valid
601                    }
602                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
603                        latest_valid_hash = Some(block_hash);
604                        PayloadStatusEnum::Valid
605                    }
606                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
607                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
608                        // not known to be invalid, but we don't know anything else
609                        PayloadStatusEnum::Syncing
610                    }
611                };
612
613                Ok(PayloadStatus::new(status, latest_valid_hash))
614            }
615            Err(error) => match error {
616                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
617                InsertPayloadError::Payload(error) => {
618                    Ok(self.on_new_payload_error(error, parent_hash)?)
619                }
620            },
621        }
622    }
623
624    /// Stores a payload for later processing during backfill sync.
625    ///
626    /// During backfill, the node lacks the state needed to validate payloads,
627    /// so they are buffered (stored in memory) until their parent blocks are synced.
628    ///
629    /// Returns:
630    /// - `Syncing`: Payload successfully buffered
631    /// - Error status: Payload is malformed or invalid
632    fn try_buffer_payload(
633        &mut self,
634        payload: T::ExecutionData,
635    ) -> Result<PayloadStatus, InsertBlockFatalError> {
636        let parent_hash = payload.parent_hash();
637
638        match self.payload_validator.ensure_well_formed_payload(payload) {
639            // if the block is well-formed, buffer it for later
640            Ok(block) => {
641                if let Err(error) = self.buffer_block(block) {
642                    Ok(self.on_insert_block_error(error)?)
643                } else {
644                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
645                }
646            }
647            Err(error) => Ok(self.on_new_payload_error(error, parent_hash)?),
648        }
649    }
650
651    /// Returns the new chain for the given head.
652    ///
653    /// This also handles reorgs.
654    ///
655    /// Note: This does not update the tracked state and instead returns the new chain based on the
656    /// given head.
657    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
658        // get the executed new head block
659        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
660            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
661            self.metrics.engine.executed_new_block_cache_miss.increment(1);
662            return Ok(None)
663        };
664
665        let new_head_number = new_head_block.recovered_block().number();
666        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
667
668        let mut new_chain = vec![new_head_block.clone()];
669        let mut current_hash = new_head_block.recovered_block().parent_hash();
670        let mut current_number = new_head_number - 1;
671
672        // Walk back the new chain until we reach a block we know about
673        //
674        // This is only done for in-memory blocks, because we should not have persisted any blocks
675        // that are _above_ the current canonical head.
676        while current_number > current_canonical_number {
677            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
678            {
679                current_hash = block.recovered_block().parent_hash();
680                current_number -= 1;
681                new_chain.push(block);
682            } else {
683                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
684                // This should never happen as we're walking back a chain that should connect to
685                // the canonical chain
686                return Ok(None)
687            }
688        }
689
690        // If we have reached the current canonical head by walking back from the target, then we
691        // know this represents an extension of the canonical chain.
692        if current_hash == self.state.tree_state.current_canonical_head.hash {
693            new_chain.reverse();
694
695            // Simple extension of the current chain
696            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
697        }
698
699        // We have a reorg. Walk back both chains to find the fork point.
700        let mut old_chain = Vec::new();
701        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
702
703        // If the canonical chain is ahead of the new chain,
704        // gather all blocks until new head number.
705        while current_canonical_number > current_number {
706            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
707                old_chain.push(block.clone());
708                old_hash = block.recovered_block().parent_hash();
709                current_canonical_number -= 1;
710            } else {
711                // This shouldn't happen as we're walking back the canonical chain
712                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
713                return Ok(None)
714            }
715        }
716
717        // Both new and old chain pointers are now at the same height.
718        debug_assert_eq!(current_number, current_canonical_number);
719
720        // Walk both chains from specified hashes at same height until
721        // a common ancestor (fork block) is reached.
722        while old_hash != current_hash {
723            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
724                old_hash = block.recovered_block().parent_hash();
725                old_chain.push(block);
726            } else {
727                // This shouldn't happen as we're walking back the canonical chain
728                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
729                return Ok(None)
730            }
731
732            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
733            {
734                current_hash = block.recovered_block().parent_hash();
735                new_chain.push(block);
736            } else {
737                // This shouldn't happen as we've already walked this path
738                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
739                return Ok(None)
740            }
741        }
742        new_chain.reverse();
743        old_chain.reverse();
744
745        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
746    }
747
748    /// Updates the latest block state to the specified canonical ancestor.
749    ///
750    /// This method ensures that the latest block tracks the given canonical header by resetting
751    ///
752    /// # Arguments
753    /// * `canonical_header` - The canonical header to set as the new head
754    ///
755    /// # Returns
756    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
757    ///
758    /// Caution: This unwinds the canonical chain
759    fn update_latest_block_to_canonical_ancestor(
760        &mut self,
761        canonical_header: &SealedHeader<N::BlockHeader>,
762    ) -> ProviderResult<()> {
763        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
764        let current_head_number = self.state.tree_state.canonical_block_number();
765        let new_head_number = canonical_header.number();
766        let new_head_hash = canonical_header.hash();
767
768        // Update tree state with the new canonical head
769        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
770
771        // Handle the state update based on whether this is an unwind scenario
772        if new_head_number < current_head_number {
773            debug!(
774                target: "engine::tree",
775                current_head = current_head_number,
776                new_head = new_head_number,
777                new_head_hash = ?new_head_hash,
778                "FCU unwind detected: reverting to canonical ancestor"
779            );
780
781            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
782        } else {
783            debug!(
784                target: "engine::tree",
785                previous_head = current_head_number,
786                new_head = new_head_number,
787                new_head_hash = ?new_head_hash,
788                "Advancing latest block to canonical ancestor"
789            );
790            self.handle_chain_advance_or_same_height(canonical_header)
791        }
792    }
793
794    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
795    /// to the canonical header
796    fn handle_canonical_chain_unwind(
797        &self,
798        current_head_number: u64,
799        canonical_header: &SealedHeader<N::BlockHeader>,
800    ) -> ProviderResult<()> {
801        let new_head_number = canonical_header.number();
802        debug!(
803            target: "engine::tree",
804            from = current_head_number,
805            to = new_head_number,
806            "Handling unwind: collecting blocks to remove from in-memory state"
807        );
808
809        // Collect blocks that need to be removed from memory
810        let old_blocks =
811            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
812
813        // Load and apply the canonical ancestor block
814        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
815    }
816
817    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
818    fn collect_blocks_for_canonical_unwind(
819        &self,
820        new_head_number: u64,
821        current_head_number: u64,
822    ) -> Vec<ExecutedBlock<N>> {
823        let mut old_blocks = Vec::new();
824
825        for block_num in (new_head_number + 1)..=current_head_number {
826            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
827                let executed_block = block_state.block_ref().clone();
828                old_blocks.push(executed_block);
829                debug!(
830                    target: "engine::tree",
831                    block_number = block_num,
832                    "Collected block for removal from in-memory state"
833                );
834            }
835        }
836
837        if old_blocks.is_empty() {
838            debug!(
839                target: "engine::tree",
840                "No blocks found in memory to remove, will clear and reset state"
841            );
842        }
843
844        old_blocks
845    }
846
847    /// Applies the canonical ancestor block via a reorg operation.
848    fn apply_canonical_ancestor_via_reorg(
849        &self,
850        canonical_header: &SealedHeader<N::BlockHeader>,
851        old_blocks: Vec<ExecutedBlock<N>>,
852    ) -> ProviderResult<()> {
853        let new_head_hash = canonical_header.hash();
854        let new_head_number = canonical_header.number();
855
856        // Try to load the canonical ancestor's block
857        match self.canonical_block_by_hash(new_head_hash)? {
858            Some(executed_block) => {
859                // Perform the reorg to properly handle the unwind
860                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
861                    new: vec![executed_block],
862                    old: old_blocks,
863                });
864
865                // CRITICAL: Update the canonical head after the reorg
866                // This ensures get_canonical_head() returns the correct block
867                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
868
869                debug!(
870                    target: "engine::tree",
871                    block_number = new_head_number,
872                    block_hash = ?new_head_hash,
873                    "Successfully loaded canonical ancestor into memory via reorg"
874                );
875            }
876            None => {
877                // Fallback: update header only if block cannot be found
878                warn!(
879                    target: "engine::tree",
880                    block_hash = ?new_head_hash,
881                    "Could not find canonical ancestor block, updating header only"
882                );
883                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
884            }
885        }
886
887        Ok(())
888    }
889
890    /// Handles chain advance or same height scenarios.
891    fn handle_chain_advance_or_same_height(
892        &self,
893        canonical_header: &SealedHeader<N::BlockHeader>,
894    ) -> ProviderResult<()> {
895        let new_head_number = canonical_header.number();
896        let new_head_hash = canonical_header.hash();
897
898        // Update the canonical head header
899        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
900
901        // Load the block into memory if it's not already present
902        self.ensure_block_in_memory(new_head_number, new_head_hash)
903    }
904
905    /// Ensures a block is loaded into memory if not already present.
906    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
907        // Check if block is already in memory
908        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
909            return Ok(());
910        }
911
912        // Try to load the block from storage
913        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
914            self.canonical_in_memory_state
915                .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
916
917            debug!(
918                target: "engine::tree",
919                block_number,
920                block_hash = ?block_hash,
921                "Added canonical block to in-memory state"
922            );
923        }
924
925        Ok(())
926    }
927
928    /// Determines if the given block is part of a fork by checking that these
929    /// conditions are true:
930    /// * walking back from the target hash to verify that the target hash is not part of an
931    ///   extension of the canonical chain.
932    /// * walking back from the current head to verify that the target hash is not already part of
933    ///   the canonical chain.
934    ///
935    /// The header is required as an arg, because we might be checking that the header is a fork
936    /// block before it's in the tree state and before it's in the database.
937    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
938        let target_hash = target.block.hash;
939        // verify that the given hash is not part of an extension of the canon chain.
940        let canonical_head = self.state.tree_state.canonical_head();
941        let mut current_hash;
942        let mut current_block = target;
943        loop {
944            if current_block.block.hash == canonical_head.hash {
945                return Ok(false)
946            }
947            // We already passed the canonical head
948            if current_block.block.number <= canonical_head.number {
949                break
950            }
951            current_hash = current_block.parent;
952
953            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
954            current_block = next_block.block_with_parent();
955        }
956
957        // verify that the given hash is not already part of canonical chain stored in memory
958        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
959            return Ok(false)
960        }
961
962        // verify that the given hash is not already part of persisted canonical chain
963        if self.provider.block_number(target_hash)?.is_some() {
964            return Ok(false)
965        }
966
967        Ok(true)
968    }
969
970    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
971    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
972    /// chain.
973    ///
974    /// These responses should adhere to the [Engine API Spec for
975    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
976    ///
977    /// Returns an error if an internal error occurred like a database error.
978    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
979    fn on_forkchoice_updated(
980        &mut self,
981        state: ForkchoiceState,
982        attrs: Option<T::PayloadAttributes>,
983        version: EngineApiMessageVersion,
984    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
985        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
986
987        // Record metrics
988        self.record_forkchoice_metrics(&attrs);
989
990        // Pre-validation of forkchoice state
991        if let Some(early_result) = self.validate_forkchoice_state(state)? {
992            return Ok(TreeOutcome::new(early_result));
993        }
994
995        // Return early if we are on the correct fork
996        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
997            return Ok(result);
998        }
999
1000        // Attempt to apply a chain update when the head differs from our canonical chain.
1001        // This handles reorgs and chain extensions by making the specified head canonical.
1002        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1003            return Ok(result);
1004        }
1005
1006        // Fallback that ensures to catch up to the network's state.
1007        self.handle_missing_block(state)
1008    }
1009
1010    /// Records metrics for forkchoice updated calls
1011    fn record_forkchoice_metrics(&self, attrs: &Option<T::PayloadAttributes>) {
1012        self.metrics.engine.forkchoice_updated_messages.increment(1);
1013        if attrs.is_some() {
1014            self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
1015        }
1016        self.canonical_in_memory_state.on_forkchoice_update_received();
1017    }
1018
1019    /// Pre-validates the forkchoice state and returns early if validation fails.
1020    ///
1021    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
1022    /// returned. Returns `None` if validation passes and processing should continue.
1023    fn validate_forkchoice_state(
1024        &mut self,
1025        state: ForkchoiceState,
1026    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1027        if state.head_block_hash.is_zero() {
1028            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1029        }
1030
1031        // Check if the new head hash is connected to any ancestor that we previously marked as
1032        // invalid
1033        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1034        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1035            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1036        }
1037
1038        if !self.backfill_sync_state.is_idle() {
1039            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1040            // exclusive access to the database
1041            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1042            return Ok(Some(OnForkChoiceUpdated::syncing()));
1043        }
1044
1045        Ok(None)
1046    }
1047
1048    /// Handles the case where the forkchoice head is already canonical.
1049    ///
1050    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1051    /// processing is complete. Returns `None` if the head is not canonical and processing
1052    /// should continue.
1053    fn handle_canonical_head(
1054        &self,
1055        state: ForkchoiceState,
1056        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1057        version: EngineApiMessageVersion,
1058    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1059        // Process the forkchoice update by trying to make the head block canonical
1060        //
1061        // We can only process this forkchoice update if:
1062        // - we have the `head` block
1063        // - the head block is part of a chain that is connected to the canonical chain. This
1064        //   includes reorgs.
1065        //
1066        // Performing a FCU involves:
1067        // - marking the FCU's head block as canonical
1068        // - updating in memory state to reflect the new canonical chain
1069        // - updating canonical state trackers
1070        // - emitting a canonicalization event for the new chain (including reorg)
1071        // - if we have payload attributes, delegate them to the payload service
1072
1073        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1074            return Ok(None);
1075        }
1076
1077        trace!(target: "engine::tree", "fcu head hash is already canonical");
1078
1079        // Update the safe and finalized blocks and ensure their values are valid
1080        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1081            // safe or finalized hashes are invalid
1082            return Ok(Some(TreeOutcome::new(outcome)));
1083        }
1084
1085        // Process payload attributes if the head is already canonical
1086        if let Some(attr) = attrs {
1087            let tip = self
1088                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1089                .ok_or_else(|| {
1090                    // If we can't find the canonical block, then something is wrong and we need
1091                    // to return an error
1092                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1093                })?;
1094            // Clone only when we actually need to process the attributes
1095            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1096            return Ok(Some(TreeOutcome::new(updated)));
1097        }
1098
1099        // The head block is already canonical
1100        let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1101            PayloadStatusEnum::Valid,
1102            Some(state.head_block_hash),
1103        )));
1104        Ok(Some(outcome))
1105    }
1106
1107    /// Applies chain update for the new head block and processes payload attributes.
1108    ///
1109    /// This method handles the case where the forkchoice head differs from our current canonical
1110    /// head. It attempts to make the specified head block canonical by:
1111    /// - Checking if the head is already part of the canonical chain
1112    /// - Applying chain reorganizations (reorgs) if necessary
1113    /// - Processing payload attributes if provided
1114    /// - Returning the appropriate forkchoice update response
1115    ///
1116    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
1117    /// Returns `None` if no chain update was needed or possible.
1118    fn apply_chain_update(
1119        &mut self,
1120        state: ForkchoiceState,
1121        attrs: &Option<T::PayloadAttributes>,
1122        version: EngineApiMessageVersion,
1123    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1124        // Check if the head is already part of the canonical chain
1125        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1126            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1127
1128            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1129            // own chain at will, so we need to always trigger a new payload job if requested.
1130            if self.engine_kind.is_opstack() ||
1131                self.config.always_process_payload_attributes_on_canonical_head()
1132            {
1133                if let Some(attr) = attrs {
1134                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1135                    // Clone only when we actually need to process the attributes
1136                    let updated = self.process_payload_attributes(
1137                        attr.clone(),
1138                        &canonical_header,
1139                        state,
1140                        version,
1141                    );
1142                    return Ok(Some(TreeOutcome::new(updated)));
1143                }
1144
1145                // At this point, no alternative block has been triggered, so we need effectively
1146                // unwind the _canonical_ chain to the FCU's head, which is part of the canonical
1147                // chain. We need to update the latest block state to reflect the
1148                // canonical ancestor. This ensures that state providers and the
1149                // transaction pool operate with the correct chain state after
1150                // forkchoice update processing.
1151
1152                if self.config.unwind_canonical_header() {
1153                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1154                }
1155            }
1156
1157            // According to the Engine API specification, client software MAY skip an update of the
1158            // forkchoice state and MUST NOT begin a payload build process if
1159            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
1160            // of canonical chain, i.e. the ancestor passed payload validation process
1161            // and deemed `VALID`. In the case of such an event, client software MUST
1162            // return `{payloadStatus: {status: VALID, latestValidHash:
1163            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1164
1165            // The head block is already canonical and we're not processing payload attributes,
1166            // so we're not triggering a payload job and can return right away
1167
1168            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1169                PayloadStatusEnum::Valid,
1170                Some(state.head_block_hash),
1171            )));
1172            return Ok(Some(outcome));
1173        }
1174
1175        // Ensure we can apply a new chain update for the head block
1176        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1177            let tip = chain_update.tip().clone_sealed_header();
1178            self.on_canonical_chain_update(chain_update);
1179
1180            // Update the safe and finalized blocks and ensure their values are valid
1181            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1182                // safe or finalized hashes are invalid
1183                return Ok(Some(TreeOutcome::new(outcome)));
1184            }
1185
1186            if let Some(attr) = attrs {
1187                // Clone only when we actually need to process the attributes
1188                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1189                return Ok(Some(TreeOutcome::new(updated)));
1190            }
1191
1192            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1193                PayloadStatusEnum::Valid,
1194                Some(state.head_block_hash),
1195            )));
1196            return Ok(Some(outcome));
1197        }
1198
1199        Ok(None)
1200    }
1201
1202    /// Handles the case where the head block is missing and needs to be downloaded.
1203    ///
1204    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1205    /// Returns a `TreeOutcome` with syncing status and download event.
1206    fn handle_missing_block(
1207        &self,
1208        state: ForkchoiceState,
1209    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1210        // We don't have the block to perform the forkchoice update
1211        // We assume the FCU is valid and at least the head is missing,
1212        // so we need to start syncing to it
1213        //
1214        // find the appropriate target to sync to, if we don't have the safe block hash then we
1215        // start syncing to the safe block via backfill first
1216        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1217        // check that safe block is valid and missing
1218        !state.safe_block_hash.is_zero() &&
1219        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1220        {
1221            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1222            state.safe_block_hash
1223        } else {
1224            state.head_block_hash
1225        };
1226
1227        let target = self.lowest_buffered_ancestor_or(target);
1228        trace!(target: "engine::tree", %target, "downloading missing block");
1229
1230        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1231            PayloadStatusEnum::Syncing,
1232        )))
1233        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1234    }
1235
1236    /// Attempts to receive the next engine request.
1237    ///
1238    /// If there's currently no persistence action in progress, this will block until a new request
1239    /// is received. If there's a persistence action in progress, this will try to receive the
1240    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1241    /// received in time.
1242    ///
1243    /// Returns an error if the engine channel is disconnected.
1244    #[expect(clippy::type_complexity)]
1245    fn try_recv_engine_message(
1246        &self,
1247    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1248        if self.persistence_state.in_progress() {
1249            // try to receive the next request with a timeout to not block indefinitely
1250            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1251                Ok(msg) => Ok(Some(msg)),
1252                Err(err) => match err {
1253                    RecvTimeoutError::Timeout => Ok(None),
1254                    RecvTimeoutError::Disconnected => Err(RecvError),
1255                },
1256            }
1257        } else {
1258            self.incoming.recv().map(Some)
1259        }
1260    }
1261
1262    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1263    /// the current persistence action while we're removing blocks.
1264    fn remove_blocks(&mut self, new_tip_num: u64) {
1265        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1266        if new_tip_num < self.persistence_state.last_persisted_block.number {
1267            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1268            let (tx, rx) = oneshot::channel();
1269            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1270            self.persistence_state.start_remove(new_tip_num, rx);
1271        }
1272    }
1273
1274    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1275    /// the current persistence action while we're saving blocks.
1276    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1277        if blocks_to_persist.is_empty() {
1278            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1279            return
1280        }
1281
1282        // NOTE: checked non-empty above
1283        let highest_num_hash = blocks_to_persist
1284            .iter()
1285            .max_by_key(|block| block.recovered_block().number())
1286            .map(|b| b.recovered_block().num_hash())
1287            .expect("Checked non-empty persisting blocks");
1288
1289        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1290        let (tx, rx) = oneshot::channel();
1291        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1292
1293        self.persistence_state.start_save(highest_num_hash, rx);
1294    }
1295
1296    /// Attempts to advance the persistence state.
1297    ///
1298    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1299    /// or send a new persistence action if necessary.
1300    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1301        if self.persistence_state.in_progress() {
1302            let (mut rx, start_time, current_action) = self
1303                .persistence_state
1304                .rx
1305                .take()
1306                .expect("if a persistence task is in progress Receiver must be Some");
1307            // Check if persistence has complete
1308            match rx.try_recv() {
1309                Ok(last_persisted_hash_num) => {
1310                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1311                    let Some(BlockNumHash {
1312                        hash: last_persisted_block_hash,
1313                        number: last_persisted_block_number,
1314                    }) = last_persisted_hash_num
1315                    else {
1316                        // if this happened, then we persisted no blocks because we sent an
1317                        // empty vec of blocks
1318                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1319                        return Ok(())
1320                    };
1321
1322                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1323                    self.persistence_state
1324                        .finish(last_persisted_block_hash, last_persisted_block_number);
1325                    self.on_new_persisted_block()?;
1326                }
1327                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1328                Err(TryRecvError::Empty) => {
1329                    self.persistence_state.rx = Some((rx, start_time, current_action))
1330                }
1331            }
1332        }
1333
1334        if !self.persistence_state.in_progress() {
1335            if let Some(new_tip_num) = self.find_disk_reorg()? {
1336                self.remove_blocks(new_tip_num)
1337            } else if self.should_persist() {
1338                let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1339                self.persist_blocks(blocks_to_persist);
1340            }
1341        }
1342
1343        Ok(())
1344    }
1345
1346    /// Handles a message from the engine.
1347    fn on_engine_message(
1348        &mut self,
1349        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1350    ) -> Result<(), InsertBlockFatalError> {
1351        match msg {
1352            FromEngine::Event(event) => match event {
1353                FromOrchestrator::BackfillSyncStarted => {
1354                    debug!(target: "engine::tree", "received backfill sync started event");
1355                    self.backfill_sync_state = BackfillSyncState::Active;
1356                }
1357                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1358                    self.on_backfill_sync_finished(ctrl)?;
1359                }
1360            },
1361            FromEngine::Request(request) => {
1362                match request {
1363                    EngineApiRequest::InsertExecutedBlock(block) => {
1364                        let block_num_hash = block.recovered_block().num_hash();
1365                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1366                            // outdated block that can be skipped
1367                            return Ok(())
1368                        }
1369
1370                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1371                        let now = Instant::now();
1372
1373                        // if the parent is the canonical head, we can insert the block as the
1374                        // pending block
1375                        if self.state.tree_state.canonical_block_hash() ==
1376                            block.recovered_block().parent_hash()
1377                        {
1378                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1379                            self.canonical_in_memory_state.set_pending_block(block.clone());
1380                        }
1381
1382                        self.state.tree_state.insert_executed(block.clone());
1383                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1384                        self.emit_event(EngineApiEvent::BeaconConsensus(
1385                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1386                        ));
1387                    }
1388                    EngineApiRequest::Beacon(request) => {
1389                        match request {
1390                            BeaconEngineMessage::ForkchoiceUpdated {
1391                                state,
1392                                payload_attrs,
1393                                tx,
1394                                version,
1395                            } => {
1396                                let mut output =
1397                                    self.on_forkchoice_updated(state, payload_attrs, version);
1398
1399                                if let Ok(res) = &mut output {
1400                                    // track last received forkchoice state
1401                                    self.state
1402                                        .forkchoice_state_tracker
1403                                        .set_latest(state, res.outcome.forkchoice_status());
1404
1405                                    // emit an event about the handled FCU
1406                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1407                                        state,
1408                                        res.outcome.forkchoice_status(),
1409                                    ));
1410
1411                                    // handle the event if any
1412                                    self.on_maybe_tree_event(res.event.take())?;
1413                                }
1414
1415                                if let Err(err) =
1416                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1417                                {
1418                                    self.metrics
1419                                        .engine
1420                                        .failed_forkchoice_updated_response_deliveries
1421                                        .increment(1);
1422                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1423                                }
1424                            }
1425                            BeaconEngineMessage::NewPayload { payload, tx } => {
1426                                let mut output = self.on_new_payload(payload);
1427
1428                                let maybe_event =
1429                                    output.as_mut().ok().and_then(|out| out.event.take());
1430
1431                                // emit response
1432                                if let Err(err) =
1433                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1434                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1435                                    }))
1436                                {
1437                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1438                                    self.metrics
1439                                        .engine
1440                                        .failed_new_payload_response_deliveries
1441                                        .increment(1);
1442                                }
1443
1444                                // handle the event if any
1445                                self.on_maybe_tree_event(maybe_event)?;
1446                            }
1447                        }
1448                    }
1449                }
1450            }
1451            FromEngine::DownloadedBlocks(blocks) => {
1452                if let Some(event) = self.on_downloaded(blocks)? {
1453                    self.on_tree_event(event)?;
1454                }
1455            }
1456        }
1457        Ok(())
1458    }
1459
1460    /// Invoked if the backfill sync has finished to target.
1461    ///
1462    /// At this point we consider the block synced to the backfill target.
1463    ///
1464    /// Checks the tracked finalized block against the block on disk and requests another backfill
1465    /// run if the distance to the tip exceeds the threshold for another backfill run.
1466    ///
1467    /// This will also do the necessary housekeeping of the tree state, this includes:
1468    ///  - removing all blocks below the backfill height
1469    ///  - resetting the canonical in-memory state
1470    ///
1471    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1472    /// target block.
1473    fn on_backfill_sync_finished(
1474        &mut self,
1475        ctrl: ControlFlow,
1476    ) -> Result<(), InsertBlockFatalError> {
1477        debug!(target: "engine::tree", "received backfill sync finished event");
1478        self.backfill_sync_state = BackfillSyncState::Idle;
1479
1480        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1481        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1482            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1483            // update the `invalid_headers` cache with the new invalid header
1484            self.state.invalid_headers.insert(**bad_block);
1485
1486            // if this was an unwind then the target is the new height
1487            Some(*target)
1488        } else {
1489            // backfill height is the block number that the backfill finished at
1490            ctrl.block_number()
1491        };
1492
1493        // backfill height is the block number that the backfill finished at
1494        let Some(backfill_height) = backfill_height else { return Ok(()) };
1495
1496        // state house keeping after backfill sync
1497        // remove all executed blocks below the backfill height
1498        //
1499        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1500        // before that
1501        let Some(backfill_num_hash) = self
1502            .provider
1503            .block_hash(backfill_height)?
1504            .map(|hash| BlockNumHash { hash, number: backfill_height })
1505        else {
1506            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1507            return Ok(())
1508        };
1509
1510        if ctrl.is_unwind() {
1511            // the node reset so we need to clear everything above that height so that backfill
1512            // height is the new canonical block.
1513            self.state.tree_state.reset(backfill_num_hash)
1514        } else {
1515            self.state.tree_state.remove_until(
1516                backfill_num_hash,
1517                self.persistence_state.last_persisted_block.hash,
1518                Some(backfill_num_hash),
1519            );
1520        }
1521
1522        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1523        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1524
1525        // remove all buffered blocks below the backfill height
1526        self.state.buffer.remove_old_blocks(backfill_height);
1527        // we remove all entries because now we're synced to the backfill target and consider this
1528        // the canonical chain
1529        self.canonical_in_memory_state.clear_state();
1530
1531        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1532            // update the tracked chain height, after backfill sync both the canonical height and
1533            // persisted height are the same
1534            self.state.tree_state.set_canonical_head(new_head.num_hash());
1535            self.persistence_state.finish(new_head.hash(), new_head.number());
1536
1537            // update the tracked canonical head
1538            self.canonical_in_memory_state.set_canonical_head(new_head);
1539        }
1540
1541        // check if we need to run backfill again by comparing the most recent finalized height to
1542        // the backfill height
1543        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1544        else {
1545            return Ok(())
1546        };
1547        if sync_target_state.finalized_block_hash.is_zero() {
1548            // no finalized block, can't check distance
1549            return Ok(())
1550        }
1551        // get the block number of the finalized block, if we have it
1552        let newest_finalized = self
1553            .state
1554            .buffer
1555            .block(&sync_target_state.finalized_block_hash)
1556            .map(|block| block.number());
1557
1558        // The block number that the backfill finished at - if the progress or newest
1559        // finalized is None then we can't check the distance anyways.
1560        //
1561        // If both are Some, we perform another distance check and return the desired
1562        // backfill target
1563        if let Some(backfill_target) =
1564            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1565                // Determines whether or not we should run backfill again, in case
1566                // the new gap is still large enough and requires running backfill again
1567                self.backfill_sync_target(progress, finalized_number, None)
1568            })
1569        {
1570            // request another backfill run
1571            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1572                backfill_target.into(),
1573            )));
1574            return Ok(())
1575        };
1576
1577        // Check if there are more blocks to sync between current head and FCU target
1578        if let Some(lowest_buffered) =
1579            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1580        {
1581            let current_head_num = self.state.tree_state.current_canonical_head.number;
1582            let target_head_num = lowest_buffered.number();
1583
1584            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1585            {
1586                // There are blocks between current head and FCU target, download them
1587                debug!(
1588                    target: "engine::tree",
1589                    %current_head_num,
1590                    %target_head_num,
1591                    %distance,
1592                    "Backfill complete, downloading remaining blocks to reach FCU target"
1593                );
1594
1595                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1596                    lowest_buffered.parent_hash(),
1597                    distance,
1598                )));
1599                return Ok(());
1600            }
1601        }
1602
1603        // try to close the gap by executing buffered blocks that are child blocks of the new head
1604        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1605    }
1606
1607    /// Attempts to make the given target canonical.
1608    ///
1609    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1610    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1611        if let Some(chain_update) = self.on_new_head(target)? {
1612            self.on_canonical_chain_update(chain_update);
1613        }
1614
1615        Ok(())
1616    }
1617
1618    /// Convenience function to handle an optional tree event.
1619    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1620        if let Some(event) = event {
1621            self.on_tree_event(event)?;
1622        }
1623
1624        Ok(())
1625    }
1626
1627    /// Handles a tree event.
1628    ///
1629    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1630    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1631        match event {
1632            TreeEvent::TreeAction(action) => match action {
1633                TreeAction::MakeCanonical { sync_target_head } => {
1634                    self.make_canonical(sync_target_head)?;
1635                }
1636            },
1637            TreeEvent::BackfillAction(action) => {
1638                self.emit_event(EngineApiEvent::BackfillAction(action));
1639            }
1640            TreeEvent::Download(action) => {
1641                self.emit_event(EngineApiEvent::Download(action));
1642            }
1643        }
1644
1645        Ok(())
1646    }
1647
1648    /// Emits an outgoing event to the engine.
1649    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1650        let event = event.into();
1651
1652        if event.is_backfill_action() {
1653            debug_assert_eq!(
1654                self.backfill_sync_state,
1655                BackfillSyncState::Idle,
1656                "backfill action should only be emitted when backfill is idle"
1657            );
1658
1659            if self.persistence_state.in_progress() {
1660                // backfill sync and persisting data are mutually exclusive, so we can't start
1661                // backfill while we're still persisting
1662                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1663                return
1664            }
1665
1666            self.backfill_sync_state = BackfillSyncState::Pending;
1667            self.metrics.engine.pipeline_runs.increment(1);
1668            debug!(target: "engine::tree", "emitting backfill action event");
1669        }
1670
1671        let _ = self.outgoing.send(event).inspect_err(
1672            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1673        );
1674    }
1675
1676    /// Returns true if the canonical chain length minus the last persisted
1677    /// block is greater than or equal to the persistence threshold and
1678    /// backfill is not running.
1679    pub const fn should_persist(&self) -> bool {
1680        if !self.backfill_sync_state.is_idle() {
1681            // can't persist if backfill is running
1682            return false
1683        }
1684
1685        let min_block = self.persistence_state.last_persisted_block.number;
1686        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1687            self.config.persistence_threshold()
1688    }
1689
1690    /// Returns a batch of consecutive canonical blocks to persist in the range
1691    /// `(last_persisted_number .. canonical_head - threshold]`. The expected
1692    /// order is oldest -> newest.
1693    fn get_canonical_blocks_to_persist(
1694        &self,
1695    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1696        // We will calculate the state root using the database, so we need to be sure there are no
1697        // changes
1698        debug_assert!(!self.persistence_state.in_progress());
1699
1700        let mut blocks_to_persist = Vec::new();
1701        let mut current_hash = self.state.tree_state.canonical_block_hash();
1702        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1703        let canonical_head_number = self.state.tree_state.canonical_block_number();
1704
1705        // Persist only up to block buffer target
1706        let target_number =
1707            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1708
1709        debug!(
1710            target: "engine::tree",
1711            ?current_hash,
1712            ?last_persisted_number,
1713            ?canonical_head_number,
1714            ?target_number,
1715            "Returning canonical blocks to persist"
1716        );
1717        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1718            if block.recovered_block().number() <= last_persisted_number {
1719                break;
1720            }
1721
1722            if block.recovered_block().number() <= target_number {
1723                blocks_to_persist.push(block.clone());
1724            }
1725
1726            current_hash = block.recovered_block().parent_hash();
1727        }
1728
1729        // Reverse the order so that the oldest block comes first
1730        blocks_to_persist.reverse();
1731
1732        Ok(blocks_to_persist)
1733    }
1734
1735    /// This clears the blocks from the in-memory tree state that have been persisted to the
1736    /// database.
1737    ///
1738    /// This also updates the canonical in-memory state to reflect the newest persisted block
1739    /// height.
1740    ///
1741    /// Assumes that `finish` has been called on the `persistence_state` at least once
1742    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1743        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1744        // state.
1745        if let Some(remove_above) = self.find_disk_reorg()? {
1746            self.remove_blocks(remove_above);
1747            return Ok(())
1748        }
1749
1750        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1751        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1752        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1753            number: self.persistence_state.last_persisted_block.number,
1754            hash: self.persistence_state.last_persisted_block.hash,
1755        });
1756        Ok(())
1757    }
1758
1759    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1760    ///
1761    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
1762    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1763    /// has in memory.
1764    ///
1765    /// For finalized blocks, this will return `None`.
1766    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1767        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1768        // check memory first
1769        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1770            return Ok(Some(block.clone()))
1771        }
1772
1773        let (block, senders) = self
1774            .provider
1775            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1776            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1777            .split_sealed();
1778        let execution_output = self
1779            .provider
1780            .get_state(block.header().number())?
1781            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1782        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1783        let trie_updates = self.provider.get_block_trie_updates(block.number())?;
1784
1785        Ok(Some(ExecutedBlock {
1786            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1787            execution_output: Arc::new(execution_output),
1788            hashed_state: Arc::new(hashed_state),
1789            trie_updates: Arc::new(trie_updates.into()),
1790        }))
1791    }
1792
1793    /// Return sealed block header from in-memory state or database by hash.
1794    fn sealed_header_by_hash(
1795        &self,
1796        hash: B256,
1797    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1798        // check memory first
1799        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1800
1801        if header.is_some() {
1802            Ok(header)
1803        } else {
1804            self.provider.sealed_header_by_hash(hash)
1805        }
1806    }
1807
1808    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1809    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1810    /// not exist in the buffer, this returns the hash that is passed in.
1811    ///
1812    /// Returns the parent hash of the block itself if the block is buffered and has no other
1813    /// buffered ancestors.
1814    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1815        self.state
1816            .buffer
1817            .lowest_ancestor(&hash)
1818            .map(|block| block.parent_hash())
1819            .unwrap_or_else(|| hash)
1820    }
1821
1822    /// If validation fails, the response MUST contain the latest valid hash:
1823    ///
1824    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1825    ///     conditions:
1826    ///     - It is fully validated and deemed VALID
1827    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1828    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1829    ///     conditions are satisfied by a `PoW` block.
1830    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1831    ///     the above conditions.
1832    fn latest_valid_hash_for_invalid_payload(
1833        &mut self,
1834        parent_hash: B256,
1835    ) -> ProviderResult<Option<B256>> {
1836        // Check if parent exists in side chain or in canonical chain.
1837        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1838            return Ok(Some(parent_hash))
1839        }
1840
1841        // iterate over ancestors in the invalid cache
1842        // until we encounter the first valid ancestor
1843        let mut current_hash = parent_hash;
1844        let mut current_block = self.state.invalid_headers.get(&current_hash);
1845        while let Some(block_with_parent) = current_block {
1846            current_hash = block_with_parent.parent;
1847            current_block = self.state.invalid_headers.get(&current_hash);
1848
1849            // If current_header is None, then the current_hash does not have an invalid
1850            // ancestor in the cache, check its presence in blockchain tree
1851            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1852                return Ok(Some(current_hash))
1853            }
1854        }
1855        Ok(None)
1856    }
1857
1858    /// Prepares the invalid payload response for the given hash, checking the
1859    /// database for the parent hash and populating the payload status with the latest valid hash
1860    /// according to the engine api spec.
1861    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1862        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1863        // PoW block, which we need to identify by looking at the parent's block difficulty
1864        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1865            !parent.difficulty().is_zero()
1866        {
1867            parent_hash = B256::ZERO;
1868        }
1869
1870        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1871        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1872            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1873        })
1874        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1875    }
1876
1877    /// Returns true if the given hash is the last received sync target block.
1878    ///
1879    /// See [`ForkchoiceStateTracker::sync_target_state`]
1880    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1881        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1882            return target.head_block_hash == block_hash
1883        }
1884        false
1885    }
1886
1887    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1888    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1889    ///
1890    /// Returns a payload status response according to the engine API spec if the block is known to
1891    /// be invalid.
1892    fn check_invalid_ancestor_with_head(
1893        &mut self,
1894        check: B256,
1895        head: &SealedBlock<N::Block>,
1896    ) -> ProviderResult<Option<PayloadStatus>> {
1897        // check if the check hash was previously marked as invalid
1898        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1899
1900        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1901    }
1902
1903    /// Invoked when a new payload received is invalid.
1904    fn on_invalid_new_payload(
1905        &mut self,
1906        head: SealedBlock<N::Block>,
1907        invalid: BlockWithParent,
1908    ) -> ProviderResult<PayloadStatus> {
1909        // populate the latest valid hash field
1910        let status = self.prepare_invalid_response(invalid.parent)?;
1911
1912        // insert the head block into the invalid header cache
1913        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1914        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1915
1916        Ok(status)
1917    }
1918
1919    /// Finds any invalid ancestor for the given payload.
1920    ///
1921    /// This function walks up the chain of buffered ancestors from the payload's block
1922    /// hash and checks if any ancestor is marked as invalid in the tree state.
1923    ///
1924    /// The check works by:
1925    /// 1. Finding the lowest buffered ancestor for the given block hash
1926    /// 2. If the ancestor is the same as the block hash itself, using the parent hash instead
1927    /// 3. Checking if this ancestor is in the `invalid_headers` map
1928    ///
1929    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
1930    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1931        let parent_hash = payload.parent_hash();
1932        let block_hash = payload.block_hash();
1933        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1934        if lowest_buffered_ancestor == block_hash {
1935            lowest_buffered_ancestor = parent_hash;
1936        }
1937
1938        // Check if the block has an invalid ancestor
1939        self.state.invalid_headers.get(&lowest_buffered_ancestor)
1940    }
1941
1942    /// Handles a payload that has an invalid ancestor.
1943    ///
1944    /// This function validates the payload and processes it according to whether it's
1945    /// well-formed or malformed:
1946    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
1947    ///    known-bad block, which violates consensus rules
1948    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
1949    ///    validated due to its own structural issues
1950    fn handle_invalid_ancestor_payload(
1951        &mut self,
1952        payload: T::ExecutionData,
1953        invalid: BlockWithParent,
1954    ) -> Result<PayloadStatus, InsertBlockFatalError> {
1955        let parent_hash = payload.parent_hash();
1956
1957        // Here we might have 2 cases
1958        // 1. the block is well formed and indeed links to an invalid header, meaning we should
1959        //    remember it as invalid
1960        // 2. the block is not well formed (i.e block hash is incorrect), and we should just return
1961        //    an error and forget it
1962        let block = match self.payload_validator.ensure_well_formed_payload(payload) {
1963            Ok(block) => block,
1964            Err(error) => return Ok(self.on_new_payload_error(error, parent_hash)?),
1965        };
1966
1967        Ok(self.on_invalid_new_payload(block.into_sealed_block(), invalid)?)
1968    }
1969
1970    /// Checks if the given `head` points to an invalid header, which requires a specific response
1971    /// to a forkchoice update.
1972    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1973        // check if the head was previously marked as invalid
1974        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1975
1976        // Try to prepare invalid response, but handle errors gracefully
1977        match self.prepare_invalid_response(header.parent) {
1978            Ok(status) => Ok(Some(status)),
1979            Err(err) => {
1980                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
1981                // Return a basic invalid status without latest valid hash
1982                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1983                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1984                })))
1985            }
1986        }
1987    }
1988
1989    /// Validate if block is correct and satisfies all the consensus rules that concern the header
1990    /// and block body itself.
1991    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1992        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1993            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1994            return Err(e)
1995        }
1996
1997        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1998            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1999            return Err(e)
2000        }
2001
2002        Ok(())
2003    }
2004
2005    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
2006    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2007    fn try_connect_buffered_blocks(
2008        &mut self,
2009        parent: BlockNumHash,
2010    ) -> Result<(), InsertBlockFatalError> {
2011        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2012
2013        if blocks.is_empty() {
2014            // nothing to append
2015            return Ok(())
2016        }
2017
2018        let now = Instant::now();
2019        let block_count = blocks.len();
2020        for child in blocks {
2021            let child_num_hash = child.num_hash();
2022            match self.insert_block(child) {
2023                Ok(res) => {
2024                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2025                    if self.is_sync_target_head(child_num_hash.hash) &&
2026                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2027                    {
2028                        self.make_canonical(child_num_hash.hash)?;
2029                    }
2030                }
2031                Err(err) => {
2032                    if let InsertPayloadError::Block(err) = err {
2033                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2034                        if let Err(fatal) = self.on_insert_block_error(err) {
2035                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2036                            return Err(fatal)
2037                        }
2038                    }
2039                }
2040            }
2041        }
2042
2043        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2044        Ok(())
2045    }
2046
2047    /// Pre-validates the block and inserts it into the buffer.
2048    fn buffer_block(
2049        &mut self,
2050        block: RecoveredBlock<N::Block>,
2051    ) -> Result<(), InsertBlockError<N::Block>> {
2052        if let Err(err) = self.validate_block(&block) {
2053            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
2054        }
2055        self.state.buffer.insert_block(block);
2056        Ok(())
2057    }
2058
2059    /// Returns true if the distance from the local tip to the block is greater than the configured
2060    /// threshold.
2061    ///
2062    /// If the `local_tip` is greater than the `block`, then this will return false.
2063    #[inline]
2064    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2065        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2066    }
2067
2068    /// Returns how far the local tip is from the given block. If the local tip is at the same
2069    /// height or its block number is greater than the given block, this returns None.
2070    #[inline]
2071    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2072        if block > local_tip {
2073            Some(block - local_tip)
2074        } else {
2075            None
2076        }
2077    }
2078
2079    /// Returns the target hash to sync to if the distance from the local tip to the block is
2080    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2081    /// that block already).
2082    ///
2083    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2084    /// (missing) finalized block.
2085    fn backfill_sync_target(
2086        &self,
2087        canonical_tip_num: u64,
2088        target_block_number: u64,
2089        downloaded_block: Option<BlockNumHash>,
2090    ) -> Option<B256> {
2091        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2092
2093        // check if the downloaded block is the tracked finalized block
2094        let exceeds_backfill_threshold =
2095            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2096                // if we downloaded the finalized block we can now check how far we're off
2097                (Some(downloaded_block), Some(state))
2098                    if downloaded_block.hash == state.finalized_block_hash =>
2099                {
2100                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2101                }
2102                _ => match sync_target_state
2103                    .as_ref()
2104                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2105                {
2106                    Some(buffered_finalized) => {
2107                        // if we have buffered the finalized block, we should check how far we're
2108                        // off
2109                        self.exceeds_backfill_run_threshold(
2110                            canonical_tip_num,
2111                            buffered_finalized.number(),
2112                        )
2113                    }
2114                    None => {
2115                        // check if the distance exceeds the threshold for backfill sync
2116                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2117                    }
2118                },
2119            };
2120
2121        // if the number of missing blocks is greater than the max, trigger backfill
2122        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2123            // if we have already canonicalized the finalized block, we should skip backfill
2124            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2125                Err(err) => {
2126                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2127                }
2128                Ok(None) => {
2129                    // ensure the finalized block is known (not the zero hash)
2130                    if !state.finalized_block_hash.is_zero() {
2131                        // we don't have the block yet and the distance exceeds the allowed
2132                        // threshold
2133                        return Some(state.finalized_block_hash)
2134                    }
2135
2136                    // OPTIMISTIC SYNCING
2137                    //
2138                    // It can happen when the node is doing an
2139                    // optimistic sync, where the CL has no knowledge of the finalized hash,
2140                    // but is expecting the EL to sync as high
2141                    // as possible before finalizing.
2142                    //
2143                    // This usually doesn't happen on ETH mainnet since CLs use the more
2144                    // secure checkpoint syncing.
2145                    //
2146                    // However, optimism chains will do this. The risk of a reorg is however
2147                    // low.
2148                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2149                    return Some(state.head_block_hash)
2150                }
2151                Ok(Some(_)) => {
2152                    // we're fully synced to the finalized block
2153                }
2154            }
2155        }
2156
2157        None
2158    }
2159
2160    /// This method tries to detect whether on-disk and in-memory states have diverged. It might
2161    /// happen if a reorg is happening while we are persisting a block.
2162    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2163        let mut canonical = self.state.tree_state.current_canonical_head;
2164        let mut persisted = self.persistence_state.last_persisted_block;
2165
2166        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2167            Ok(self
2168                .sealed_header_by_hash(num_hash.hash)?
2169                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2170                .parent_num_hash())
2171        };
2172
2173        // Happy path, canonical chain is ahead or equal to persisted chain.
2174        // Walk canonical chain back to make sure that it connects to persisted chain.
2175        while canonical.number > persisted.number {
2176            canonical = parent_num_hash(canonical)?;
2177        }
2178
2179        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2180        if canonical == persisted {
2181            return Ok(None);
2182        }
2183
2184        // At this point, we know that `persisted` block can't be reached by walking the canonical
2185        // chain back. In this case we need to truncate it to the first canonical block it connects
2186        // to.
2187
2188        // Firstly, walk back until we reach the same height as `canonical`.
2189        while persisted.number > canonical.number {
2190            persisted = parent_num_hash(persisted)?;
2191        }
2192
2193        debug_assert_eq!(persisted.number, canonical.number);
2194
2195        // Now walk both chains back until we find a common ancestor.
2196        while persisted.hash != canonical.hash {
2197            canonical = parent_num_hash(canonical)?;
2198            persisted = parent_num_hash(persisted)?;
2199        }
2200
2201        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2202
2203        Ok(Some(persisted.number))
2204    }
2205
2206    /// Invoked when we the canonical chain has been updated.
2207    ///
2208    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2209    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2210        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2211        let start = Instant::now();
2212
2213        // update the tracked canonical head
2214        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2215
2216        let tip = chain_update.tip().clone_sealed_header();
2217        let notification = chain_update.to_chain_notification();
2218
2219        // reinsert any missing reorged blocks
2220        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2221            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2222            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2223            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2224
2225            self.update_reorg_metrics(old.len());
2226            self.reinsert_reorged_blocks(new.clone());
2227            self.reinsert_reorged_blocks(old.clone());
2228        }
2229
2230        // update the tracked in-memory state with the new chain
2231        self.canonical_in_memory_state.update_chain(chain_update);
2232        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2233
2234        // Update metrics based on new tip
2235        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2236
2237        // sends an event to all active listeners about the new canonical chain
2238        self.canonical_in_memory_state.notify_canon_state(notification);
2239
2240        // emit event
2241        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2242            Box::new(tip),
2243            start.elapsed(),
2244        ));
2245    }
2246
2247    /// This updates metrics based on the given reorg length.
2248    fn update_reorg_metrics(&self, old_chain_length: usize) {
2249        self.metrics.tree.reorgs.increment(1);
2250        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2251    }
2252
2253    /// This reinserts any blocks in the new chain that do not already exist in the tree
2254    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2255        for block in new_chain {
2256            if self
2257                .state
2258                .tree_state
2259                .executed_block_by_hash(block.recovered_block().hash())
2260                .is_none()
2261            {
2262                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2263                self.state.tree_state.insert_executed(block);
2264            }
2265        }
2266    }
2267
2268    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2269    ///
2270    /// This mainly compares the missing parent of the downloaded block with the current canonical
2271    /// tip, and decides whether or not backfill sync should be triggered.
2272    fn on_disconnected_downloaded_block(
2273        &self,
2274        downloaded_block: BlockNumHash,
2275        missing_parent: BlockNumHash,
2276        head: BlockNumHash,
2277    ) -> Option<TreeEvent> {
2278        // compare the missing parent with the canonical tip
2279        if let Some(target) =
2280            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2281        {
2282            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2283            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2284        }
2285
2286        // continue downloading the missing parent
2287        //
2288        // this happens if either:
2289        //  * the missing parent block num < canonical tip num
2290        //    * this case represents a missing block on a fork that is shorter than the canonical
2291        //      chain
2292        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2293        //    less than the backfill threshold
2294        //    * this case represents a potentially long range of blocks to download and execute
2295        let request = if let Some(distance) =
2296            self.distance_from_local_tip(head.number, missing_parent.number)
2297        {
2298            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2299            DownloadRequest::BlockRange(missing_parent.hash, distance)
2300        } else {
2301            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2302            // This happens when the missing parent is on an outdated
2303            // sidechain and we can only download the missing block itself
2304            DownloadRequest::single_block(missing_parent.hash)
2305        };
2306
2307        Some(TreeEvent::Download(request))
2308    }
2309
2310    /// Invoked with a block downloaded from the network
2311    ///
2312    /// Returns an event with the appropriate action to take, such as:
2313    ///  - download more missing blocks
2314    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2315    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2316    fn on_downloaded_block(
2317        &mut self,
2318        block: RecoveredBlock<N::Block>,
2319    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2320        let block_num_hash = block.num_hash();
2321        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2322        if self
2323            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2324            .is_some()
2325        {
2326            return Ok(None)
2327        }
2328
2329        if !self.backfill_sync_state.is_idle() {
2330            return Ok(None)
2331        }
2332
2333        // try to append the block
2334        match self.insert_block(block) {
2335            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2336                if self.is_sync_target_head(block_num_hash.hash) {
2337                    trace!(target: "engine::tree", "appended downloaded sync target block");
2338
2339                    // we just inserted the current sync target block, we can try to make it
2340                    // canonical
2341                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2342                        sync_target_head: block_num_hash.hash,
2343                    })))
2344                }
2345                trace!(target: "engine::tree", "appended downloaded block");
2346                self.try_connect_buffered_blocks(block_num_hash)?;
2347            }
2348            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2349                // block is not connected to the canonical head, we need to download
2350                // its missing branch first
2351                return Ok(self.on_disconnected_downloaded_block(
2352                    block_num_hash,
2353                    missing_ancestor,
2354                    head,
2355                ))
2356            }
2357            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2358                trace!(target: "engine::tree", "downloaded block already executed");
2359            }
2360            Err(err) => {
2361                if let InsertPayloadError::Block(err) = err {
2362                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2363                    if let Err(fatal) = self.on_insert_block_error(err) {
2364                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2365                        return Err(fatal)
2366                    }
2367                }
2368            }
2369        }
2370        Ok(None)
2371    }
2372
2373    /// Inserts a payload into the tree and executes it.
2374    ///
2375    /// This function validates the payload's basic structure, then executes it using the
2376    /// payload validator. The execution includes running all transactions in the payload
2377    /// and validating the resulting state transitions.
2378    ///
2379    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2380    /// or `InsertPayloadError` if validation or execution failed.
2381    fn insert_payload(
2382        &mut self,
2383        payload: T::ExecutionData,
2384    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2385        self.insert_block_or_payload(
2386            payload.block_with_parent(),
2387            payload,
2388            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2389            |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2390        )
2391    }
2392
2393    fn insert_block(
2394        &mut self,
2395        block: RecoveredBlock<N::Block>,
2396    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2397        self.insert_block_or_payload(
2398            block.block_with_parent(),
2399            block,
2400            |validator, block, ctx| validator.validate_block(block, ctx),
2401            |_, block| Ok(block),
2402        )
2403    }
2404
2405    /// Inserts a block or payload into the blockchain tree with full execution.
2406    ///
2407    /// This is a generic function that handles both blocks and payloads by accepting
2408    /// a block identifier, input data, and execution/validation functions. It performs
2409    /// comprehensive checks and execution:
2410    ///
2411    /// - Validates that the block doesn't already exist in the tree
2412    /// - Ensures parent state is available, buffering if necessary
2413    /// - Executes the block/payload using the provided execute function
2414    /// - Handles both canonical and fork chain insertions
2415    /// - Updates pending block state when appropriate
2416    /// - Emits consensus engine events and records metrics
2417    ///
2418    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2419    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2420    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2421    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2422    fn insert_block_or_payload<Input, Err>(
2423        &mut self,
2424        block_id: BlockWithParent,
2425        input: Input,
2426        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2427        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
2428    ) -> Result<InsertPayloadOk, Err>
2429    where
2430        Err: From<InsertBlockError<N::Block>>,
2431    {
2432        let block_insert_start = Instant::now();
2433        let block_num_hash = block_id.block;
2434        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2435
2436        match self.sealed_header_by_hash(block_num_hash.hash) {
2437            Err(err) => {
2438                let block = convert_to_block(self, input)?;
2439                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2440            }
2441            Ok(Some(_)) => {
2442                // We now assume that we already have this block in the tree. However, we need to
2443                // run the conversion to ensure that the block hash is valid.
2444                convert_to_block(self, input)?;
2445                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2446            }
2447            _ => {}
2448        };
2449
2450        // Ensure that the parent state is available.
2451        match self.state_provider_builder(block_id.parent) {
2452            Err(err) => {
2453                let block = convert_to_block(self, input)?;
2454                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2455            }
2456            Ok(None) => {
2457                let block = convert_to_block(self, input)?;
2458
2459                // we don't have the state required to execute this block, buffering it and find the
2460                // missing parent block
2461                let missing_ancestor = self
2462                    .state
2463                    .buffer
2464                    .lowest_ancestor(&block.parent_hash())
2465                    .map(|block| block.parent_num_hash())
2466                    .unwrap_or_else(|| block.parent_num_hash());
2467
2468                self.state.buffer.insert_block(block);
2469
2470                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2471                    head: self.state.tree_state.current_canonical_head,
2472                    missing_ancestor,
2473                }))
2474            }
2475            Ok(Some(_)) => {}
2476        }
2477
2478        // determine whether we are on a fork chain
2479        let is_fork = match self.is_fork(block_id) {
2480            Err(err) => {
2481                let block = convert_to_block(self, input)?;
2482                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2483            }
2484            Ok(is_fork) => is_fork,
2485        };
2486
2487        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2488
2489        let start = Instant::now();
2490
2491        let executed = execute(&mut self.payload_validator, input, ctx)?;
2492
2493        // if the parent is the canonical head, we can insert the block as the pending block
2494        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2495        {
2496            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2497            self.canonical_in_memory_state.set_pending_block(executed.clone());
2498        }
2499
2500        self.state.tree_state.insert_executed(executed.clone());
2501        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2502
2503        // emit insert event
2504        let elapsed = start.elapsed();
2505        let engine_event = if is_fork {
2506            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2507        } else {
2508            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2509        };
2510        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2511
2512        self.metrics
2513            .engine
2514            .block_insert_total_duration
2515            .record(block_insert_start.elapsed().as_secs_f64());
2516        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2517        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2518    }
2519
2520    /// Handles an error that occurred while inserting a block.
2521    ///
2522    /// If this is a validation error this will mark the block as invalid.
2523    ///
2524    /// Returns the proper payload status response if the block is invalid.
2525    fn on_insert_block_error(
2526        &mut self,
2527        error: InsertBlockError<N::Block>,
2528    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2529        let (block, error) = error.split();
2530
2531        // if invalid block, we check the validation error. Otherwise return the fatal
2532        // error.
2533        let validation_err = error.ensure_validation_error()?;
2534
2535        // If the error was due to an invalid payload, the payload is added to the
2536        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2537        // returned.
2538        warn!(
2539            target: "engine::tree",
2540            invalid_hash=%block.hash(),
2541            invalid_number=block.number(),
2542            %validation_err,
2543            "Invalid block error on new payload",
2544        );
2545        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2546
2547        // keep track of the invalid header
2548        self.state.invalid_headers.insert(block.block_with_parent());
2549        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2550            Box::new(block),
2551        )));
2552
2553        Ok(PayloadStatus::new(
2554            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2555            latest_valid_hash,
2556        ))
2557    }
2558
2559    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2560    fn on_new_payload_error(
2561        &mut self,
2562        error: NewPayloadError,
2563        parent_hash: B256,
2564    ) -> ProviderResult<PayloadStatus> {
2565        error!(target: "engine::tree", %error, "Invalid payload");
2566        // we need to convert the error to a payload status (response to the CL)
2567
2568        let latest_valid_hash =
2569            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2570                // Engine-API rules:
2571                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2572                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2573                None
2574            } else {
2575                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2576            };
2577
2578        let status = PayloadStatusEnum::from(error);
2579        Ok(PayloadStatus::new(status, latest_valid_hash))
2580    }
2581
2582    /// Attempts to find the header for the given block hash if it is canonical.
2583    pub fn find_canonical_header(
2584        &self,
2585        hash: B256,
2586    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2587        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2588
2589        if canonical.is_none() {
2590            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2591        }
2592
2593        Ok(canonical)
2594    }
2595
2596    /// Updates the tracked finalized block if we have it.
2597    fn update_finalized_block(
2598        &self,
2599        finalized_block_hash: B256,
2600    ) -> Result<(), OnForkChoiceUpdated> {
2601        if finalized_block_hash.is_zero() {
2602            return Ok(())
2603        }
2604
2605        match self.find_canonical_header(finalized_block_hash) {
2606            Ok(None) => {
2607                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2608                // if the finalized block is not known, we can't update the finalized block
2609                return Err(OnForkChoiceUpdated::invalid_state())
2610            }
2611            Ok(Some(finalized)) => {
2612                if Some(finalized.num_hash()) !=
2613                    self.canonical_in_memory_state.get_finalized_num_hash()
2614                {
2615                    // we're also persisting the finalized block on disk so we can reload it on
2616                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2617                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2618                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2619                    // Update finalized block height metric
2620                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2621                }
2622            }
2623            Err(err) => {
2624                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2625            }
2626        }
2627
2628        Ok(())
2629    }
2630
2631    /// Updates the tracked safe block if we have it
2632    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2633        if safe_block_hash.is_zero() {
2634            return Ok(())
2635        }
2636
2637        match self.find_canonical_header(safe_block_hash) {
2638            Ok(None) => {
2639                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2640                // if the safe block is not known, we can't update the safe block
2641                return Err(OnForkChoiceUpdated::invalid_state())
2642            }
2643            Ok(Some(safe)) => {
2644                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2645                    // we're also persisting the safe block on disk so we can reload it on
2646                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2647                    let _ = self.persistence.save_safe_block_number(safe.number());
2648                    self.canonical_in_memory_state.set_safe(safe.clone());
2649                    // Update safe block height metric
2650                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
2651                }
2652            }
2653            Err(err) => {
2654                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2655            }
2656        }
2657
2658        Ok(())
2659    }
2660
2661    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2662    /// made canonical.
2663    ///
2664    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2665    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2666    ///
2667    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2668    /// are consistent with the head block.
2669    fn ensure_consistent_forkchoice_state(
2670        &self,
2671        state: ForkchoiceState,
2672    ) -> Result<(), OnForkChoiceUpdated> {
2673        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2674        // after the head block is canonicalized.
2675        //
2676        // This ensures that the finalized block is consistent with the head block, i.e. the
2677        // finalized block is an ancestor of the head block.
2678        self.update_finalized_block(state.finalized_block_hash)?;
2679
2680        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2681        // after the head block is canonicalized.
2682        //
2683        // This ensures that the safe block is consistent with the head block, i.e. the safe
2684        // block is an ancestor of the head block.
2685        self.update_safe_block(state.safe_block_hash)
2686    }
2687
2688    /// Validates the payload attributes with respect to the header and fork choice state.
2689    ///
2690    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
2691    /// return an error if the payload attributes are invalid.
2692    fn process_payload_attributes(
2693        &self,
2694        attrs: T::PayloadAttributes,
2695        head: &N::BlockHeader,
2696        state: ForkchoiceState,
2697        version: EngineApiMessageVersion,
2698    ) -> OnForkChoiceUpdated {
2699        if let Err(err) =
2700            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2701        {
2702            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2703            return OnForkChoiceUpdated::invalid_payload_attributes()
2704        }
2705
2706        // 8. Client software MUST begin a payload build process building on top of
2707        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2708        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2709        //    The build process is specified in the Payload building section.
2710        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2711            state.head_block_hash,
2712            attrs,
2713            version as u8,
2714        ) {
2715            Ok(attributes) => {
2716                // send the payload to the builder and return the receiver for the pending payload
2717                // id, initiating payload job is handled asynchronously
2718                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2719
2720                // Client software MUST respond to this method call in the following way:
2721                // {
2722                //      payloadStatus: {
2723                //          status: VALID,
2724                //          latestValidHash: forkchoiceState.headBlockHash,
2725                //          validationError: null
2726                //      },
2727                //      payloadId: buildProcessId
2728                // }
2729                //
2730                // if the payload is deemed VALID and the build process has begun.
2731                OnForkChoiceUpdated::updated_with_pending_payload_id(
2732                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2733                    pending_payload_id,
2734                )
2735            }
2736            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2737        }
2738    }
2739
2740    /// Remove all blocks up to __and including__ the given block number.
2741    ///
2742    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2743    /// those which have a fork point at or below the finalized hash.
2744    ///
2745    /// Canonical blocks below the upper bound will still be removed.
2746    pub(crate) fn remove_before(
2747        &mut self,
2748        upper_bound: BlockNumHash,
2749        finalized_hash: Option<B256>,
2750    ) -> ProviderResult<()> {
2751        // first fetch the finalized block number and then call the remove_before method on
2752        // tree_state
2753        let num = if let Some(hash) = finalized_hash {
2754            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2755        } else {
2756            None
2757        };
2758
2759        self.state.tree_state.remove_until(
2760            upper_bound,
2761            self.persistence_state.last_persisted_block.hash,
2762            num,
2763        );
2764        Ok(())
2765    }
2766
2767    /// Returns a builder for creating state providers for the given hash.
2768    ///
2769    /// This is an optimization for parallel execution contexts where we want to avoid
2770    /// creating state providers in the critical path.
2771    pub fn state_provider_builder(
2772        &self,
2773        hash: B256,
2774    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2775    where
2776        P: BlockReader + StateProviderFactory + StateReader + Clone,
2777    {
2778        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2779            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2780            // the block leads back to the canonical chain
2781            return Ok(Some(StateProviderBuilder::new(
2782                self.provider.clone(),
2783                historical,
2784                Some(blocks),
2785            )))
2786        }
2787
2788        // Check if the block is persisted
2789        if let Some(header) = self.provider.header(hash)? {
2790            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2791            // For persisted blocks, we create a builder that will fetch state directly from the
2792            // database
2793            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2794        }
2795
2796        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2797        Ok(None)
2798    }
2799}
2800
2801/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
2802/// variant.
2803///
2804/// If we don't know the block's parent, we return `Disconnected`, as we can't claim that the block
2805/// is valid or not.
2806#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2807pub enum BlockStatus {
2808    /// The block is valid and block extends canonical chain.
2809    Valid,
2810    /// The block may be valid and has an unknown missing ancestor.
2811    Disconnected {
2812        /// Current canonical head.
2813        head: BlockNumHash,
2814        /// The lowest ancestor block that is not connected to the canonical chain.
2815        missing_ancestor: BlockNumHash,
2816    },
2817}
2818
2819/// How a payload was inserted if it was valid.
2820///
2821/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
2822/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
2823#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2824pub enum InsertPayloadOk {
2825    /// The payload was valid, but we have already seen it.
2826    AlreadySeen(BlockStatus),
2827    /// The payload was valid and inserted into the tree.
2828    Inserted(BlockStatus),
2829}