reth_engine_tree/tree/mod.rs

use crate::{
    backfill::{BackfillAction, BackfillSyncState},
    chain::FromOrchestrator,
    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
    persistence::PersistenceHandle,
    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
};
use alloy_consensus::BlockHeader;
use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{
    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use error::{InsertBlockError, InsertBlockFatalError};
use persistence_state::CurrentPersistenceAction;
use reth_chain_state::{
    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
    MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, FullConsensus};
use reth_engine_primitives::{
    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
    ForkchoiceStateTracker, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::{ConfigureEvm, OnStateHook};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
    HashedPostStateProvider, ProviderError, StateProviderBox, StateProviderFactory, StateReader,
    StateRootProvider, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{HashedPostState, TrieInput};
use reth_trie_db::DatabaseHashedPostState;
use revm::state::EvmState;
use state::TreeState;
use std::{
    fmt::Debug,
    sync::{
        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
        Arc,
    },
    time::Instant,
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot::{self, error::TryRecvError},
};
use tracing::*;

mod block_buffer;
mod cached_state;
pub mod error;
mod instrumented_state;
mod invalid_headers;
mod metrics;
mod payload_processor;
pub mod payload_validator;
mod persistence_state;
pub mod precompile_cache;
#[cfg(test)]
mod tests;
// TODO(alexey): compare trie updates in `insert_block_inner`
#[expect(unused)]
mod trie_updates;

use crate::tree::error::AdvancePersistenceError;
pub use block_buffer::BlockBuffer;
pub use invalid_headers::InvalidHeaderCache;
pub use payload_processor::*;
pub use payload_validator::{BasicEngineValidator, EngineValidator};
pub use persistence_state::PersistenceState;
pub use reth_engine_primitives::TreeConfig;
use reth_trie::KeccakKeyHasher;

pub mod state;

/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: the local head `block.number` is 100 and the forkchoice head `block.number` is 133 (more
/// than an epoch's worth of slots ahead); this exceeds the threshold, so the pipeline should be
/// used to backfill the gap.
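///
/// A minimal sketch of how this threshold is intended to be applied; `local_head` and `fcu_head`
/// are illustrative block numbers, not names used elsewhere in this module:
///
/// ```ignore
/// if fcu_head.saturating_sub(local_head) > MIN_BLOCKS_FOR_PIPELINE_RUN {
///     // the gap is too large to close block by block, backfill it via the pipeline
/// } else {
///     // download and insert the missing blocks individually
/// }
/// ```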
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// A builder for creating state providers that can be used across threads.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new state provider from the provider factory, historical block hash and optional
    /// overlaid blocks.
    pub const fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
where
    P: BlockReader + StateProviderFactory + StateReader + Clone,
{
    /// Creates a new state provider from this builder.
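    ///
    /// A minimal usage sketch; `factory`, `historical_hash` and `in_memory_blocks` are assumed to
    /// be supplied by the caller and are not defined in this module:
    ///
    /// ```ignore
    /// let builder = StateProviderBuilder::new(factory, historical_hash, Some(in_memory_blocks));
    /// // Historical state overlaid with the in-memory blocks.
    /// let provider = builder.build()?;
    /// ```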
    pub fn build(&self) -> ProviderResult<StateProviderBox> {
        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
        if let Some(overlay) = self.overlay.clone() {
            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
        }
        Ok(provider)
    }
}

/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the header of invalid payloads that were rejected by the engine because they're
    /// invalid.
    invalid_headers: InvalidHeaderCache,
}

impl<N: NodePrimitives> EngineApiTreeState<N> {
    fn new(
        block_buffer_limit: u32,
        max_invalid_header_cache_length: u32,
        canonical_block: BlockNumHash,
        engine_kind: EngineApiKind,
    ) -> Self {
        Self {
            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
            buffer: BlockBuffer::new(block_buffer_limit),
            tree_state: TreeState::new(canonical_block, engine_kind),
            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
        }
    }
}

/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
}

impl<T> TreeOutcome<T> {
    /// Create new tree outcome.
    pub const fn new(outcome: T) -> Self {
        Self { outcome, event: None }
    }

    /// Set event on the outcome.
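    ///
    /// A small sketch of combining an outcome with an event, mirroring how `on_new_payload`
    /// attaches a [`TreeAction::MakeCanonical`] action; `status` and `block_hash` are placeholders:
    ///
    /// ```ignore
    /// let outcome = TreeOutcome::new(status).with_event(TreeEvent::TreeAction(
    ///     TreeAction::MakeCanonical { sync_target_head: block_hash },
    /// ));
    /// ```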
    pub fn with_event(mut self, event: TreeEvent) -> Self {
        self.event = Some(event);
        self
    }
}

/// Events that are triggered by Tree Chain
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}

impl TreeEvent {
    /// Returns true if the event is a backfill action.
    const fn is_backfill_action(&self) -> bool {
        matches!(self, Self::BackfillAction(_))
    }
}

/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}

/// Wrapper struct that combines metrics and state hook
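///
/// A construction sketch; it assumes `ExecutorMetrics` can be default-constructed and that
/// `inner` is some existing [`OnStateHook`] implementation supplied by the caller:
///
/// ```ignore
/// let hook = MeteredStateHook {
///     metrics: reth_evm::metrics::ExecutorMetrics::default(),
///     inner_hook: Box::new(inner),
/// };
/// // `on_state` records account/storage/bytecode counts, then delegates to `inner`.
/// ```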
struct MeteredStateHook {
    metrics: reth_evm::metrics::ExecutorMetrics,
    inner_hook: Box<dyn OnStateHook>,
}

impl OnStateHook for MeteredStateHook {
    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
        // Update the metrics for the number of accounts, storage slots and bytecodes loaded
        let accounts = state.keys().len();
        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();

        self.metrics.accounts_loaded_histogram.record(accounts as f64);
        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);

        // Call the original state hook
        self.inner_hook.on_state(source, state);
    }
}

/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    provider: P,
    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourself that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API in between and don't become
    /// unresponsive. This can happen during live sync transition where we're trying to close the
    /// gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
}

impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("engine_kind", &self.engine_kind)
            .field("evm_config", &self.evm_config)
            .finish()
    }
}

impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    P: DatabaseProviderFactory
        + BlockReader<Block = N::Block, Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader<Receipt = N::Receipt>
        + HashedPostStateProvider
        + Clone
        + 'static,
    <P as DatabaseProviderFactory>::Provider:
        BlockReader<Block = N::Block, Header = N::BlockHeader>,
    C: ConfigureEvm<Primitives = N> + 'static,
    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
    V: EngineValidator<T>,
{
    /// Creates a new [`EngineApiTreeHandler`].
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        outgoing: UnboundedSender<EngineApiEvent<N>>,
        state: EngineApiTreeState<N>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        persistence: PersistenceHandle<N>,
        persistence_state: PersistenceState,
        payload_builder: PayloadBuilderHandle<T>,
        config: TreeConfig,
        engine_kind: EngineApiKind,
        evm_config: C,
    ) -> Self {
        let (incoming_tx, incoming) = std::sync::mpsc::channel();

        Self {
            provider,
            consensus,
            payload_validator,
            incoming,
            outgoing,
            persistence,
            persistence_state,
            backfill_sync_state: BackfillSyncState::Idle,
            state,
            canonical_in_memory_state,
            payload_builder,
            config,
            metrics: Default::default(),
            incoming_tx,
            engine_kind,
            evm_config,
        }
    }

    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
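    ///
    /// A usage sketch; all arguments are assumed to have been constructed by the node builder and
    /// are not defined here:
    ///
    /// ```ignore
    /// let (to_tree, mut engine_events) = EngineApiTreeHandler::spawn_new(
    ///     provider, consensus, payload_validator, persistence, payload_builder,
    ///     canonical_in_memory_state, config, kind, evm_config,
    /// );
    /// // Send requests into the tree task and drain the events it emits.
    /// to_tree.send(FromEngine::Request(request))?;
    /// while let Some(event) = engine_events.blocking_recv() { /* handle EngineApiEvent */ }
    /// ```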
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
        );
        let incoming = task.incoming_tx.clone();
        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
        (incoming, outgoing)
    }

    /// Returns a new [`Sender`] to send messages to this type.
    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
        self.incoming_tx.clone()
    }

    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages.
    pub fn run(mut self) {
        loop {
            match self.try_recv_engine_message() {
                Ok(Some(msg)) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    if let Err(fatal) = self.on_engine_message(msg) {
                        error!(target: "engine::tree", %fatal, "insert block fatal error");
                        return
                    }
                }
                Ok(None) => {
                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
                }
                Err(_err) => {
                    error!(target: "engine::tree", "Engine channel disconnected");
                    return
                }
            }

            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }

    /// Invoked when previously requested blocks were downloaded.
    ///
    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
    /// will execute the first batch and send the remaining blocks back through the channel so that
    /// block request processing isn't blocked for a long time.
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<RecoveredBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            // nothing to execute
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    // can exit early if backfill is needed
                    return Ok(None)
                }
            }
        }

        // if we still have blocks to execute, send them as a followup request
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }

    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`], for example
    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
    /// Execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to Consensus layer, which adds the block to the head of
    /// its own blockchain and attests to it. The block is then broadcast over the consensus p2p
    /// network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
    /// returns an error if an internal error occurred.
    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");
        self.metrics.engine.new_payload_messages.increment(1);

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.
        let parent_hash = payload.parent_hash();

        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;
        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
        if lowest_buffered_ancestor == block_hash {
            lowest_buffered_ancestor = parent_hash;
        }

        // now check if the block has an invalid ancestor
        if let Some(invalid) = self.state.invalid_headers.get(&lowest_buffered_ancestor) {
            // Here we might have 2 cases
            // 1. the block is well formed and indeed links to an invalid header, meaning we should
            //    remember it as invalid
            // 2. the block is not well formed (i.e block hash is incorrect), and we should just
            //    return an error and forget it
            let block = match self.payload_validator.ensure_well_formed_payload(payload) {
                Ok(block) => block,
                Err(error) => {
                    let status = self.on_new_payload_error(error, parent_hash)?;
                    return Ok(TreeOutcome::new(status))
                }
            };

            let status = self.on_invalid_new_payload(block.into_sealed_block(), invalid)?;
            return Ok(TreeOutcome::new(status))
        }
        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        let status = if self.backfill_sync_state.is_idle() {
            let mut latest_valid_hash = None;
            match self.insert_payload(payload) {
                Ok(status) => {
                    let status = match status {
                        InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                            latest_valid_hash = Some(block_hash);
                            self.try_connect_buffered_blocks(num_hash)?;
                            PayloadStatusEnum::Valid
                        }
                        InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                            latest_valid_hash = Some(block_hash);
                            PayloadStatusEnum::Valid
                        }
                        InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
                        InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                            // not known to be invalid, but we don't know anything else
                            PayloadStatusEnum::Syncing
                        }
                    };

                    PayloadStatus::new(status, latest_valid_hash)
                }
                Err(error) => match error {
                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
                    InsertPayloadError::Payload(error) => {
                        self.on_new_payload_error(error, parent_hash)?
                    }
                },
            }
        } else {
            match self.payload_validator.ensure_well_formed_payload(payload) {
                // if the block is well-formed, buffer it for later
                Ok(block) => {
                    if let Err(error) = self.buffer_block(block) {
                        self.on_insert_block_error(error)?
                    } else {
                        PayloadStatus::from_status(PayloadStatusEnum::Syncing)
                    }
                }
                Err(error) => self.on_new_payload_error(error, parent_hash)?,
            }
        };

        let mut outcome = TreeOutcome::new(status);
        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // but only if it isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }

    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None)
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain,
        // gather all blocks until new head number.
        while current_canonical_number > current_number {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_chain.push(block.clone());
                old_hash = block.recovered_block().parent_hash();
                current_canonical_number -= 1;
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }

    /// Updates the latest block state to the specified canonical ancestor.
    ///
    /// This method ensures that the latest block tracks the given canonical header by resetting
    /// the in-memory canonical state to it when necessary.
    ///
    /// # Arguments
    /// * `canonical_header` - The canonical header to set as the new head
    ///
    /// # Returns
    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
    ///
    /// Caution: This unwinds the canonical chain
    fn update_latest_block_to_canonical_ancestor(
        &mut self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
        let current_head_number = self.state.tree_state.canonical_block_number();
        let new_head_number = canonical_header.number();
        let new_head_hash = canonical_header.hash();

        // Update tree state with the new canonical head
        self.state.tree_state.set_canonical_head(canonical_header.num_hash());

        // Handle the state update based on whether this is an unwind scenario
        if new_head_number < current_head_number {
            debug!(
                target: "engine::tree",
                current_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "FCU unwind detected: reverting to canonical ancestor"
            );

            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
        } else {
            debug!(
                target: "engine::tree",
                previous_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "Advancing latest block to canonical ancestor"
            );
            self.handle_chain_advance_or_same_height(canonical_header)
        }
    }

    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
    /// to the canonical header
    fn handle_canonical_chain_unwind(
        &self,
        current_head_number: u64,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        let new_head_number = canonical_header.number();
        debug!(
            target: "engine::tree",
            from = current_head_number,
            to = new_head_number,
            "Handling unwind: collecting blocks to remove from in-memory state"
        );

        // Collect blocks that need to be removed from memory
        let old_blocks =
            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);

        // Load and apply the canonical ancestor block
        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
    }

    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
    fn collect_blocks_for_canonical_unwind(
        &self,
        new_head_number: u64,
        current_head_number: u64,
    ) -> Vec<ExecutedBlock<N>> {
        let mut old_blocks = Vec::new();

        for block_num in (new_head_number + 1)..=current_head_number {
            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
                let executed_block = block_state.block_ref().block.clone();
                old_blocks.push(executed_block);
                debug!(
                    target: "engine::tree",
                    block_number = block_num,
                    "Collected block for removal from in-memory state"
                );
            }
        }

        if old_blocks.is_empty() {
            debug!(
                target: "engine::tree",
                "No blocks found in memory to remove, will clear and reset state"
            );
        }

        old_blocks
    }

    /// Applies the canonical ancestor block via a reorg operation.
    fn apply_canonical_ancestor_via_reorg(
        &self,
        canonical_header: &SealedHeader<N::BlockHeader>,
        old_blocks: Vec<ExecutedBlock<N>>,
    ) -> ProviderResult<()> {
        let new_head_hash = canonical_header.hash();
        let new_head_number = canonical_header.number();

        // Try to load the canonical ancestor's block
        match self.canonical_block_by_hash(new_head_hash)? {
            Some(executed_block) => {
                let block_with_trie = ExecutedBlockWithTrieUpdates {
                    block: executed_block,
                    trie: ExecutedTrieUpdates::Missing,
                };

                // Perform the reorg to properly handle the unwind
                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
                    new: vec![block_with_trie],
                    old: old_blocks,
                });

                // CRITICAL: Update the canonical head after the reorg
                // This ensures get_canonical_head() returns the correct block
                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());

                debug!(
                    target: "engine::tree",
                    block_number = new_head_number,
                    block_hash = ?new_head_hash,
                    "Successfully loaded canonical ancestor into memory via reorg"
                );
            }
            None => {
                // Fallback: update header only if block cannot be found
                warn!(
                    target: "engine::tree",
                    block_hash = ?new_head_hash,
                    "Could not find canonical ancestor block, updating header only"
                );
                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
            }
        }

        Ok(())
    }

    /// Handles chain advance or same height scenarios.
    fn handle_chain_advance_or_same_height(
        &self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        let new_head_number = canonical_header.number();
        let new_head_hash = canonical_header.hash();

        // Update the canonical head header
        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());

        // Load the block into memory if it's not already present
        self.ensure_block_in_memory(new_head_number, new_head_hash)
    }

    /// Ensures a block is loaded into memory if not already present.
    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
        // Check if block is already in memory
        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
            return Ok(());
        }

        // Try to load the block from storage
        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
            let block_with_trie = ExecutedBlockWithTrieUpdates {
                block: executed_block,
                trie: ExecutedTrieUpdates::Missing,
            };

            self.canonical_in_memory_state
                .update_chain(NewCanonicalChain::Commit { new: vec![block_with_trie] });

            debug!(
                target: "engine::tree",
                block_number,
                block_hash = ?block_hash,
                "Added canonical block to in-memory state"
            );
        }

        Ok(())
    }

    /// Determines if the given block is part of a fork by checking that these
    /// conditions are true:
    /// * walking back from the target hash to verify that the target hash is not part of an
    ///   extension of the canonical chain.
    /// * walking back from the current head to verify that the target hash is not already part of
    ///   the canonical chain.
    ///
    /// The header is required as an arg, because we might be checking that the header is a fork
    /// block before it's in the tree state and before it's in the database.
    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
        let target_hash = target.block.hash;
        // verify that the given hash is not part of an extension of the canon chain.
        let canonical_head = self.state.tree_state.canonical_head();
        let mut current_hash;
        let mut current_block = target;
        loop {
            if current_block.block.hash == canonical_head.hash {
                return Ok(false)
            }
            // We already passed the canonical head
            if current_block.block.number <= canonical_head.number {
                break
            }
            current_hash = current_block.parent;

            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
            current_block = next_block.block_with_parent();
        }

        // verify that the given hash is not already part of canonical chain stored in memory
        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
            return Ok(false)
        }

        // verify that the given hash is not already part of persisted canonical chain
        if self.provider.block_number(target_hash)?.is_some() {
            return Ok(false)
        }

        Ok(true)
    }

    /// Returns the persisting kind for the input block.
    fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
        // Check that we're currently persisting.
        let Some(action) = self.persistence_state.current_action() else {
            return PersistingKind::NotPersisting
        };
        // Check that the persistence action is saving blocks, not removing them.
        let CurrentPersistenceAction::SavingBlocks { highest } = action else {
            return PersistingKind::PersistingNotDescendant
        };

        // The block being validated can only be a descendant if its number is higher than
        // the highest block persisting. Otherwise, it's likely a fork of a lower block.
        if block.block.number > highest.number &&
            self.state.tree_state.is_descendant(*highest, block)
        {
            return PersistingKind::PersistingDescendant
        }

        // In all other cases, the block is not a descendant.
        PersistingKind::PersistingNotDescendant
    }

    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
    /// chain.
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
    ///
    /// Returns an error if an internal error occurred like a database error.
    #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
        self.metrics.engine.forkchoice_updated_messages.increment(1);
        if attrs.is_some() {
            self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
        }
        self.canonical_in_memory_state.on_forkchoice_update_received();

        if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
            return Ok(TreeOutcome::new(on_updated))
        }

        let valid_outcome = |head| {
            TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
                PayloadStatusEnum::Valid,
                Some(head),
            )))
        };

        // Process the forkchoice update by trying to make the head block canonical
        //
        // We can only process this forkchoice update if:
        // - we have the `head` block
        // - the head block is part of a chain that is connected to the canonical chain. This
        //   includes reorgs.
        //
        // Performing a FCU involves:
        // - marking the FCU's head block as canonical
        // - updating in memory state to reflect the new canonical chain
        // - updating canonical state trackers
        // - emitting a canonicalization event for the new chain (including reorg)
        // - if we have payload attributes, delegate them to the payload service

        // 1. ensure we have a new head block
        if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
            trace!(target: "engine::tree", "fcu head hash is already canonical");

            // update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(TreeOutcome::new(outcome))
            }

            // we still need to process payload attributes if the head is already canonical
            if let Some(attr) = attrs {
                let tip = self
                    .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
                    .ok_or_else(|| {
                        // If we can't find the canonical block, then something is wrong and we need
                        // to return an error
                        ProviderError::HeaderNotFound(state.head_block_hash.into())
                    })?;
                let updated = self.process_payload_attributes(attr, &tip, state, version);
                return Ok(TreeOutcome::new(updated))
            }

            // the head block is already canonical
            return Ok(valid_outcome(state.head_block_hash))
        }

        // 2. check if the head is already part of the canonical chain
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
            // own chain at will, so we need to always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    let updated =
                        self.process_payload_attributes(attr, &canonical_header, state, version);
                    return Ok(TreeOutcome::new(updated))
                }

                // At this point, no alternative block has been triggered, so we need to effectively
                // unwind the _canonical_ chain to the FCU's head, which is part of the canonical
                // chain. We need to update the latest block state to reflect the
                // canonical ancestor. This ensures that state providers and the
                // transaction pool operate with the correct chain state after
                // forkchoice update processing.
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }
            }

            // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
            //    payload build process if `forkchoiceState.headBlockHash` references a `VALID`
            //    ancestor of the head of canonical chain, i.e. the ancestor passed payload
            //    validation process and deemed `VALID`. In the case of such an event, client
            //    software MUST return `{payloadStatus: {status: VALID, latestValidHash:
            //    forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // the head block is already canonical, so we're not triggering a payload job and can
            // return right away
            return Ok(valid_outcome(state.head_block_hash))
        }

        // 3. ensure we can apply a new chain update for the head block
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            // update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(TreeOutcome::new(outcome))
            }

            if let Some(attr) = attrs {
                let updated = self.process_payload_attributes(attr, &tip, state, version);
                return Ok(TreeOutcome::new(updated))
            }

            return Ok(valid_outcome(state.head_block_hash))
        }

        // 4. we don't have the block to perform the update
        // we assume the FCU is valid and at least the head is missing,
        // so we need to start syncing to it
        //
        // find the appropriate target to sync to, if we don't have the safe block hash then we
        // start syncing to the safe block via backfill first
        let target = if self.state.forkchoice_state_tracker.is_empty() &&
            // check that safe block is valid and missing
            !state.safe_block_hash.is_zero() &&
            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
        {
            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
            state.safe_block_hash
        } else {
            state.head_block_hash
        };

        let target = self.lowest_buffered_ancestor_or(target);
        trace!(target: "engine::tree", %target, "downloading missing block");

        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
            PayloadStatusEnum::Syncing,
        )))
        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
    }

    /// Attempts to receive the next engine request.
    ///
    /// If there's currently no persistence action in progress, this will block until a new request
    /// is received. If there's a persistence action in progress, this will try to receive the
    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
    /// received in time.
    ///
    /// Returns an error if the engine channel is disconnected.
    #[expect(clippy::type_complexity)]
    fn try_recv_engine_message(
        &self,
    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
        if self.persistence_state.in_progress() {
            // try to receive the next request with a timeout to not block indefinitely
            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
                Ok(msg) => Ok(Some(msg)),
                Err(err) => match err {
                    RecvTimeoutError::Timeout => Ok(None),
                    RecvTimeoutError::Disconnected => Err(RecvError),
                },
            }
        } else {
            self.incoming.recv().map(Some)
        }
    }

    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
    /// the current persistence action while we're removing blocks.
    fn remove_blocks(&mut self, new_tip_num: u64) {
        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
        if new_tip_num < self.persistence_state.last_persisted_block.number {
            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
            let (tx, rx) = oneshot::channel();
            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
            self.persistence_state.start_remove(new_tip_num, rx);
        }
    }

    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
    /// the current persistence action while we're saving blocks.
    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
        if blocks_to_persist.is_empty() {
            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
            return
        }

        // NOTE: checked non-empty above
        let highest_num_hash = blocks_to_persist
            .iter()
            .max_by_key(|block| block.recovered_block().number())
            .map(|b| b.recovered_block().num_hash())
            .expect("Checked non-empty persisting blocks");

        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
        let (tx, rx) = oneshot::channel();
        let _ = self.persistence.save_blocks(blocks_to_persist, tx);

        self.persistence_state.start_save(highest_num_hash, rx);
    }

    /// Attempts to advance the persistence state.
    ///
    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
    /// or send a new persistence action if necessary.
    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
        if self.persistence_state.in_progress() {
            let (mut rx, start_time, current_action) = self
                .persistence_state
                .rx
                .take()
                .expect("if a persistence task is in progress Receiver must be Some");
            // Check if persistence has completed
            match rx.try_recv() {
                Ok(last_persisted_hash_num) => {
                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
                    let Some(BlockNumHash {
                        hash: last_persisted_block_hash,
                        number: last_persisted_block_number,
                    }) = last_persisted_hash_num
                    else {
                        // if this happened, then we persisted no blocks because we sent an
                        // empty vec of blocks
                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
                        return Ok(())
                    };

                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
                    self.persistence_state
                        .finish(last_persisted_block_hash, last_persisted_block_number);
                    self.on_new_persisted_block()?;
                }
                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
                Err(TryRecvError::Empty) => {
                    self.persistence_state.rx = Some((rx, start_time, current_action))
                }
            }
        }

        if !self.persistence_state.in_progress() {
            if let Some(new_tip_num) = self.find_disk_reorg()? {
                self.remove_blocks(new_tip_num)
            } else if self.should_persist() {
                let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
                self.persist_blocks(blocks_to_persist);
            }
        }

        Ok(())
    }

    /// Handles a message from the engine.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<(), InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            // outdated block that can be skipped
                            return Ok(())
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // if the parent is the canonical head, we can insert the block as the
                        // pending block
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated {
                                state,
                                payload_attrs,
1300                                tx,
1301                                version,
1302                            } => {
1303                                let mut output =
1304                                    self.on_forkchoice_updated(state, payload_attrs, version);
1305
1306                                if let Ok(res) = &mut output {
1307                                    // track last received forkchoice state
1308                                    self.state
1309                                        .forkchoice_state_tracker
1310                                        .set_latest(state, res.outcome.forkchoice_status());
1311
1312                                    // emit an event about the handled FCU
1313                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1314                                        state,
1315                                        res.outcome.forkchoice_status(),
1316                                    ));
1317
1318                                    // handle the event if any
1319                                    self.on_maybe_tree_event(res.event.take())?;
1320                                }
1321
1322                                if let Err(err) =
1323                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1324                                {
1325                                    self.metrics
1326                                        .engine
1327                                        .failed_forkchoice_updated_response_deliveries
1328                                        .increment(1);
1329                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1330                                }
1331                            }
1332                            BeaconEngineMessage::NewPayload { payload, tx } => {
1333                                let mut output = self.on_new_payload(payload);
1334
1335                                let maybe_event =
1336                                    output.as_mut().ok().and_then(|out| out.event.take());
1337
1338                                // emit response
1339                                if let Err(err) =
1340                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1341                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1342                                    }))
1343                                {
1344                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1345                                    self.metrics
1346                                        .engine
1347                                        .failed_new_payload_response_deliveries
1348                                        .increment(1);
1349                                }
1350
1351                                // handle the event if any
1352                                self.on_maybe_tree_event(maybe_event)?;
1353                            }
1354                        }
1355                    }
1356                }
1357            }
1358            FromEngine::DownloadedBlocks(blocks) => {
1359                if let Some(event) = self.on_downloaded(blocks)? {
1360                    self.on_tree_event(event)?;
1361                }
1362            }
1363        }
1364        Ok(())
1365    }
1366
1367    /// Invoked when the backfill sync has finished syncing to its target.
1368    ///
1369    /// At this point we consider the chain synced up to the backfill target.
1370    ///
1371    /// Checks the tracked finalized block against the block on disk and requests another backfill
1372    /// run if the distance to the tip exceeds the threshold for another backfill run.
1373    ///
1374    /// This will also do the necessary housekeeping of the tree state; this includes:
1375    ///  - removing all blocks below the backfill height
1376    ///  - resetting the canonical in-memory state
1377    ///
1378    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1379    /// target block.
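    ///
    /// For example (illustrative numbers): if backfill finished at block 1000 but the tracked
    /// finalized block, seen in the block buffer, is at 2000, the remaining gap exceeds the
    /// backfill threshold and another backfill run targeting the finalized block is requested.
    /// If the gap is small enough, buffered child blocks of the new head are connected instead.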
1380    fn on_backfill_sync_finished(
1381        &mut self,
1382        ctrl: ControlFlow,
1383    ) -> Result<(), InsertBlockFatalError> {
1384        debug!(target: "engine::tree", "received backfill sync finished event");
1385        self.backfill_sync_state = BackfillSyncState::Idle;
1386
1387        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1388        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1389            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1390            // update the `invalid_headers` cache with the new invalid header
1391            self.state.invalid_headers.insert(**bad_block);
1392
1393            // if this was an unwind then the target is the new height
1394            Some(*target)
1395        } else {
1396            // backfill height is the block number that the backfill finished at
1397            ctrl.block_number()
1398        };
1399
1400        // if backfill did not report a height, there is nothing more to do here
1401        let Some(backfill_height) = backfill_height else { return Ok(()) };
1402
1403        // state housekeeping after backfill sync
1404        // remove all executed blocks below the backfill height
1405        //
1406        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1407        // before that
1408        let Some(backfill_num_hash) = self
1409            .provider
1410            .block_hash(backfill_height)?
1411            .map(|hash| BlockNumHash { hash, number: backfill_height })
1412        else {
1413            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1414            return Ok(())
1415        };
1416
1417        if ctrl.is_unwind() {
1418            // the node was unwound, so we need to clear everything above that height so that
1419            // the backfill height is the new canonical block.
1420            self.state.tree_state.reset(backfill_num_hash)
1421        } else {
1422            self.state.tree_state.remove_until(
1423                backfill_num_hash,
1424                self.persistence_state.last_persisted_block.hash,
1425                Some(backfill_num_hash),
1426            );
1427        }
1428
1429        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1430        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1431
1432        // remove all buffered blocks below the backfill height
1433        self.state.buffer.remove_old_blocks(backfill_height);
1434        // we remove all entries because now we're synced to the backfill target and consider this
1435        // the canonical chain
1436        self.canonical_in_memory_state.clear_state();
1437
1438        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1439            // update the tracked chain height, after backfill sync both the canonical height and
1440            // persisted height are the same
1441            self.state.tree_state.set_canonical_head(new_head.num_hash());
1442            self.persistence_state.finish(new_head.hash(), new_head.number());
1443
1444            // update the tracked canonical head
1445            self.canonical_in_memory_state.set_canonical_head(new_head);
1446        }
1447
1448        // check if we need to run backfill again by comparing the most recent finalized height to
1449        // the backfill height
1450        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1451        else {
1452            return Ok(())
1453        };
1454        if sync_target_state.finalized_block_hash.is_zero() {
1455            // no finalized block, can't check distance
1456            return Ok(())
1457        }
1458        // get the block number of the finalized block, if we have it
1459        let newest_finalized = self
1460            .state
1461            .buffer
1462            .block(&sync_target_state.finalized_block_hash)
1463            .map(|block| block.number());
1464
1465        // The block number that the backfill finished at - if either the progress or the newest
1466        // finalized block is None, then we can't check the distance anyway.
1467        //
1468        // If both are Some, we perform another distance check and return the desired
1469        // backfill target
1470        if let Some(backfill_target) =
1471            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1472                // Determines whether we should run backfill again, in case the new gap
1473                // is still large enough to warrant another run
1474                self.backfill_sync_target(progress, finalized_number, None)
1475            })
1476        {
1477            // request another backfill run
1478            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1479                backfill_target.into(),
1480            )));
1481            return Ok(())
1482        };
1483
1484        // try to close the gap by executing buffered blocks that are child blocks of the new head
1485        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1486    }
1487
1488    /// Attempts to make the given target canonical.
1489    ///
1490    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1491    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1492        if let Some(chain_update) = self.on_new_head(target)? {
1493            self.on_canonical_chain_update(chain_update);
1494        }
1495
1496        Ok(())
1497    }
1498
1499    /// Convenience function to handle an optional tree event.
1500    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1501        if let Some(event) = event {
1502            self.on_tree_event(event)?;
1503        }
1504
1505        Ok(())
1506    }
1507
1508    /// Handles a tree event.
1509    ///
1510    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1511    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1512        match event {
1513            TreeEvent::TreeAction(action) => match action {
1514                TreeAction::MakeCanonical { sync_target_head } => {
1515                    self.make_canonical(sync_target_head)?;
1516                }
1517            },
1518            TreeEvent::BackfillAction(action) => {
1519                self.emit_event(EngineApiEvent::BackfillAction(action));
1520            }
1521            TreeEvent::Download(action) => {
1522                self.emit_event(EngineApiEvent::Download(action));
1523            }
1524        }
1525
1526        Ok(())
1527    }
1528
1529    /// Emits an outgoing event to the engine.
1530    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1531        let event = event.into();
1532
1533        if event.is_backfill_action() {
1534            debug_assert_eq!(
1535                self.backfill_sync_state,
1536                BackfillSyncState::Idle,
1537                "backfill action should only be emitted when backfill is idle"
1538            );
1539
1540            if self.persistence_state.in_progress() {
1541                // backfill sync and persisting data are mutually exclusive, so we can't start
1542                // backfill while we're still persisting
1543                debug!(target: "engine::tree", "skipping backfill action while persistence task is active");
1544                return
1545            }
1546
1547            self.backfill_sync_state = BackfillSyncState::Pending;
1548            self.metrics.engine.pipeline_runs.increment(1);
1549            debug!(target: "engine::tree", "emitting backfill action event");
1550        }
1551
1552        let _ = self.outgoing.send(event).inspect_err(
1553            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1554        );
1555    }
1556
1557    /// Returns true if the canonical head's block number minus the last persisted
1558    /// block number is strictly greater than the persistence threshold and
1559    /// backfill is not running.
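    ///
    /// For example, with a persistence threshold of 2 (illustrative value), a last persisted
    /// block number of 90 and a canonical head at 93, the distance is 3, which exceeds the
    /// threshold, so this returns true (provided backfill is idle).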
1560    pub const fn should_persist(&self) -> bool {
1561        if !self.backfill_sync_state.is_idle() {
1562            // can't persist if backfill is running
1563            return false
1564        }
1565
1566        let min_block = self.persistence_state.last_persisted_block.number;
1567        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1568            self.config.persistence_threshold()
1569    }
1570
1571    /// Returns a batch of consecutive canonical blocks to persist in the range
1572    /// `(last_persisted_number .. canonical_head - memory_block_buffer_target]`. The expected
1573    /// order is oldest -> newest.
1574    ///
1575    /// If any blocks are missing trie updates, all blocks up to the canonical head are
1576    /// persisted, not taking the buffer target into account.
1577    ///
1578    /// For those blocks that didn't have the trie updates calculated, runs the state root
1579    /// calculation, and saves the trie updates.
1580    ///
1581    /// Returns an error if the state root calculation fails.
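    ///
    /// For example (illustrative numbers): with `last_persisted_number = 90`, a canonical head
    /// at 100 and a `memory_block_buffer_target` of 2, the target number is 98 and blocks
    /// 91..=98 are returned, oldest first. If any in-memory block is missing trie updates,
    /// blocks 91..=100 are returned instead, with the missing trie updates computed first.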
1582    fn get_canonical_blocks_to_persist(
1583        &mut self,
1584    ) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
1585        // We will calculate state roots against the database below, so we need to be sure that
1586        // no persistence task is concurrently writing changes to it
1587        debug_assert!(!self.persistence_state.in_progress());
1588
1589        let mut blocks_to_persist = Vec::new();
1590        let mut current_hash = self.state.tree_state.canonical_block_hash();
1591        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1592        let canonical_head_number = self.state.tree_state.canonical_block_number();
1593        let all_blocks_have_trie_updates = self
1594            .state
1595            .tree_state
1596            .blocks_by_hash
1597            .values()
1598            .all(|block| block.trie_updates().is_some());
1599
1600        let target_number = if all_blocks_have_trie_updates {
1601            // Persist only up to block buffer target if all blocks have trie updates
1602            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1603        } else {
1604            // Persist all blocks if any block is missing trie updates
1605            canonical_head_number
1606        };
1607
1608        debug!(
1609            target: "engine::tree",
1610            ?current_hash,
1611            ?last_persisted_number,
1612            ?canonical_head_number,
1613            ?all_blocks_have_trie_updates,
1614            ?target_number,
1615            "Returning canonical blocks to persist"
1616        );
1617        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1618            if block.recovered_block().number() <= last_persisted_number {
1619                break;
1620            }
1621
1622            if block.recovered_block().number() <= target_number {
1623                blocks_to_persist.push(block.clone());
1624            }
1625
1626            current_hash = block.recovered_block().parent_hash();
1627        }
1628
1629        // Reverse the order so that the oldest block comes first
1630        blocks_to_persist.reverse();
1631
1632        // Calculate missing trie updates
1633        for block in &mut blocks_to_persist {
1634            if block.trie.is_present() {
1635                continue
1636            }
1637
1638            debug!(
1639                target: "engine::tree",
1640                block = ?block.recovered_block().num_hash(),
1641                "Calculating trie updates before persisting"
1642            );
1643
1644            let provider = self
1645                .state_provider_builder(block.recovered_block().parent_hash())?
1646                .ok_or(AdvancePersistenceError::MissingAncestor(
1647                    block.recovered_block().parent_hash(),
1648                ))?
1649                .build()?;
1650
1651            let mut trie_input = self.compute_trie_input(
1652                self.persisting_kind_for(block.recovered_block.block_with_parent()),
1653                self.provider.database_provider_ro()?,
1654                block.recovered_block().parent_hash(),
1655                None,
1656            )?;
1657            // Extend with the block we are generating trie updates for.
1658            trie_input.append_ref(block.hashed_state());
1659            let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
1660            debug_assert_eq!(_root, block.recovered_block().state_root());
1661
1662            // Update trie updates in both tree state and blocks to persist that we return
1663            let trie_updates = Arc::new(updates);
1664            let tree_state_block = self
1665                .state
1666                .tree_state
1667                .blocks_by_hash
1668                .get_mut(&block.recovered_block().hash())
1669                .expect("blocks to persist are constructed from tree state blocks");
1670            tree_state_block.trie.set_present(trie_updates.clone());
1671            block.trie.set_present(trie_updates);
1672        }
1673
1674        Ok(blocks_to_persist)
1675    }
1676
1677    /// This clears the blocks from the in-memory tree state that have been persisted to the
1678    /// database.
1679    ///
1680    /// This also updates the canonical in-memory state to reflect the newest persisted block
1681    /// height.
1682    ///
1683    /// Assumes that `finish` has been called on the `persistence_state` at least once.
1684    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1685        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1686        // state.
1687        if let Some(remove_above) = self.find_disk_reorg()? {
1688            self.remove_blocks(remove_above);
1689            return Ok(())
1690        }
1691
1692        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1693        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1694        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1695            number: self.persistence_state.last_persisted_block.number,
1696            hash: self.persistence_state.last_persisted_block.hash,
1697        });
1698        Ok(())
1699    }
1700
1701    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1702    ///
1703    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
1704    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1705    /// has in memory.
1706    ///
1707    /// For finalized blocks, this will return `None`.
1708    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1709        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1710        // check memory first
1711        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1712            return Ok(Some(block.block.clone()))
1713        }
1714
1715        let (block, senders) = self
1716            .provider
1717            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1718            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1719            .split_sealed();
1720        let execution_output = self
1721            .provider
1722            .get_state(block.header().number())?
1723            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1724        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1725
1726        Ok(Some(ExecutedBlock {
1727            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1728            execution_output: Arc::new(execution_output),
1729            hashed_state: Arc::new(hashed_state),
1730        }))
1731    }
1732
1733    /// Return sealed block header from in-memory state or database by hash.
1734    fn sealed_header_by_hash(
1735        &self,
1736        hash: B256,
1737    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1738        // check memory first
1739        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1740
1741        if header.is_some() {
1742            Ok(header)
1743        } else {
1744            self.provider.sealed_header_by_hash(hash)
1745        }
1746    }
1747
1748    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1749    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1750    /// not exist in the buffer, this returns the hash that is passed in.
1751    ///
1752    /// Returns the parent hash of the block itself if the block is buffered and has no other
1753    /// buffered ancestors.
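    ///
    /// For example (illustrative): if blocks 101 -> 102 -> 103 are buffered and `hash` is the
    /// hash of block 103, this returns the parent hash of block 101, i.e. the hash of block
    /// 100. If `hash` is neither buffered nor has any buffered ancestors, `hash` itself is
    /// returned.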
1754    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1755        self.state
1756            .buffer
1757            .lowest_ancestor(&hash)
1758            .map(|block| block.parent_hash())
1759            .unwrap_or(hash)
1760    }
1761
1762    /// If validation fails, the response MUST contain the latest valid hash:
1763    ///
1764    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1765    ///     conditions:
1766    ///     - It is fully validated and deemed VALID
1767    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1768    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1769    ///     conditions are satisfied by a `PoW` block.
1770    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1771    ///     the above conditions.
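    ///
    /// Illustrative example (hypothetical chain): for `A <- B <- C` where `B` and `C` are in
    /// the invalid headers cache and `A` is a locally known valid block, calling this with
    /// `parent_hash` set to `C`'s hash walks `C -> B -> A` and returns `Some(A)`. If the walk
    /// never reaches a locally known header, `None` is returned.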
1772    fn latest_valid_hash_for_invalid_payload(
1773        &mut self,
1774        parent_hash: B256,
1775    ) -> ProviderResult<Option<B256>> {
1776        // Check if parent exists in side chain or in canonical chain.
1777        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1778            return Ok(Some(parent_hash))
1779        }
1780
1781        // iterate over ancestors in the invalid cache
1782        // until we encounter the first valid ancestor
1783        let mut current_hash = parent_hash;
1784        let mut current_block = self.state.invalid_headers.get(&current_hash);
1785        while let Some(block_with_parent) = current_block {
1786            current_hash = block_with_parent.parent;
1787            current_block = self.state.invalid_headers.get(&current_hash);
1788
1789            // If current_block is None, then current_hash does not have an invalid
1790            // ancestor in the cache, so check its presence in the blockchain tree
1791            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1792                return Ok(Some(current_hash))
1793            }
1794        }
1795        Ok(None)
1796    }
1797
1798    /// Prepares the invalid payload response for the given hash, checking the
1799    /// database for the parent hash and populating the payload status with the latest valid hash
1800    /// according to the engine api spec.
1801    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1802        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1803        // PoW block, which we need to identify by looking at the parent's block difficulty
1804        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? {
1805            if !parent.difficulty().is_zero() {
1806                parent_hash = B256::ZERO;
1807            }
1808        }
1809
1810        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1811        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1812            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1813        })
1814        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1815    }
1816
1817    /// Returns true if the given hash is the last received sync target block.
1818    ///
1819    /// See [`ForkchoiceStateTracker::sync_target_state`]
1820    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1821        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1822            return target.head_block_hash == block_hash
1823        }
1824        false
1825    }
1826
1827    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1828    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1829    ///
1830    /// Returns a payload status response according to the engine API spec if the block is known to
1831    /// be invalid.
1832    fn check_invalid_ancestor_with_head(
1833        &mut self,
1834        check: B256,
1835        head: &SealedBlock<N::Block>,
1836    ) -> ProviderResult<Option<PayloadStatus>> {
1837        // check if the check hash was previously marked as invalid
1838        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1839
1840        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1841    }
1842
1843    /// Invoked when a newly received payload is invalid.
1844    fn on_invalid_new_payload(
1845        &mut self,
1846        head: SealedBlock<N::Block>,
1847        invalid: BlockWithParent,
1848    ) -> ProviderResult<PayloadStatus> {
1849        // populate the latest valid hash field
1850        let status = self.prepare_invalid_response(invalid.parent)?;
1851
1852        // insert the head block into the invalid header cache
1853        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1854        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1855
1856        Ok(status)
1857    }
1858
1859    /// Checks if the given `head` points to an invalid header, which requires a specific response
1860    /// to a forkchoice update.
1861    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1862        // check if the head was previously marked as invalid
1863        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1864        // populate the latest valid hash field
1865        Ok(Some(self.prepare_invalid_response(header.parent)?))
1866    }
1867
1868    /// Validates that the block is correct and satisfies all the consensus rules that concern
1869    /// the header and block body.
1870    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1871        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1872            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1873            return Err(e)
1874        }
1875
1876        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1877            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1878            return Err(e)
1879        }
1880
1881        Ok(())
1882    }
1883
1884    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
1885    #[instrument(level = "trace", skip(self), target = "engine::tree")]
1886    fn try_connect_buffered_blocks(
1887        &mut self,
1888        parent: BlockNumHash,
1889    ) -> Result<(), InsertBlockFatalError> {
1890        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1891
1892        if blocks.is_empty() {
1893            // nothing to append
1894            return Ok(())
1895        }
1896
1897        let now = Instant::now();
1898        let block_count = blocks.len();
1899        for child in blocks {
1900            let child_num_hash = child.num_hash();
1901            match self.insert_block(child) {
1902                Ok(res) => {
1903                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1904                    if self.is_sync_target_head(child_num_hash.hash) &&
1905                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1906                    {
1907                        self.make_canonical(child_num_hash.hash)?;
1908                    }
1909                }
1910                Err(err) => {
1911                    if let InsertPayloadError::Block(err) = err {
1912                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1913                        if let Err(fatal) = self.on_insert_block_error(err) {
1914                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1915                            return Err(fatal)
1916                        }
1917                    }
1918                }
1919            }
1920        }
1921
1922        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1923        Ok(())
1924    }
1925
1926    /// Pre-validates the block and inserts it into the buffer.
1927    fn buffer_block(
1928        &mut self,
1929        block: RecoveredBlock<N::Block>,
1930    ) -> Result<(), InsertBlockError<N::Block>> {
1931        if let Err(err) = self.validate_block(&block) {
1932            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1933        }
1934        self.state.buffer.insert_block(block);
1935        Ok(())
1936    }
1937
1938    /// Returns true if the distance from the local tip to the block is greater than the configured
1939    /// threshold.
1940    ///
1941    /// If the `local_tip` is greater than or equal to the `block`, then this returns false.
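    ///
    /// For example, assuming the default threshold of one epoch (32 blocks): a local tip at 100
    /// and a block at 140 gives a gap of 40, which exceeds the threshold, so this returns true;
    /// a block at 120 (gap of 20) returns false.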
1942    #[inline]
1943    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1944        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1945    }
1946
1947    /// Returns how far the local tip is from the given block. If the local tip is at the same
1948    /// height or its block number is greater than the given block, this returns None.
1949    #[inline]
1950    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1951        if block > local_tip {
1952            Some(block - local_tip)
1953        } else {
1954            None
1955        }
1956    }
1957
1958    /// Returns the target hash to sync to if the distance from the local tip to the block is
1959    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
1960    /// that block already).
1961    ///
1962    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
1963    /// (missing) finalized block.
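    ///
    /// Illustrative outcome (hypothetical values): with a canonical tip at 100 and a target
    /// block number of 200, where the tracked finalized block is not yet known locally, the gap
    /// exceeds the backfill threshold and `Some(finalized_block_hash)` is returned as the
    /// backfill target. If the finalized hash is zero (optimistic sync), the head hash is
    /// returned instead; if the finalized block is already known locally, `None` is returned.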
1964    fn backfill_sync_target(
1965        &self,
1966        canonical_tip_num: u64,
1967        target_block_number: u64,
1968        downloaded_block: Option<BlockNumHash>,
1969    ) -> Option<B256> {
1970        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1971
1972        // check whether we have already buffered the finalized block the CL is tracking
1973        let mut exceeds_backfill_threshold = if let Some(buffered_finalized) = sync_target_state
1974            .as_ref()
1975            .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1976        {
1977            // if we have buffered the finalized block, we should check how far
1978            // we're off
1979            self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number())
1980        } else {
1981            // check if the distance exceeds the threshold for backfill sync
1982            self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
1983        };
1984
1985        // If this is invoked after we downloaded a block we can check if this block is the
1986        // finalized block
1987        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1988            if downloaded_block.hash == state.finalized_block_hash {
1989                // we downloaded the finalized block and can now check how far we're off
1990                exceeds_backfill_threshold =
1991                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1992            }
1993        }
1994
1995        // if the number of missing blocks is greater than the max, trigger backfill
1996        if exceeds_backfill_threshold {
1997            if let Some(state) = sync_target_state {
1998                // if we have already canonicalized the finalized block, we should skip backfill
1999                match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2000                    Err(err) => {
2001                        warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2002                    }
2003                    Ok(None) => {
2004                        // ensure the finalized block is known (not the zero hash)
2005                        if !state.finalized_block_hash.is_zero() {
2006                            // we don't have the block yet and the distance exceeds the allowed
2007                            // threshold
2008                            return Some(state.finalized_block_hash)
2009                        }
2010
2011                        // OPTIMISTIC SYNCING
2012                        //
2013                        // This can happen when the node is doing an
2014                        // optimistic sync, where the CL has no knowledge of the finalized hash,
2015                        // but is expecting the EL to sync as high
2016                        // as possible before finalizing.
2017                        //
2018                        // This usually doesn't happen on ETH mainnet since CLs use the more
2019                        // secure checkpoint syncing.
2020                        //
2021                        // However, optimism chains will do this, and the risk of a reorg
2022                        // is low.
2023                        debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2024                        return Some(state.head_block_hash)
2025                    }
2026                    Ok(Some(_)) => {
2027                        // we're fully synced to the finalized block
2028                    }
2029                }
2030            }
2031        }
2032
2033        None
2034    }
2035
2036    /// This method tries to detect whether the on-disk and in-memory chain states have diverged.
2037    /// This can happen if a reorg occurs while we are persisting a block.
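    ///
    /// Illustrative example (hypothetical blocks):
    ///
    /// ```text
    /// persisted (disk):    A -> B  -> C
    /// canonical (memory):  A -> B' -> C' -> D'
    /// ```
    ///
    /// Walking both chains back meets at `A`, so `Some(A.number)` is returned and everything
    /// above `A` is removed from disk. If the persisted tip is an ancestor of the canonical
    /// head, `None` is returned.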
2038    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2039        let mut canonical = self.state.tree_state.current_canonical_head;
2040        let mut persisted = self.persistence_state.last_persisted_block;
2041
2042        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2043            Ok(self
2044                .sealed_header_by_hash(num_hash.hash)?
2045                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2046                .parent_num_hash())
2047        };
2048
2049        // Happy path, canonical chain is ahead or equal to persisted chain.
2050        // Walk canonical chain back to make sure that it connects to persisted chain.
2051        while canonical.number > persisted.number {
2052            canonical = parent_num_hash(canonical)?;
2053        }
2054
2055        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2056        if canonical == persisted {
2057            return Ok(None);
2058        }
2059
2060        // At this point, we know that `persisted` block can't be reached by walking the canonical
2061        // chain back. In this case we need to truncate it to the first canonical block it connects
2062        // to.
2063
2064        // Firstly, walk back until we reach the same height as `canonical`.
2065        while persisted.number > canonical.number {
2066            persisted = parent_num_hash(persisted)?;
2067        }
2068
2069        debug_assert_eq!(persisted.number, canonical.number);
2070
2071        // Now walk both chains back until we find a common ancestor.
2072        while persisted.hash != canonical.hash {
2073            canonical = parent_num_hash(canonical)?;
2074            persisted = parent_num_hash(persisted)?;
2075        }
2076
2077        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2078
2079        Ok(Some(persisted.number))
2080    }
2081
2082    /// Invoked when the canonical chain has been updated.
2083    ///
2084    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2085    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2086        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2087        let start = Instant::now();
2088
2089        // update the tracked canonical head
2090        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2091
2092        let tip = chain_update.tip().clone_sealed_header();
2093        let notification = chain_update.to_chain_notification();
2094
2095        // reinsert any missing reorged blocks
2096        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2097            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2098            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2099            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2100
2101            self.update_reorg_metrics(old.len());
2102            self.reinsert_reorged_blocks(new.clone());
2103            // Try reinserting the reorged canonical chain. This is only possible if we have
2104            // `persisted_trie_updates` for those blocks.
2105            let old = old
2106                .iter()
2107                .filter_map(|block| {
2108                    let trie = self
2109                        .state
2110                        .tree_state
2111                        .persisted_trie_updates
2112                        .get(&block.recovered_block.hash())?
2113                        .1
2114                        .clone();
2115                    Some(ExecutedBlockWithTrieUpdates {
2116                        block: block.clone(),
2117                        trie: ExecutedTrieUpdates::Present(trie),
2118                    })
2119                })
2120                .collect::<Vec<_>>();
2121            self.reinsert_reorged_blocks(old);
2122        }
2123
2124        // update the tracked in-memory state with the new chain
2125        self.canonical_in_memory_state.update_chain(chain_update);
2126        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2127
2128        // Update metrics based on new tip
2129        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2130
2131        // sends an event to all active listeners about the new canonical chain
2132        self.canonical_in_memory_state.notify_canon_state(notification);
2133
2134        // emit event
2135        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2136            Box::new(tip),
2137            start.elapsed(),
2138        ));
2139    }
2140
2141    /// This updates metrics based on the given reorg length.
2142    fn update_reorg_metrics(&self, old_chain_length: usize) {
2143        self.metrics.tree.reorgs.increment(1);
2144        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2145    }
2146
2147    /// This reinserts any blocks in the given chain that do not already exist in the tree.
2148    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2149        for block in new_chain {
2150            if self
2151                .state
2152                .tree_state
2153                .executed_block_by_hash(block.recovered_block().hash())
2154                .is_none()
2155            {
2156                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2157                self.state.tree_state.insert_executed(block);
2158            }
2159        }
2160    }
2161
2162    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2163    ///
2164    /// This mainly compares the missing parent of the downloaded block with the current canonical
2165    /// tip, and decides whether or not backfill sync should be triggered.
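    ///
    /// Illustrative outcomes (hypothetical numbers, assuming a backfill threshold of 32): with
    /// a canonical head at 100 and a missing parent at 110, the gap of 10 is below the
    /// threshold, so a block-range download covering the 10 missing blocks is returned. With a
    /// missing parent at 90 (an outdated sidechain), only that single block is requested. If
    /// the gap exceeds the threshold, a backfill action is returned instead.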
2166    fn on_disconnected_downloaded_block(
2167        &self,
2168        downloaded_block: BlockNumHash,
2169        missing_parent: BlockNumHash,
2170        head: BlockNumHash,
2171    ) -> Option<TreeEvent> {
2172        // compare the missing parent with the canonical tip
2173        if let Some(target) =
2174            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2175        {
2176            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2177            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2178        }
2179
2180        // continue downloading the missing parent
2181        //
2182        // this happens if either:
2183        //  * the missing parent block num < canonical tip num
2184        //    * this case represents a missing block on a fork that is shorter than the canonical
2185        //      chain
2186        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2187        //    less than the backfill threshold
2188        //    * this case represents a potentially long range of blocks to download and execute
2189        let request = if let Some(distance) =
2190            self.distance_from_local_tip(head.number, missing_parent.number)
2191        {
2192            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2193            DownloadRequest::BlockRange(missing_parent.hash, distance)
2194        } else {
2195            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2196            // This happens when the missing parent is on an outdated
2197            // sidechain and we can only download the missing block itself
2198            DownloadRequest::single_block(missing_parent.hash)
2199        };
2200
2201        Some(TreeEvent::Download(request))
2202    }
2203
2204    /// Invoked with a block downloaded from the network
2205    ///
2206    /// Returns an event with the appropriate action to take, such as:
2207    ///  - download more missing blocks
2208    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2209    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2210    fn on_downloaded_block(
2211        &mut self,
2212        block: RecoveredBlock<N::Block>,
2213    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2214        let block_num_hash = block.num_hash();
2215        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2216        if self
2217            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2218            .is_some()
2219        {
2220            return Ok(None)
2221        }
2222
2223        if !self.backfill_sync_state.is_idle() {
2224            return Ok(None)
2225        }
2226
2227        // try to append the block
2228        match self.insert_block(block) {
2229            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2230                if self.is_sync_target_head(block_num_hash.hash) {
2231                    trace!(target: "engine::tree", "appended downloaded sync target block");
2232
2233                    // we just inserted the current sync target block, we can try to make it
2234                    // canonical
2235                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2236                        sync_target_head: block_num_hash.hash,
2237                    })))
2238                }
2239                trace!(target: "engine::tree", "appended downloaded block");
2240                self.try_connect_buffered_blocks(block_num_hash)?;
2241            }
2242            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2243                // block is not connected to the canonical head, we need to download
2244                // its missing branch first
2245                return Ok(self.on_disconnected_downloaded_block(
2246                    block_num_hash,
2247                    missing_ancestor,
2248                    head,
2249                ))
2250            }
2251            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2252                trace!(target: "engine::tree", "downloaded block already executed");
2253            }
2254            Err(err) => {
2255                if let InsertPayloadError::Block(err) = err {
2256                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2257                    if let Err(fatal) = self.on_insert_block_error(err) {
2258                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2259                        return Err(fatal)
2260                    }
2261                }
2262            }
2263        }
2264        Ok(None)
2265    }
2266
2267    fn insert_payload(
2268        &mut self,
2269        payload: T::ExecutionData,
2270    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2271        self.insert_block_or_payload(
2272            payload.block_with_parent(),
2273            payload,
2274            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2275            |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2276        )
2277    }
2278
2279    fn insert_block(
2280        &mut self,
2281        block: RecoveredBlock<N::Block>,
2282    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2283        self.insert_block_or_payload(
2284            block.block_with_parent(),
2285            block,
2286            |validator, block, ctx| validator.validate_block(block, ctx),
2287            |_, block| Ok(block),
2288        )
2289    }
2290
2291    fn insert_block_or_payload<Input, Err>(
2292        &mut self,
2293        block_id: BlockWithParent,
2294        input: Input,
2295        execute: impl FnOnce(
2296            &mut V,
2297            Input,
2298            TreeCtx<'_, N>,
2299        ) -> Result<ExecutedBlockWithTrieUpdates<N>, Err>,
2300        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
2301    ) -> Result<InsertPayloadOk, Err>
2302    where
2303        Err: From<InsertBlockError<N::Block>>,
2304    {
2305        let block_insert_start = Instant::now();
2306        let block_num_hash = block_id.block;
2307        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2308
2309        match self.sealed_header_by_hash(block_num_hash.hash) {
2310            Err(err) => {
2311                let block = convert_to_block(self, input)?;
2312                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2313            }
2314            Ok(Some(_)) => {
2315                // We already have this block in the tree. However, we still need to run the
2316                // conversion to ensure that the block hash is valid.
2317                convert_to_block(self, input)?;
2318                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2319            }
2320            _ => {}
2321        };
2322
2323        // Ensure that the parent state is available.
2324        match self.state_provider_builder(block_id.parent) {
2325            Err(err) => {
2326                let block = convert_to_block(self, input)?;
2327                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2328            }
2329            Ok(None) => {
2330                let block = convert_to_block(self, input)?;
2331
2332                // we don't have the state required to execute this block, so we buffer it and find
2333                // the missing parent block
2334                let missing_ancestor = self
2335                    .state
2336                    .buffer
2337                    .lowest_ancestor(&block.parent_hash())
2338                    .map(|block| block.parent_num_hash())
2339                    .unwrap_or_else(|| block.parent_num_hash());
2340
2341                self.state.buffer.insert_block(block);
2342
2343                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2344                    head: self.state.tree_state.current_canonical_head,
2345                    missing_ancestor,
2346                }))
2347            }
2348            Ok(Some(_)) => {}
2349        }
2350
2351        // determine whether we are on a fork chain
2352        let is_fork = match self.is_fork(block_id) {
2353            Err(err) => {
2354                let block = convert_to_block(self, input)?;
2355                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2356            }
2357            Ok(is_fork) => is_fork,
2358        };
2359
2360        let ctx =
2361            TreeCtx::new(&mut self.state, &self.persistence_state, &self.canonical_in_memory_state);
2362
2363        let start = Instant::now();
2364
2365        let executed = execute(&mut self.payload_validator, input, ctx)?;
2366
2367        // if the parent is the canonical head, we can insert the block as the pending block
2368        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2369        {
2370            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2371            self.canonical_in_memory_state.set_pending_block(executed.clone());
2372        }
2373
2374        self.state.tree_state.insert_executed(executed.clone());
2375        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2376
2377        // emit insert event
2378        let elapsed = start.elapsed();
2379        let engine_event = if is_fork {
2380            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2381        } else {
2382            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2383        };
2384        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2385
2386        self.metrics
2387            .engine
2388            .block_insert_total_duration
2389            .record(block_insert_start.elapsed().as_secs_f64());
2390        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2391        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2392    }
2393
2394    /// Computes the trie input at the provided parent hash.
2395    ///
2396    /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that
2397    /// serves as an overlay to the database blocks.
2398    ///
2399    /// It works as follows:
2400    /// 1. Collect in-memory blocks that are descendants of the provided parent hash using
2401    ///    [`TreeState::blocks_by_hash`].
2402    /// 2. If the persistence is in progress, and the block that we're computing the trie input for
2403    ///    is a descendant of the currently persisting blocks, we need to be sure that in-memory
2404    ///    blocks do not overlap with database blocks that may already have been persisted.
2405    ///    To do that, we filter out in-memory blocks that are lower than or equal to the highest
2406    ///    database block.
2407    /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
2408    ///    [`HashedPostState`] from them.
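    ///
    /// Illustrative example (hypothetical numbers): with `best_block_number = 100`, in-memory
    /// blocks 99..=103 leading up to `parent_hash`, and a block that is a descendant of the
    /// currently persisting blocks, blocks 99 and 100 are filtered out, the anchor becomes
    /// block 100 (the best database block, so no reverts are needed), and the hashed states of
    /// blocks 101..=103 form the in-memory overlay.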
2409    fn compute_trie_input<TP: DBProvider + BlockNumReader>(
2410        &self,
2411        persisting_kind: PersistingKind,
2412        provider: TP,
2413        parent_hash: B256,
2414        allocated_trie_input: Option<TrieInput>,
2415    ) -> ProviderResult<TrieInput> {
2416        // get allocated trie input or use a default trie input
2417        let mut input = allocated_trie_input.unwrap_or_default();
2418
2419        let best_block_number = provider.best_block_number()?;
2420
2421        let (mut historical, mut blocks) = self
2422            .state
2423            .tree_state
2424            .blocks_by_hash(parent_hash)
2425            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2426
2427        // If the current block is a descendant of the currently persisting blocks, then we need to
2428        // filter in-memory blocks, so that none of them are already persisted in the database.
2429        if persisting_kind.is_descendant() {
2430            // Iterate over the blocks from oldest to newest.
2431            while let Some(block) = blocks.last() {
2432                let recovered_block = block.recovered_block();
2433                if recovered_block.number() <= best_block_number {
2434                    // Remove blocks that are lower than or equal to the highest database
2435                    // block.
2436                    blocks.pop();
2437                } else {
2438                    // If the block is higher than the best block number, stop filtering, as it's
2439                    // the first block that's not in the database.
2440                    break
2441                }
2442            }
2443
2444            historical = if let Some(block) = blocks.last() {
2445                // If there are any in-memory blocks left after filtering, set the anchor to the
2446                // parent of the oldest block.
2447                (block.recovered_block().number() - 1).into()
2448            } else {
2449                // Otherwise, set the anchor to the original provided parent hash.
2450                parent_hash.into()
2451            };
2452        }
2453
2454        if blocks.is_empty() {
2455            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2456        } else {
2457            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2458        }
2459
2460        // Convert the historical anchor (hash or number) to a block number.
2461        let block_number = provider
2462            .convert_hash_or_number(historical)?
2463            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2464
2465        // Retrieve revert state for historical block.
2466        let revert_state = if block_number == best_block_number {
2467            // We do not check against the `last_block_number` here because
2468            // `HashedPostState::from_reverts` only uses the database tables, and not static files.
2469            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2470            HashedPostState::default()
2471        } else {
2472            let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
2473                provider.tx_ref(),
2474                block_number + 1,
2475            )
2476            .map_err(ProviderError::from)?;
2477            debug!(
2478                target: "engine::tree",
2479                block_number,
2480                best_block_number,
2481                accounts = revert_state.accounts.len(),
2482                storages = revert_state.storages.len(),
2483                "Non-empty revert state"
2484            );
2485            revert_state
2486        };
2487        input.append(revert_state);
2488
2489        // Extend with contents of parent in-memory blocks.
2490        input.extend_with_blocks(
2491            blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
2492        );
2493
2494        Ok(input)
2495    }
2496
2497    /// Handles an error that occurred while inserting a block.
2498    ///
2499    /// If this is a validation error, this will mark the block as invalid.
2500    ///
2501    /// Returns the proper payload status response if the block is invalid.
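    ///
    /// The response handed back to the CL has roughly this shape (sketch; the error string and
    /// ancestor hash depend on the actual failure):
    ///
    /// ```ignore
    /// PayloadStatus::new(
    ///     PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
    ///     latest_valid_hash, // `None` if no valid ancestor is known
    /// )
    /// ```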
2502    fn on_insert_block_error(
2503        &mut self,
2504        error: InsertBlockError<N::Block>,
2505    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2506        let (block, error) = error.split();
2507
2508        // if this is a validation error, we handle it below; otherwise return the fatal
2509        // error.
2510        let validation_err = error.ensure_validation_error()?;
2511
2512        // If the error was due to an invalid payload, the payload is added to the
2513        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2514        // returned.
2515        warn!(
2516            target: "engine::tree",
2517            invalid_hash=%block.hash(),
2518            invalid_number=block.number(),
2519            %validation_err,
2520            "Invalid block error on new payload",
2521        );
2522        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2523
2524        // keep track of the invalid header
2525        self.state.invalid_headers.insert(block.block_with_parent());
2526        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2527            Box::new(block),
2528        )));
2529
2530        Ok(PayloadStatus::new(
2531            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2532            latest_valid_hash,
2533        ))
2534    }
2535
2536    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2537    fn on_new_payload_error(
2538        &mut self,
2539        error: NewPayloadError,
2540        parent_hash: B256,
2541    ) -> ProviderResult<PayloadStatus> {
2542        error!(target: "engine::tree", %error, "Invalid payload");
2543        // we need to convert the error to a payload status (response to the CL)
2544
2545        let latest_valid_hash =
2546            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2547                // Engine-API rules:
2548                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2549                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2550                None
2551            } else {
2552                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2553            };
2554
2555        let status = PayloadStatusEnum::from(error);
2556        Ok(PayloadStatus::new(status, latest_valid_hash))
2557    }
2558
2559    /// Attempts to find the header for the given block hash if it is canonical.
2560    pub fn find_canonical_header(
2561        &self,
2562        hash: B256,
2563    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2564        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2565
2566        if canonical.is_none() {
2567            canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2568        }
2569
2570        Ok(canonical)
2571    }
2572
2573    /// Updates the tracked finalized block if we have it.
2574    fn update_finalized_block(
2575        &self,
2576        finalized_block_hash: B256,
2577    ) -> Result<(), OnForkChoiceUpdated> {
2578        if finalized_block_hash.is_zero() {
2579            return Ok(())
2580        }
2581
2582        match self.find_canonical_header(finalized_block_hash) {
2583            Ok(None) => {
2584                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2585                // if the finalized block is not known, we can't update the finalized block
2586                return Err(OnForkChoiceUpdated::invalid_state())
2587            }
2588            Ok(Some(finalized)) => {
2589                if Some(finalized.num_hash()) !=
2590                    self.canonical_in_memory_state.get_finalized_num_hash()
2591                {
2592                    // we also persist the finalized block on disk so we can reload it on
2593                    // restart; this is required by optimism, which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2594                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2595                    self.canonical_in_memory_state.set_finalized(finalized);
2596                }
2597            }
2598            Err(err) => {
2599                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2600            }
2601        }
2602
2603        Ok(())
2604    }
2605
2606    /// Updates the tracked safe block if we have it.
2607    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2608        if safe_block_hash.is_zero() {
2609            return Ok(())
2610        }
2611
2612        match self.find_canonical_header(safe_block_hash) {
2613            Ok(None) => {
2614                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2615                // if the safe block is not known, we can't update the safe block
2616                return Err(OnForkChoiceUpdated::invalid_state())
2617            }
2618            Ok(Some(safe)) => {
2619                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2620                    // we're also persisting the safe block on disk so we can reload it on
2621                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2622                    let _ = self.persistence.save_safe_block_number(safe.number());
2623                    self.canonical_in_memory_state.set_safe(safe);
2624                }
2625            }
2626            Err(err) => {
2627                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2628            }
2629        }
2630
2631        Ok(())
2632    }
2633
2634    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2635    /// made canonical.
2636    ///
2637    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2638    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2639    ///
2640    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2641    /// are consistent with the head block.
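    ///
    /// Illustrative usage inside a forkchoice handler (a sketch, not the actual call site; `tree`
    /// and `state` are assumed to be in scope):
    ///
    /// ```ignore
    /// // the head block has already been made canonical at this point
    /// if let Err(outcome) = tree.ensure_consistent_forkchoice_state(state) {
    ///     // `outcome` is an INVALID `OnForkChoiceUpdated` to return to the CL
    ///     return Ok(outcome)
    /// }
    /// ```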
2642    fn ensure_consistent_forkchoice_state(
2643        &self,
2644        state: ForkchoiceState,
2645    ) -> Result<(), OnForkChoiceUpdated> {
2646        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2647        // after the head block is canonicalized.
2648        //
2649        // This ensures that the finalized block is consistent with the head block, i.e. the
2650        // finalized block is an ancestor of the head block.
2651        self.update_finalized_block(state.finalized_block_hash)?;
2652
2653        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2654        // after the head block is canonicalized.
2655        //
2656        // This ensures that the safe block is consistent with the head block, i.e. the safe
2657        // block is an ancestor of the head block.
2658        self.update_safe_block(state.safe_block_hash)
2659    }
2660
2661    /// Pre-validates the forkchoice update and checks whether it can be processed.
2662    ///
2663    /// This method returns the update outcome if validation fails, or if the node is syncing
2664    /// and the update cannot be processed at the moment.
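    ///
    /// For example (sketch; `tree` is assumed to be this handler), a zero head hash is rejected
    /// outright:
    ///
    /// ```ignore
    /// let state = ForkchoiceState {
    ///     head_block_hash: B256::ZERO,
    ///     safe_block_hash: B256::ZERO,
    ///     finalized_block_hash: B256::ZERO,
    /// };
    /// // returns `Ok(Some(OnForkChoiceUpdated::invalid_state()))`
    /// let outcome = tree.pre_validate_forkchoice_update(state)?;
    /// ```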
2665    fn pre_validate_forkchoice_update(
2666        &mut self,
2667        state: ForkchoiceState,
2668    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2669        if state.head_block_hash.is_zero() {
2670            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2671        }
2672
2673        // check if the new head hash is connected to any ancestor that we previously marked as
2674        // invalid
2675        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2676        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2677            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2678        }
2679
2680        if !self.backfill_sync_state.is_idle() {
2681            // We can only process new forkchoice updates if the pipeline is idle, since it requires
2682            // exclusive access to the database
2683            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2684            return Ok(Some(OnForkChoiceUpdated::syncing()))
2685        }
2686
2687        Ok(None)
2688    }
2689
2690    /// Validates the payload attributes with respect to the header and fork choice state.
2691    ///
2692    /// Note: At this point, the fork choice update is considered to be VALID; however, we can
2693    /// still return an error if the payload attributes are invalid.
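    ///
    /// Sketch of the happy path (names are illustrative): valid attributes start a build job, and
    /// the CL receives a VALID status plus the payload id of that job.
    ///
    /// ```ignore
    /// let outcome = tree.process_payload_attributes(attrs, &head, state, EngineApiMessageVersion::V3);
    /// // VALID status with `latest_valid_hash == state.head_block_hash`, plus a pending payload id
    /// ```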
2694    fn process_payload_attributes(
2695        &self,
2696        attrs: T::PayloadAttributes,
2697        head: &N::BlockHeader,
2698        state: ForkchoiceState,
2699        version: EngineApiMessageVersion,
2700    ) -> OnForkChoiceUpdated {
2701        if let Err(err) =
2702            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2703        {
2704            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2705            return OnForkChoiceUpdated::invalid_payload_attributes()
2706        }
2707
2708        // 8. Client software MUST begin a payload build process building on top of
2709        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2710        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2711        //    The build process is specified in the Payload building section.
2712        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2713            state.head_block_hash,
2714            attrs,
2715            version as u8,
2716        ) {
2717            Ok(attributes) => {
2718                // send the payload attributes to the builder and return the receiver for the
2719                // pending payload id; initiating the payload job is handled asynchronously
2720                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2721
2722                // Client software MUST respond to this method call in the following way:
2723                // {
2724                //      payloadStatus: {
2725                //          status: VALID,
2726                //          latestValidHash: forkchoiceState.headBlockHash,
2727                //          validationError: null
2728                //      },
2729                //      payloadId: buildProcessId
2730                // }
2731                //
2732                // if the payload is deemed VALID and the build process has begun.
2733                OnForkChoiceUpdated::updated_with_pending_payload_id(
2734                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2735                    pending_payload_id,
2736                )
2737            }
2738            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2739        }
2740    }
2741
2742    /// Remove all blocks up to __and including__ the given block number.
2743    ///
2744    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2745    /// those which have a fork point at or below the finalized hash.
2746    ///
2747    /// Canonical blocks below the upper bound will still be removed.
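    ///
    /// Illustrative call (sketch; `upper_bound` and `finalized_hash` are assumed to be in scope):
    ///
    /// ```ignore
    /// // remove everything at or below `upper_bound`; non-canonical blocks are only removed if
    /// // their fork point is at or below the finalized block
    /// tree.remove_before(upper_bound, Some(finalized_hash))?;
    /// ```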
2748    pub(crate) fn remove_before(
2749        &mut self,
2750        upper_bound: BlockNumHash,
2751        finalized_hash: Option<B256>,
2752    ) -> ProviderResult<()> {
2753        // first fetch the finalized block number and then call `remove_until` on the
2754        // tree_state
2755        let num = if let Some(hash) = finalized_hash {
2756            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2757        } else {
2758            None
2759        };
2760
2761        self.state.tree_state.remove_until(
2762            upper_bound,
2763            self.persistence_state.last_persisted_block.hash,
2764            num,
2765        );
2766        Ok(())
2767    }
2768
2769    /// Returns a builder for creating state providers for the given hash.
2770    ///
2771    /// This is an optimization for parallel execution contexts where we want to avoid
2772    /// creating state providers in the critical path.
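    ///
    /// Sketch of intended usage (the `build` call is an assumption about `StateProviderBuilder`'s
    /// API; actual construction may differ):
    ///
    /// ```ignore
    /// if let Some(builder) = tree.state_provider_builder(block_hash)? {
    ///     // the builder can be handed to a worker, where the actual state provider is
    ///     // constructed off the critical path
    ///     let provider = builder.build()?;
    /// }
    /// ```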
2773    pub fn state_provider_builder(
2774        &self,
2775        hash: B256,
2776    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2777    where
2778        P: BlockReader + StateProviderFactory + StateReader + Clone,
2779    {
2780        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2781            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2782            // the block leads back to the canonical chain
2783            return Ok(Some(StateProviderBuilder::new(
2784                self.provider.clone(),
2785                historical,
2786                Some(blocks),
2787            )))
2788        }
2789
2790        // Check if the block is persisted
2791        if let Some(header) = self.provider.header(&hash)? {
2792            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2793            // For persisted blocks, we create a builder that will fetch state directly from the
2794            // database
2795            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2796        }
2797
2798        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2799        Ok(None)
2800    }
2801}
2802
2803/// Block inclusion can be valid or invalid; invalid blocks are returned as an error
2804/// variant.
2805///
2806/// If we don't know the block's parent, we return `Disconnected`, as we can't determine whether
2807/// the block is valid or not.
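///
/// Callers typically match on the status to decide whether to extend the chain or to request the
/// missing ancestor (sketch):
///
/// ```ignore
/// match status {
///     BlockStatus::Valid => { /* block is connected and executed */ }
///     BlockStatus::Disconnected { head, missing_ancestor } => {
///         // keep `head` as-is and download `missing_ancestor` to close the gap
///     }
/// }
/// ```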
2808#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2809pub enum BlockStatus {
2810    /// The block is valid and extends the canonical chain.
2811    Valid,
2812    /// The block may be valid and has an unknown missing ancestor.
2813    Disconnected {
2814        /// Current canonical head.
2815        head: BlockNumHash,
2816        /// The lowest ancestor block that is not connected to the canonical chain.
2817        missing_ancestor: BlockNumHash,
2818    },
2819}
2820
2821/// How a payload was inserted if it was valid.
2822///
2823/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
2824/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
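///
/// A typical consumer distinguishes duplicates from fresh insertions (sketch; `insert_block` is
/// the assumed entry point):
///
/// ```ignore
/// match tree.insert_block(block)? {
///     InsertPayloadOk::AlreadySeen(_) => { /* duplicate payload, nothing was re-executed */ }
///     InsertPayloadOk::Inserted(status) => { /* newly inserted; inspect `status` */ }
/// }
/// ```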
2825#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2826pub enum InsertPayloadOk {
2827    /// The payload was valid, but we have already seen it.
2828    AlreadySeen(BlockStatus),
2829    /// The payload was valid and inserted into the tree.
2830    Inserted(BlockStatus),
2831}
2832
2833/// Whether blocks are currently being persisted and whether the input block is a descendant of them.
2834#[derive(Debug, Clone, Copy)]
2835pub enum PersistingKind {
2836    /// The blocks are not currently persisting.
2837    NotPersisting,
2838    /// The blocks are currently persisting but the input block is not a descendant.
2839    PersistingNotDescendant,
2840    /// The blocks are currently persisting and the input block is a descendant.
2841    PersistingDescendant,
2842}
2843
2844impl PersistingKind {
2845    /// Returns true if the parallel state root can be run.
2846    ///
2847    /// We only run the parallel state root if we are not currently persisting any blocks or
2848    /// persisting blocks that are all ancestors of the one we are calculating the state root for.
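    ///
    /// For example:
    ///
    /// ```ignore
    /// assert!(PersistingKind::NotPersisting.can_run_parallel_state_root());
    /// assert!(PersistingKind::PersistingDescendant.can_run_parallel_state_root());
    /// assert!(!PersistingKind::PersistingNotDescendant.can_run_parallel_state_root());
    /// ```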
2849    pub const fn can_run_parallel_state_root(&self) -> bool {
2850        matches!(self, Self::NotPersisting | Self::PersistingDescendant)
2851    }
2852
2853    /// Returns true if the blocks are currently being persisted and the input block is a
2854    /// descendant.
2855    pub const fn is_descendant(&self) -> bool {
2856        matches!(self, Self::PersistingDescendant)
2857    }
2858}