reth_engine_tree/tree/mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use persistence_state::CurrentPersistenceAction;
17use reth_chain_state::{
18    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
19    MemoryOverlayStateProvider, NewCanonicalChain,
20};
21use reth_consensus::{Consensus, FullConsensus};
22use reth_engine_primitives::{
23    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
24    ForkchoiceStateTracker, OnForkChoiceUpdated,
25};
26use reth_errors::{ConsensusError, ProviderResult};
27use reth_evm::{ConfigureEvm, OnStateHook};
28use reth_payload_builder::PayloadBuilderHandle;
29use reth_payload_primitives::{
30    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
31};
32use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
33use reth_provider::{
34    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
35    HashedPostStateProvider, ProviderError, StateProviderBox, StateProviderFactory, StateReader,
36    StateRootProvider, TransactionVariant,
37};
38use reth_revm::database::StateProviderDatabase;
39use reth_stages_api::ControlFlow;
40use reth_trie::{HashedPostState, TrieInput};
41use reth_trie_db::DatabaseHashedPostState;
42use revm::state::EvmState;
43use state::TreeState;
44use std::{
45    fmt::Debug,
46    sync::{
47        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
48        Arc,
49    },
50    time::Instant,
51};
52use tokio::sync::{
53    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
54    oneshot::{self, error::TryRecvError},
55};
56use tracing::*;
57
58mod block_buffer;
59mod cached_state;
60pub mod error;
61mod instrumented_state;
62mod invalid_headers;
63mod metrics;
64mod payload_processor;
65pub mod payload_validator;
66mod persistence_state;
67pub mod precompile_cache;
68#[cfg(test)]
69mod tests;
70// TODO(alexey): compare trie updates in `insert_block_inner`
71#[expect(unused)]
72mod trie_updates;
73
74use crate::tree::error::AdvancePersistenceError;
75pub use block_buffer::BlockBuffer;
76pub use invalid_headers::InvalidHeaderCache;
77pub use payload_processor::*;
78pub use payload_validator::{BasicEngineValidator, EngineValidator};
79pub use persistence_state::PersistenceState;
80pub use reth_engine_primitives::TreeConfig;
81use reth_trie::KeccakKeyHasher;
82
83pub mod state;
84
/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold and represents the distance (gap) from the local head to a new
/// (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline is used to backfill the gap more efficiently.
///
/// E.g.: if the local head `block.number` is 100 and the forkchoice head `block.number` is 133,
/// the gap of 33 blocks is larger than `EPOCH_SLOTS` (32), so the threshold is exceeded and the
/// pipeline should be used to backfill the gap.
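///
/// Illustrative sketch of the distance check this threshold is used for (the variable names are
/// placeholders, not actual call sites in this crate):
///
/// ```ignore
/// let distance = target_block_number.saturating_sub(local_head_number);
/// if distance > MIN_BLOCKS_FOR_PIPELINE_RUN {
///     // the gap is too large to close block by block, trigger a backfill (pipeline) run
/// } else {
///     // download and insert the missing blocks individually via the tree
/// }
/// ```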
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
95
96/// A builder for creating state providers that can be used across threads.
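///
/// # Example
///
/// A minimal sketch of the intended usage; `factory`, `historical_hash` and `in_memory_blocks`
/// are placeholders for values supplied by the caller:
///
/// ```ignore
/// // anchor on a persisted (historical) block and overlay the in-memory chain on top of it
/// let builder = StateProviderBuilder::new(factory, historical_hash, Some(in_memory_blocks));
/// // the builder can be cloned and handed to another thread, where the state provider is only
/// // built when it is actually needed
/// let provider = builder.build()?;
/// ```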
97#[derive(Clone, Debug)]
98pub struct StateProviderBuilder<N: NodePrimitives, P> {
99    /// The provider factory used to create providers.
100    provider_factory: P,
101    /// The historical block hash to fetch state from.
102    historical: B256,
103    /// The blocks that form the chain from historical to target and are in memory.
104    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
105}
106
107impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
108    /// Creates a new state provider from the provider factory, historical block hash and optional
109    /// overlaid blocks.
110    pub const fn new(
111        provider_factory: P,
112        historical: B256,
113        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
114    ) -> Self {
115        Self { provider_factory, historical, overlay }
116    }
117}
118
119impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
120where
121    P: BlockReader + StateProviderFactory + StateReader + Clone,
122{
123    /// Creates a new state provider from this builder.
124    pub fn build(&self) -> ProviderResult<StateProviderBox> {
125        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
126        if let Some(overlay) = self.overlay.clone() {
127            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
128        }
129        Ok(provider)
130    }
131}
132
133/// Tracks the state of the engine api internals.
134///
135/// This type is not shareable.
136#[derive(Debug)]
137pub struct EngineApiTreeState<N: NodePrimitives> {
138    /// Tracks the state of the blockchain tree.
139    tree_state: TreeState<N>,
140    /// Tracks the forkchoice state updates received by the CL.
141    forkchoice_state_tracker: ForkchoiceStateTracker,
142    /// Buffer of detached blocks.
143    buffer: BlockBuffer<N::Block>,
    /// Tracks the headers of payloads that were rejected by the engine because they're invalid.
146    invalid_headers: InvalidHeaderCache,
147}
148
149impl<N: NodePrimitives> EngineApiTreeState<N> {
150    fn new(
151        block_buffer_limit: u32,
152        max_invalid_header_cache_length: u32,
153        canonical_block: BlockNumHash,
154        engine_kind: EngineApiKind,
155    ) -> Self {
156        Self {
157            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
158            buffer: BlockBuffer::new(block_buffer_limit),
159            tree_state: TreeState::new(canonical_block, engine_kind),
160            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
161        }
162    }
163}
164
165/// The outcome of a tree operation.
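///
/// # Example
///
/// An illustrative sketch of how an outcome is typically constructed (`head_hash` is a
/// placeholder value):
///
/// ```ignore
/// let outcome = TreeOutcome::new(PayloadStatus::from_status(PayloadStatusEnum::Valid))
///     .with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head: head_hash }));
/// ```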
166#[derive(Debug)]
167pub struct TreeOutcome<T> {
168    /// The outcome of the operation.
169    pub outcome: T,
170    /// An optional event to tell the caller to do something.
171    pub event: Option<TreeEvent>,
172}
173
174impl<T> TreeOutcome<T> {
175    /// Create new tree outcome.
176    pub const fn new(outcome: T) -> Self {
177        Self { outcome, event: None }
178    }
179
180    /// Set event on the outcome.
181    pub fn with_event(mut self, event: TreeEvent) -> Self {
182        self.event = Some(event);
183        self
184    }
185}
186
/// Events that are triggered by the tree.
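///
/// Callers typically match on the variant to decide how to react; a sketch:
///
/// ```ignore
/// match event {
///     TreeEvent::TreeAction(action) => { /* apply the action to the tree */ }
///     TreeEvent::BackfillAction(action) => { /* forward to the backfill sync orchestrator */ }
///     TreeEvent::Download(request) => { /* forward to the block downloader */ }
/// }
/// ```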
188#[derive(Debug)]
189pub enum TreeEvent {
190    /// Tree action is needed.
191    TreeAction(TreeAction),
192    /// Backfill action is needed.
193    BackfillAction(BackfillAction),
194    /// Block download is needed.
195    Download(DownloadRequest),
196}
197
198impl TreeEvent {
199    /// Returns true if the event is a backfill action.
200    const fn is_backfill_action(&self) -> bool {
201        matches!(self, Self::BackfillAction(_))
202    }
203}
204
205/// The actions that can be performed on the tree.
206#[derive(Debug)]
207pub enum TreeAction {
208    /// Make target canonical.
209    MakeCanonical {
210        /// The sync target head hash
211        sync_target_head: B256,
212    },
213}
214
215/// Wrapper struct that combines metrics and state hook
216struct MeteredStateHook {
217    metrics: reth_evm::metrics::ExecutorMetrics,
218    inner_hook: Box<dyn OnStateHook>,
219}
220
221impl OnStateHook for MeteredStateHook {
222    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
223        // Update the metrics for the number of accounts, storage slots and bytecodes loaded
224        let accounts = state.keys().len();
225        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
226        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
227
228        self.metrics.accounts_loaded_histogram.record(accounts as f64);
229        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
230        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
231
232        // Call the original state hook
233        self.inner_hook.on_state(source, state);
234    }
235}
236
237/// The engine API tree handler implementation.
238///
239/// This type is responsible for processing engine API requests, maintaining the canonical state and
240/// emitting events.
241pub struct EngineApiTreeHandler<N, P, T, V, C>
242where
243    N: NodePrimitives,
244    T: PayloadTypes,
245    C: ConfigureEvm<Primitives = N> + 'static,
246{
247    provider: P,
248    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
249    payload_validator: V,
250    /// Keeps track of internals such as executed and buffered blocks.
251    state: EngineApiTreeState<N>,
    /// The sender half of the channel for sending messages to the engine.
    ///
    /// This is kept so that we can queue messages to ourselves and process them later, for
    /// example to distribute a workload across multiple messages that would otherwise take too
    /// long to process. E.g. we might receive a range of downloaded blocks and want to process
    /// them one by one so that we can handle incoming engine API requests in between and don't
    /// become unresponsive. This can happen during the live sync transition where we're trying
    /// to close the gap (up to 3 epochs of blocks in the worst case).
260    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
261    /// Incoming engine API requests.
262    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
263    /// Outgoing events that are emitted to the handler.
264    outgoing: UnboundedSender<EngineApiEvent<N>>,
265    /// Channels to the persistence layer.
266    persistence: PersistenceHandle<N>,
267    /// Tracks the state changes of the persistence task.
268    persistence_state: PersistenceState,
269    /// Flag indicating the state of the node's backfill synchronization process.
270    backfill_sync_state: BackfillSyncState,
271    /// Keeps track of the state of the canonical chain that isn't persisted yet.
272    /// This is intended to be accessed from external sources, such as rpc.
273    canonical_in_memory_state: CanonicalInMemoryState<N>,
274    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
275    /// updates
276    payload_builder: PayloadBuilderHandle<T>,
277    /// Configuration settings.
278    config: TreeConfig,
279    /// Metrics for the engine api.
280    metrics: EngineApiMetrics,
281    /// The engine API variant of this handler
282    engine_kind: EngineApiKind,
283    /// The EVM configuration.
284    evm_config: C,
285}
286
287impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
288    for EngineApiTreeHandler<N, P, T, V, C>
289where
290    N: NodePrimitives,
291    C: Debug + ConfigureEvm<Primitives = N>,
292{
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        f.debug_struct("EngineApiTreeHandler")
295            .field("provider", &self.provider)
296            .field("consensus", &self.consensus)
297            .field("payload_validator", &self.payload_validator)
298            .field("state", &self.state)
299            .field("incoming_tx", &self.incoming_tx)
300            .field("persistence", &self.persistence)
301            .field("persistence_state", &self.persistence_state)
302            .field("backfill_sync_state", &self.backfill_sync_state)
303            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
304            .field("payload_builder", &self.payload_builder)
305            .field("config", &self.config)
306            .field("metrics", &self.metrics)
307            .field("engine_kind", &self.engine_kind)
308            .field("evm_config", &self.evm_config)
309            .finish()
310    }
311}
312
313impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
314where
315    N: NodePrimitives,
316    P: DatabaseProviderFactory
317        + BlockReader<Block = N::Block, Header = N::BlockHeader>
318        + StateProviderFactory
319        + StateReader<Receipt = N::Receipt>
320        + HashedPostStateProvider
321        + Clone
322        + 'static,
323    <P as DatabaseProviderFactory>::Provider:
324        BlockReader<Block = N::Block, Header = N::BlockHeader>,
325    C: ConfigureEvm<Primitives = N> + 'static,
326    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
327    V: EngineValidator<T>,
328{
329    /// Creates a new [`EngineApiTreeHandler`].
330    #[expect(clippy::too_many_arguments)]
331    pub fn new(
332        provider: P,
333        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
334        payload_validator: V,
335        outgoing: UnboundedSender<EngineApiEvent<N>>,
336        state: EngineApiTreeState<N>,
337        canonical_in_memory_state: CanonicalInMemoryState<N>,
338        persistence: PersistenceHandle<N>,
339        persistence_state: PersistenceState,
340        payload_builder: PayloadBuilderHandle<T>,
341        config: TreeConfig,
342        engine_kind: EngineApiKind,
343        evm_config: C,
344    ) -> Self {
345        let (incoming_tx, incoming) = std::sync::mpsc::channel();
346
347        Self {
348            provider,
349            consensus,
350            payload_validator,
351            incoming,
352            outgoing,
353            persistence,
354            persistence_state,
355            backfill_sync_state: BackfillSyncState::Idle,
356            state,
357            canonical_in_memory_state,
358            payload_builder,
359            config,
360            metrics: Default::default(),
361            incoming_tx,
362            engine_kind,
363            evm_config,
364        }
365    }
366
367    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
368    /// own thread.
369    ///
    /// Returns the sender through which incoming requests can be sent to the task, and the
    /// receiver end of an unbounded [`EngineApiEvent`] channel for receiving events from the
    /// engine.
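    ///
    /// # Example
    ///
    /// An illustrative sketch only; every argument is a placeholder for a value that the node
    /// builder normally wires up:
    ///
    /// ```ignore
    /// let (to_tree, mut engine_events) = EngineApiTreeHandler::spawn_new(
    ///     provider, consensus, payload_validator, persistence, payload_builder,
    ///     canonical_in_memory_state, config, kind, evm_config,
    /// );
    /// // requests are sent to the spawned task through `to_tree`, and events emitted by the
    /// // engine are received from `engine_events`.
    /// ```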
372    #[expect(clippy::complexity)]
373    pub fn spawn_new(
374        provider: P,
375        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
376        payload_validator: V,
377        persistence: PersistenceHandle<N>,
378        payload_builder: PayloadBuilderHandle<T>,
379        canonical_in_memory_state: CanonicalInMemoryState<N>,
380        config: TreeConfig,
381        kind: EngineApiKind,
382        evm_config: C,
383    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
384    {
385        let best_block_number = provider.best_block_number().unwrap_or(0);
386        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
387
388        let persistence_state = PersistenceState {
389            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
390            rx: None,
391        };
392
393        let (tx, outgoing) = unbounded_channel();
394        let state = EngineApiTreeState::new(
395            config.block_buffer_limit(),
396            config.max_invalid_header_cache_length(),
397            header.num_hash(),
398            kind,
399        );
400
401        let task = Self::new(
402            provider,
403            consensus,
404            payload_validator,
405            tx,
406            state,
407            canonical_in_memory_state,
408            persistence,
409            persistence_state,
410            payload_builder,
411            config,
412            kind,
413            evm_config,
414        );
415        let incoming = task.incoming_tx.clone();
416        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
417        (incoming, outgoing)
418    }
419
420    /// Returns a new [`Sender`] to send messages to this type.
421    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
422        self.incoming_tx.clone()
423    }
424
425    /// Run the engine API handler.
426    ///
427    /// This will block the current thread and process incoming messages.
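    ///
    /// A sketch of how this is typically driven (it mirrors what [`Self::spawn_new`] does);
    /// `task` is a placeholder for a constructed handler:
    ///
    /// ```ignore
    /// std::thread::Builder::new()
    ///     .name("Engine Task".to_string())
    ///     .spawn(|| task.run())
    ///     .unwrap();
    /// ```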
428    pub fn run(mut self) {
429        loop {
430            match self.try_recv_engine_message() {
431                Ok(Some(msg)) => {
432                    debug!(target: "engine::tree", %msg, "received new engine message");
433                    if let Err(fatal) = self.on_engine_message(msg) {
434                        error!(target: "engine::tree", %fatal, "insert block fatal error");
435                        return
436                    }
437                }
438                Ok(None) => {
439                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
440                }
441                Err(_err) => {
442                    error!(target: "engine::tree", "Engine channel disconnected");
443                    return
444                }
445            }
446
447            if let Err(err) = self.advance_persistence() {
448                error!(target: "engine::tree", %err, "Advancing persistence failed");
449                return
450            }
451        }
452    }
453
454    /// Invoked when previously requested blocks were downloaded.
455    ///
456    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
457    /// will execute the first batch and send the remaining blocks back through the channel so that
458    /// block request processing isn't blocked for a long time.
459    fn on_downloaded(
460        &mut self,
461        mut blocks: Vec<RecoveredBlock<N::Block>>,
462    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
463        if blocks.is_empty() {
464            // nothing to execute
465            return Ok(None)
466        }
467
468        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
469        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
470        for block in blocks.drain(..batch) {
471            if let Some(event) = self.on_downloaded_block(block)? {
472                let needs_backfill = event.is_backfill_action();
473                self.on_tree_event(event)?;
474                if needs_backfill {
475                    // can exit early if backfill is needed
476                    return Ok(None)
477                }
478            }
479        }
480
481        // if we still have blocks to execute, send them as a followup request
482        if !blocks.is_empty() {
483            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
484        }
485
486        Ok(None)
487    }
488
    /// When the Consensus layer receives a new block via the consensus gossip protocol, the
    /// transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`]. The Execution layer executes the transactions and
    /// validates the state in the block header, then passes validation data back to the
    /// Consensus layer, which adds the block to the head of its own blockchain and attests to
    /// it. The block is then broadcast over the consensus p2p network in the form of a "Beacon
    /// block".
497    ///
498    /// These responses should adhere to the [Engine API Spec for
499    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
500    ///
501    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
502    /// returns an error if an internal error occurred.
503    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
504    fn on_new_payload(
505        &mut self,
506        payload: T::ExecutionData,
507    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
508        trace!(target: "engine::tree", "invoked new payload");
509        self.metrics.engine.new_payload_messages.increment(1);
510
511        let validation_start = Instant::now();
512
513        // Ensures that the given payload does not violate any consensus rules that concern the
514        // block's layout, like:
515        //    - missing or invalid base fee
516        //    - invalid extra data
517        //    - invalid transactions
518        //    - incorrect hash
519        //    - the versioned hashes passed with the payload do not exactly match transaction
520        //      versioned hashes
521        //    - the block does not contain blob transactions if it is pre-cancun
522        //
523        // This validates the following engine API rule:
524        //
525        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
526        //    validation by taking the following steps:
527        //
528        //   1. Obtain the actual array by concatenating blob versioned hashes lists
529        //      (`tx.blob_versioned_hashes`) of each [blob
530        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
531        //      in the payload, respecting the order of inclusion. If the payload has no blob
532        //      transactions the expected array **MUST** be `[]`.
533        //
534        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
535        //      null}` if the expected and the actual arrays don't match.
536        //
537        // This validation **MUST** be instantly run in all cases even during active sync process.
538        let parent_hash = payload.parent_hash();
539
540        self.metrics
541            .block_validation
542            .record_payload_validation(validation_start.elapsed().as_secs_f64());
543
544        let num_hash = payload.num_hash();
545        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
546        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
547
548        let block_hash = num_hash.hash;
549        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
550        if lowest_buffered_ancestor == block_hash {
551            lowest_buffered_ancestor = parent_hash;
552        }
553
554        // now check if the block has an invalid ancestor
555        if let Some(invalid) = self.state.invalid_headers.get(&lowest_buffered_ancestor) {
            // Here we might have 2 cases
            // 1. the block is well formed and indeed links to an invalid header, meaning we should
            //    remember it as invalid
            // 2. the block is not well formed (i.e. the block hash is incorrect), and we should
            //    just return an error and forget it
561            let block = match self.payload_validator.ensure_well_formed_payload(payload) {
562                Ok(block) => block,
563                Err(error) => {
564                    let status = self.on_new_payload_error(error, parent_hash)?;
565                    return Ok(TreeOutcome::new(status))
566                }
567            };
568
569            let status = self.on_invalid_new_payload(block.into_sealed_block(), invalid)?;
570            return Ok(TreeOutcome::new(status))
571        }
572
573        let status = if self.backfill_sync_state.is_idle() {
574            let mut latest_valid_hash = None;
575            match self.insert_payload(payload) {
576                Ok(status) => {
577                    let status = match status {
578                        InsertPayloadOk::Inserted(BlockStatus::Valid) => {
579                            latest_valid_hash = Some(block_hash);
580                            self.try_connect_buffered_blocks(num_hash)?;
581                            PayloadStatusEnum::Valid
582                        }
583                        InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
584                            latest_valid_hash = Some(block_hash);
585                            PayloadStatusEnum::Valid
586                        }
587                        InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
588                        InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
589                            // not known to be invalid, but we don't know anything else
590                            PayloadStatusEnum::Syncing
591                        }
592                    };
593
594                    PayloadStatus::new(status, latest_valid_hash)
595                }
596                Err(error) => match error {
597                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
598                    InsertPayloadError::Payload(error) => {
599                        self.on_new_payload_error(error, parent_hash)?
600                    }
601                },
602            }
603        } else {
604            match self.payload_validator.ensure_well_formed_payload(payload) {
605                // if the block is well-formed, buffer it for later
606                Ok(block) => {
607                    if let Err(error) = self.buffer_block(block) {
608                        self.on_insert_block_error(error)?
609                    } else {
610                        PayloadStatus::from_status(PayloadStatusEnum::Syncing)
611                    }
612                }
613                Err(error) => self.on_new_payload_error(error, parent_hash)?,
614            }
615        };
616
617        let mut outcome = TreeOutcome::new(status);
618        // if the block is valid and it is the current sync target head, make it canonical
619        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
620            // but only if it isn't already the canonical head
621            if self.state.tree_state.canonical_block_hash() != block_hash {
622                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
623                    sync_target_head: block_hash,
624                }));
625            }
626        }
627
628        Ok(outcome)
629    }
630
631    /// Returns the new chain for the given head.
632    ///
633    /// This also handles reorgs.
634    ///
635    /// Note: This does not update the tracked state and instead returns the new chain based on the
636    /// given head.
637    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
638        // get the executed new head block
639        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
640            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
641            self.metrics.engine.executed_new_block_cache_miss.increment(1);
642            return Ok(None)
643        };
644
645        let new_head_number = new_head_block.recovered_block().number();
646        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
647
648        let mut new_chain = vec![new_head_block.clone()];
649        let mut current_hash = new_head_block.recovered_block().parent_hash();
650        let mut current_number = new_head_number - 1;
651
652        // Walk back the new chain until we reach a block we know about
653        //
654        // This is only done for in-memory blocks, because we should not have persisted any blocks
655        // that are _above_ the current canonical head.
656        while current_number > current_canonical_number {
657            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
658            {
659                current_hash = block.recovered_block().parent_hash();
660                current_number -= 1;
661                new_chain.push(block);
662            } else {
663                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
664                // This should never happen as we're walking back a chain that should connect to
665                // the canonical chain
666                return Ok(None);
667            }
668        }
669
670        // If we have reached the current canonical head by walking back from the target, then we
671        // know this represents an extension of the canonical chain.
672        if current_hash == self.state.tree_state.current_canonical_head.hash {
673            new_chain.reverse();
674
675            // Simple extension of the current chain
676            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
677        }
678
679        // We have a reorg. Walk back both chains to find the fork point.
680        let mut old_chain = Vec::new();
681        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
682
683        // If the canonical chain is ahead of the new chain,
684        // gather all blocks until new head number.
685        while current_canonical_number > current_number {
686            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
687                old_chain.push(block.clone());
688                old_hash = block.recovered_block().parent_hash();
689                current_canonical_number -= 1;
690            } else {
691                // This shouldn't happen as we're walking back the canonical chain
692                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
693                return Ok(None);
694            }
695        }
696
697        // Both new and old chain pointers are now at the same height.
698        debug_assert_eq!(current_number, current_canonical_number);
699
700        // Walk both chains from specified hashes at same height until
701        // a common ancestor (fork block) is reached.
702        while old_hash != current_hash {
703            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
704                old_hash = block.recovered_block().parent_hash();
705                old_chain.push(block);
706            } else {
707                // This shouldn't happen as we're walking back the canonical chain
708                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
709                return Ok(None);
710            }
711
712            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
713            {
714                current_hash = block.recovered_block().parent_hash();
715                new_chain.push(block);
716            } else {
717                // This shouldn't happen as we've already walked this path
718                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
719                return Ok(None);
720            }
721        }
722        new_chain.reverse();
723        old_chain.reverse();
724
725        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
726    }
727
728    /// Updates the latest block state to the specified canonical ancestor.
729    ///
    /// This method ensures that the latest block tracks the given canonical header, unwinding or
    /// advancing the in-memory canonical state as needed.
    ///
732    /// # Arguments
733    /// * `canonical_header` - The canonical header to set as the new head
734    ///
735    /// # Returns
736    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
737    ///
738    /// Caution: This unwinds the canonical chain
739    fn update_latest_block_to_canonical_ancestor(
740        &mut self,
741        canonical_header: &SealedHeader<N::BlockHeader>,
742    ) -> ProviderResult<()> {
743        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
744        let current_head_number = self.state.tree_state.canonical_block_number();
745        let new_head_number = canonical_header.number();
746        let new_head_hash = canonical_header.hash();
747
748        // Update tree state with the new canonical head
749        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
750
751        // Handle the state update based on whether this is an unwind scenario
752        if new_head_number < current_head_number {
753            debug!(
754                target: "engine::tree",
755                current_head = current_head_number,
756                new_head = new_head_number,
757                new_head_hash = ?new_head_hash,
758                "FCU unwind detected: reverting to canonical ancestor"
759            );
760
761            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
762        } else {
763            debug!(
764                target: "engine::tree",
765                previous_head = current_head_number,
766                new_head = new_head_number,
767                new_head_hash = ?new_head_hash,
768                "Advancing latest block to canonical ancestor"
769            );
770            self.handle_chain_advance_or_same_height(canonical_header)
771        }
772    }
773
774    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
775    /// to the canonical header
776    fn handle_canonical_chain_unwind(
777        &self,
778        current_head_number: u64,
779        canonical_header: &SealedHeader<N::BlockHeader>,
780    ) -> ProviderResult<()> {
781        let new_head_number = canonical_header.number();
782        debug!(
783            target: "engine::tree",
784            from = current_head_number,
785            to = new_head_number,
786            "Handling unwind: collecting blocks to remove from in-memory state"
787        );
788
789        // Collect blocks that need to be removed from memory
790        let old_blocks =
791            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
792
793        // Load and apply the canonical ancestor block
794        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
795    }
796
797    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
798    fn collect_blocks_for_canonical_unwind(
799        &self,
800        new_head_number: u64,
801        current_head_number: u64,
802    ) -> Vec<ExecutedBlock<N>> {
803        let mut old_blocks = Vec::new();
804
805        for block_num in (new_head_number + 1)..=current_head_number {
806            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
807                let executed_block = block_state.block_ref().block.clone();
808                old_blocks.push(executed_block);
809                debug!(
810                    target: "engine::tree",
811                    block_number = block_num,
812                    "Collected block for removal from in-memory state"
813                );
814            }
815        }
816
817        if old_blocks.is_empty() {
818            debug!(
819                target: "engine::tree",
820                "No blocks found in memory to remove, will clear and reset state"
821            );
822        }
823
824        old_blocks
825    }
826
827    /// Applies the canonical ancestor block via a reorg operation.
828    fn apply_canonical_ancestor_via_reorg(
829        &self,
830        canonical_header: &SealedHeader<N::BlockHeader>,
831        old_blocks: Vec<ExecutedBlock<N>>,
832    ) -> ProviderResult<()> {
833        let new_head_hash = canonical_header.hash();
834        let new_head_number = canonical_header.number();
835
836        // Try to load the canonical ancestor's block
837        match self.canonical_block_by_hash(new_head_hash)? {
838            Some(executed_block) => {
839                let block_with_trie = ExecutedBlockWithTrieUpdates {
840                    block: executed_block,
841                    trie: ExecutedTrieUpdates::Missing,
842                };
843
844                // Perform the reorg to properly handle the unwind
845                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
846                    new: vec![block_with_trie],
847                    old: old_blocks,
848                });
849
850                // CRITICAL: Update the canonical head after the reorg
851                // This ensures get_canonical_head() returns the correct block
852                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
853
854                debug!(
855                    target: "engine::tree",
856                    block_number = new_head_number,
857                    block_hash = ?new_head_hash,
858                    "Successfully loaded canonical ancestor into memory via reorg"
859                );
860            }
861            None => {
862                // Fallback: update header only if block cannot be found
863                warn!(
864                    target: "engine::tree",
865                    block_hash = ?new_head_hash,
866                    "Could not find canonical ancestor block, updating header only"
867                );
868                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
869            }
870        }
871
872        Ok(())
873    }
874
875    /// Handles chain advance or same height scenarios.
876    fn handle_chain_advance_or_same_height(
877        &self,
878        canonical_header: &SealedHeader<N::BlockHeader>,
879    ) -> ProviderResult<()> {
880        let new_head_number = canonical_header.number();
881        let new_head_hash = canonical_header.hash();
882
883        // Update the canonical head header
884        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
885
886        // Load the block into memory if it's not already present
887        self.ensure_block_in_memory(new_head_number, new_head_hash)
888    }
889
890    /// Ensures a block is loaded into memory if not already present.
891    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
892        // Check if block is already in memory
893        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
894            return Ok(());
895        }
896
897        // Try to load the block from storage
898        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
899            let block_with_trie = ExecutedBlockWithTrieUpdates {
900                block: executed_block,
901                trie: ExecutedTrieUpdates::Missing,
902            };
903
904            self.canonical_in_memory_state
905                .update_chain(NewCanonicalChain::Commit { new: vec![block_with_trie] });
906
907            debug!(
908                target: "engine::tree",
909                block_number,
910                block_hash = ?block_hash,
911                "Added canonical block to in-memory state"
912            );
913        }
914
915        Ok(())
916    }
917
918    /// Determines if the given block is part of a fork by checking that these
919    /// conditions are true:
920    /// * walking back from the target hash to verify that the target hash is not part of an
921    ///   extension of the canonical chain.
922    /// * walking back from the current head to verify that the target hash is not already part of
923    ///   the canonical chain.
924    ///
925    /// The header is required as an arg, because we might be checking that the header is a fork
926    /// block before it's in the tree state and before it's in the database.
927    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
928        let target_hash = target.block.hash;
929        // verify that the given hash is not part of an extension of the canon chain.
930        let canonical_head = self.state.tree_state.canonical_head();
931        let mut current_hash;
932        let mut current_block = target;
933        loop {
934            if current_block.block.hash == canonical_head.hash {
935                return Ok(false)
936            }
937            // We already passed the canonical head
938            if current_block.block.number <= canonical_head.number {
939                break
940            }
941            current_hash = current_block.parent;
942
943            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
944            current_block = next_block.block_with_parent();
945        }
946
947        // verify that the given hash is not already part of canonical chain stored in memory
948        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
949            return Ok(false)
950        }
951
952        // verify that the given hash is not already part of persisted canonical chain
953        if self.provider.block_number(target_hash)?.is_some() {
954            return Ok(false)
955        }
956
957        Ok(true)
958    }
959
960    /// Returns the persisting kind for the input block.
961    fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
962        // Check that we're currently persisting.
963        let Some(action) = self.persistence_state.current_action() else {
964            return PersistingKind::NotPersisting
965        };
        // Check that the persistence action is saving blocks, not removing them.
967        let CurrentPersistenceAction::SavingBlocks { highest } = action else {
968            return PersistingKind::PersistingNotDescendant
969        };
970
        // The block being validated can only be a descendant if its number is higher than
        // the highest block being persisted. Otherwise, it's likely a fork of a lower block.
973        if block.block.number > highest.number &&
974            self.state.tree_state.is_descendant(*highest, block)
975        {
976            return PersistingKind::PersistingDescendant
977        }
978
979        // In all other cases, the block is not a descendant.
980        PersistingKind::PersistingNotDescendant
981    }
982
983    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
984    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
985    /// chain.
986    ///
987    /// These responses should adhere to the [Engine API Spec for
988    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
989    ///
990    /// Returns an error if an internal error occurred like a database error.
    #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
992    fn on_forkchoice_updated(
993        &mut self,
994        state: ForkchoiceState,
995        attrs: Option<T::PayloadAttributes>,
996        version: EngineApiMessageVersion,
997    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
998        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
999        self.metrics.engine.forkchoice_updated_messages.increment(1);
1000        if attrs.is_some() {
1001            self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
1002        }
1003        self.canonical_in_memory_state.on_forkchoice_update_received();
1004
1005        if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1006            return Ok(TreeOutcome::new(on_updated))
1007        }
1008
1009        let valid_outcome = |head| {
1010            TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1011                PayloadStatusEnum::Valid,
1012                Some(head),
1013            )))
1014        };
1015
1016        // Process the forkchoice update by trying to make the head block canonical
1017        //
1018        // We can only process this forkchoice update if:
1019        // - we have the `head` block
1020        // - the head block is part of a chain that is connected to the canonical chain. This
1021        //   includes reorgs.
1022        //
1023        // Performing a FCU involves:
1024        // - marking the FCU's head block as canonical
1025        // - updating in memory state to reflect the new canonical chain
1026        // - updating canonical state trackers
1027        // - emitting a canonicalization event for the new chain (including reorg)
1028        // - if we have payload attributes, delegate them to the payload service
1029
1030        // 1. ensure we have a new head block
1031        if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1032            trace!(target: "engine::tree", "fcu head hash is already canonical");
1033
1034            // update the safe and finalized blocks and ensure their values are valid
1035            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1036                // safe or finalized hashes are invalid
1037                return Ok(TreeOutcome::new(outcome))
1038            }
1039
1040            // we still need to process payload attributes if the head is already canonical
1041            if let Some(attr) = attrs {
1042                let tip = self
1043                    .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1044                    .ok_or_else(|| {
1045                        // If we can't find the canonical block, then something is wrong and we need
1046                        // to return an error
1047                        ProviderError::HeaderNotFound(state.head_block_hash.into())
1048                    })?;
1049                let updated = self.process_payload_attributes(attr, &tip, state, version);
1050                return Ok(TreeOutcome::new(updated))
1051            }
1052
1053            // the head block is already canonical
1054            return Ok(valid_outcome(state.head_block_hash))
1055        }
1056
1057        // 2. check if the head is already part of the canonical chain
1058        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1059            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1060
1061            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1062            // own chain at will, so we need to always trigger a new payload job if requested.
1063            if self.engine_kind.is_opstack() ||
1064                self.config.always_process_payload_attributes_on_canonical_head()
1065            {
1066                if let Some(attr) = attrs {
1067                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1068                    let updated =
1069                        self.process_payload_attributes(attr, &canonical_header, state, version);
1070                    return Ok(TreeOutcome::new(updated))
1071                }
1072
                // At this point, no alternative block has been triggered, so we effectively need
                // to unwind the _canonical_ chain to the FCU's head, which is part of the
                // canonical chain. We need to update the latest block state to reflect the
                // canonical ancestor. This ensures that state providers and the transaction pool
                // operate with the correct chain state after forkchoice update processing.
1079                self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1080            }
1081
1082            // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
1083            //    payload build process if `forkchoiceState.headBlockHash` references a `VALID`
1084            //    ancestor of the head of canonical chain, i.e. the ancestor passed payload
1085            //    validation process and deemed `VALID`. In the case of such an event, client
1086            //    software MUST return `{payloadStatus: {status: VALID, latestValidHash:
1087            //    forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1088
1089            // the head block is already canonical, so we're not triggering a payload job and can
1090            // return right away
1091            return Ok(valid_outcome(state.head_block_hash))
1092        }
1093
1094        // 3. ensure we can apply a new chain update for the head block
1095        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1096            let tip = chain_update.tip().clone_sealed_header();
1097            self.on_canonical_chain_update(chain_update);
1098
1099            // update the safe and finalized blocks and ensure their values are valid
1100            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1101                // safe or finalized hashes are invalid
1102                return Ok(TreeOutcome::new(outcome))
1103            }
1104
1105            if let Some(attr) = attrs {
1106                let updated = self.process_payload_attributes(attr, &tip, state, version);
1107                return Ok(TreeOutcome::new(updated))
1108            }
1109
1110            return Ok(valid_outcome(state.head_block_hash))
1111        }
1112
1113        // 4. we don't have the block to perform the update
1114        // we assume the FCU is valid and at least the head is missing,
1115        // so we need to start syncing to it
1116        //
1117        // find the appropriate target to sync to, if we don't have the safe block hash then we
1118        // start syncing to the safe block via backfill first
1119        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1120            // check that safe block is valid and missing
1121            !state.safe_block_hash.is_zero() &&
1122            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1123        {
1124            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1125            state.safe_block_hash
1126        } else {
1127            state.head_block_hash
1128        };
1129
1130        let target = self.lowest_buffered_ancestor_or(target);
1131        trace!(target: "engine::tree", %target, "downloading missing block");
1132
1133        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1134            PayloadStatusEnum::Syncing,
1135        )))
1136        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1137    }
1138
1139    /// Attempts to receive the next engine request.
1140    ///
1141    /// If there's currently no persistence action in progress, this will block until a new request
1142    /// is received. If there's a persistence action in progress, this will try to receive the
1143    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1144    /// received in time.
1145    ///
1146    /// Returns an error if the engine channel is disconnected.
1147    #[expect(clippy::type_complexity)]
1148    fn try_recv_engine_message(
1149        &self,
1150    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1151        if self.persistence_state.in_progress() {
1152            // try to receive the next request with a timeout to not block indefinitely
1153            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1154                Ok(msg) => Ok(Some(msg)),
1155                Err(err) => match err {
1156                    RecvTimeoutError::Timeout => Ok(None),
1157                    RecvTimeoutError::Disconnected => Err(RecvError),
1158                },
1159            }
1160        } else {
1161            self.incoming.recv().map(Some)
1162        }
1163    }
1164
1165    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1166    /// the current persistence action while we're removing blocks.
1167    fn remove_blocks(&mut self, new_tip_num: u64) {
1168        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1169        if new_tip_num < self.persistence_state.last_persisted_block.number {
1170            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1171            let (tx, rx) = oneshot::channel();
1172            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1173            self.persistence_state.start_remove(new_tip_num, rx);
1174        }
1175    }
1176
1177    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1178    /// the current persistence action while we're saving blocks.
1179    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1180        if blocks_to_persist.is_empty() {
1181            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1182            return
1183        }
1184
1185        // NOTE: checked non-empty above
1186        let highest_num_hash = blocks_to_persist
1187            .iter()
1188            .max_by_key(|block| block.recovered_block().number())
1189            .map(|b| b.recovered_block().num_hash())
1190            .expect("Checked non-empty persisting blocks");
1191
1192        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1193        let (tx, rx) = oneshot::channel();
1194        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1195
1196        self.persistence_state.start_save(highest_num_hash, rx);
1197    }
1198
1199    /// Attempts to advance the persistence state.
1200    ///
1201    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1202    /// or send a new persistence action if necessary.
1203    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1204        if self.persistence_state.in_progress() {
1205            let (mut rx, start_time, current_action) = self
1206                .persistence_state
1207                .rx
1208                .take()
1209                .expect("if a persistence task is in progress Receiver must be Some");
            // Check if persistence has completed
1211            match rx.try_recv() {
1212                Ok(last_persisted_hash_num) => {
1213                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1214                    let Some(BlockNumHash {
1215                        hash: last_persisted_block_hash,
1216                        number: last_persisted_block_number,
1217                    }) = last_persisted_hash_num
1218                    else {
1219                        // if this happened, then we persisted no blocks because we sent an
1220                        // empty vec of blocks
1221                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1222                        return Ok(())
1223                    };
1224
1225                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1226                    self.persistence_state
1227                        .finish(last_persisted_block_hash, last_persisted_block_number);
1228                    self.on_new_persisted_block()?;
1229                }
1230                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1231                Err(TryRecvError::Empty) => {
1232                    self.persistence_state.rx = Some((rx, start_time, current_action))
1233                }
1234            }
1235        }
1236
1237        if !self.persistence_state.in_progress() {
1238            if let Some(new_tip_num) = self.find_disk_reorg()? {
1239                self.remove_blocks(new_tip_num)
1240            } else if self.should_persist() {
1241                let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1242                self.persist_blocks(blocks_to_persist);
1243            }
1244        }
1245
1246        Ok(())
1247    }
1248
1249    /// Handles a message from the engine.
1250    fn on_engine_message(
1251        &mut self,
1252        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1253    ) -> Result<(), InsertBlockFatalError> {
1254        match msg {
1255            FromEngine::Event(event) => match event {
1256                FromOrchestrator::BackfillSyncStarted => {
1257                    debug!(target: "engine::tree", "received backfill sync started event");
1258                    self.backfill_sync_state = BackfillSyncState::Active;
1259                }
1260                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1261                    self.on_backfill_sync_finished(ctrl)?;
1262                }
1263            },
1264            FromEngine::Request(request) => {
1265                match request {
1266                    EngineApiRequest::InsertExecutedBlock(block) => {
1267                        let block_num_hash = block.recovered_block().num_hash();
1268                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1269                            // outdated block that can be skipped
1270                            return Ok(())
1271                        }
1272
1273                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1274                        let now = Instant::now();
1275
1276                        // if the parent is the canonical head, we can insert the block as the
1277                        // pending block
1278                        if self.state.tree_state.canonical_block_hash() ==
1279                            block.recovered_block().parent_hash()
1280                        {
1281                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1282                            self.canonical_in_memory_state.set_pending_block(block.clone());
1283                        }
1284
1285                        self.state.tree_state.insert_executed(block.clone());
1286                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1287                        self.emit_event(EngineApiEvent::BeaconConsensus(
1288                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1289                        ));
1290                    }
1291                    EngineApiRequest::Beacon(request) => {
1292                        match request {
1293                            BeaconEngineMessage::ForkchoiceUpdated {
1294                                state,
1295                                payload_attrs,
1296                                tx,
1297                                version,
1298                            } => {
1299                                let mut output =
1300                                    self.on_forkchoice_updated(state, payload_attrs, version);
1301
1302                                if let Ok(res) = &mut output {
1303                                    // track last received forkchoice state
1304                                    self.state
1305                                        .forkchoice_state_tracker
1306                                        .set_latest(state, res.outcome.forkchoice_status());
1307
1308                                    // emit an event about the handled FCU
1309                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1310                                        state,
1311                                        res.outcome.forkchoice_status(),
1312                                    ));
1313
1314                                    // handle the event if any
1315                                    self.on_maybe_tree_event(res.event.take())?;
1316                                }
1317
1318                                if let Err(err) =
1319                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1320                                {
1321                                    self.metrics
1322                                        .engine
1323                                        .failed_forkchoice_updated_response_deliveries
1324                                        .increment(1);
1325                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1326                                }
1327                            }
1328                            BeaconEngineMessage::NewPayload { payload, tx } => {
1329                                let mut output = self.on_new_payload(payload);
1330
1331                                let maybe_event =
1332                                    output.as_mut().ok().and_then(|out| out.event.take());
1333
1334                                // emit response
1335                                if let Err(err) =
1336                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1337                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1338                                    }))
1339                                {
1340                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1341                                    self.metrics
1342                                        .engine
1343                                        .failed_new_payload_response_deliveries
1344                                        .increment(1);
1345                                }
1346
1347                                // handle the event if any
1348                                self.on_maybe_tree_event(maybe_event)?;
1349                            }
1350                        }
1351                    }
1352                }
1353            }
1354            FromEngine::DownloadedBlocks(blocks) => {
1355                if let Some(event) = self.on_downloaded(blocks)? {
1356                    self.on_tree_event(event)?;
1357                }
1358            }
1359        }
1360        Ok(())
1361    }
1362
1363    /// Invoked when the backfill sync has finished syncing to its target.
1364    ///
1365    /// At this point we consider the chain synced to the backfill target.
1366    ///
1367    /// Checks the tracked finalized block against the block on disk and requests another backfill
1368    /// run if the distance to the tip exceeds the threshold for another backfill run.
1369    ///
1370    /// This will also do the necessary housekeeping of the tree state, this includes:
1371    ///  - removing all blocks below the backfill height
1372    ///  - resetting the canonical in-memory state
1373    ///
1374    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1375    /// target block.
1376    fn on_backfill_sync_finished(
1377        &mut self,
1378        ctrl: ControlFlow,
1379    ) -> Result<(), InsertBlockFatalError> {
1380        debug!(target: "engine::tree", "received backfill sync finished event");
1381        self.backfill_sync_state = BackfillSyncState::Idle;
1382
1383        // Pipeline unwound, memorize the invalid block and wait for the CL for the next sync target.
1384        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1385            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1386            // update the `invalid_headers` cache with the new invalid header
1387            self.state.invalid_headers.insert(**bad_block);
1388
1389            // if this was an unwind then the target is the new height
1390            Some(*target)
1391        } else {
1392            // backfill height is the block number that the backfill finished at
1393            ctrl.block_number()
1394        };
1395
1396        // backfill height is the block number that the backfill finished at
1397        let Some(backfill_height) = backfill_height else { return Ok(()) };
1398
1399        // state housekeeping after backfill sync
1400        // remove all executed blocks below the backfill height
1401        //
1402        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1403        // before that
1404        let Some(backfill_num_hash) = self
1405            .provider
1406            .block_hash(backfill_height)?
1407            .map(|hash| BlockNumHash { hash, number: backfill_height })
1408        else {
1409            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1410            return Ok(())
1411        };
1412
1413        if ctrl.is_unwind() {
1414            // the node was reset, so we need to clear everything above that height so that the
1415            // backfill height is the new canonical block.
1416            self.state.tree_state.reset(backfill_num_hash)
1417        } else {
1418            self.state.tree_state.remove_until(
1419                backfill_num_hash,
1420                self.persistence_state.last_persisted_block.hash,
1421                Some(backfill_num_hash),
1422            );
1423        }
1424
1425        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1426        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1427
1428        // remove all buffered blocks below the backfill height
1429        self.state.buffer.remove_old_blocks(backfill_height);
1430        // we remove all entries because now we're synced to the backfill target and consider this
1431        // the canonical chain
1432        self.canonical_in_memory_state.clear_state();
1433
1434        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1435            // update the tracked chain height, after backfill sync both the canonical height and
1436            // persisted height are the same
1437            self.state.tree_state.set_canonical_head(new_head.num_hash());
1438            self.persistence_state.finish(new_head.hash(), new_head.number());
1439
1440            // update the tracked canonical head
1441            self.canonical_in_memory_state.set_canonical_head(new_head);
1442        }
1443
1444        // check if we need to run backfill again by comparing the most recent finalized height to
1445        // the backfill height
1446        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1447        else {
1448            return Ok(())
1449        };
1450        if sync_target_state.finalized_block_hash.is_zero() {
1451            // no finalized block, can't check distance
1452            return Ok(())
1453        }
1454        // get the block number of the finalized block, if we have it
1455        let newest_finalized = self
1456            .state
1457            .buffer
1458            .block(&sync_target_state.finalized_block_hash)
1459            .map(|block| block.number());
1460
1461        // The block number that the backfill finished at - if either the progress or the newest
1462        // finalized block number is None, we can't check the distance anyway.
1463        //
1464        // If both are Some, we perform another distance check and return the desired
1465        // backfill target
1466        if let Some(backfill_target) =
1467            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1468                // Determine whether we should run backfill again: the remaining gap may
1469                // still be large enough to warrant another backfill run
1470                self.backfill_sync_target(progress, finalized_number, None)
1471            })
1472        {
1473            // request another backfill run
1474            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1475                backfill_target.into(),
1476            )));
1477            return Ok(())
1478        };
1479
1480        // try to close the gap by executing buffered blocks that are child blocks of the new head
1481        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1482    }
1483
1484    /// Attempts to make the given target canonical.
1485    ///
1486    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1487    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1488        if let Some(chain_update) = self.on_new_head(target)? {
1489            self.on_canonical_chain_update(chain_update);
1490        }
1491
1492        Ok(())
1493    }
1494
1495    /// Convenience function to handle an optional tree event.
1496    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1497        if let Some(event) = event {
1498            self.on_tree_event(event)?;
1499        }
1500
1501        Ok(())
1502    }
1503
1504    /// Handles a tree event.
1505    ///
1506    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1507    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1508        match event {
1509            TreeEvent::TreeAction(action) => match action {
1510                TreeAction::MakeCanonical { sync_target_head } => {
1511                    self.make_canonical(sync_target_head)?;
1512                }
1513            },
1514            TreeEvent::BackfillAction(action) => {
1515                self.emit_event(EngineApiEvent::BackfillAction(action));
1516            }
1517            TreeEvent::Download(action) => {
1518                self.emit_event(EngineApiEvent::Download(action));
1519            }
1520        }
1521
1522        Ok(())
1523    }
1524
1525    /// Emits an outgoing event to the engine.
1526    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1527        let event = event.into();
1528
1529        if event.is_backfill_action() {
1530            debug_assert_eq!(
1531                self.backfill_sync_state,
1532                BackfillSyncState::Idle,
1533                "backfill action should only be emitted when backfill is idle"
1534            );
1535
1536            if self.persistence_state.in_progress() {
1537                // backfill sync and persisting data are mutually exclusive, so we can't start
1538                // backfill while we're still persisting
1539                debug!(target: "engine::tree", "skipping backfill run while persistence task is active");
1540                return
1541            }
1542
1543            self.backfill_sync_state = BackfillSyncState::Pending;
1544            self.metrics.engine.pipeline_runs.increment(1);
1545            debug!(target: "engine::tree", "emitting backfill action event");
1546        }
1547
1548        let _ = self.outgoing.send(event).inspect_err(
1549            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1550        );
1551    }
1552
1553    /// Returns true if the canonical chain length minus the last persisted
1554    /// block is greater than or equal to the persistence threshold and
1555    /// backfill is not running.
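    ///
    /// A back-of-the-envelope example of the check with made-up numbers (the real values come
    /// from the tree state, the persistence state and [`TreeConfig`]):
    ///
    /// ```
    /// let canonical_head: u64 = 1_000;
    /// let last_persisted: u64 = 700;
    /// let persistence_threshold: u64 = 256;
    /// // persist once the unpersisted distance exceeds the threshold
    /// assert!(canonical_head.saturating_sub(last_persisted) > persistence_threshold);
    /// ```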
1556    pub const fn should_persist(&self) -> bool {
1557        if !self.backfill_sync_state.is_idle() {
1558            // can't persist if backfill is running
1559            return false
1560        }
1561
1562        let min_block = self.persistence_state.last_persisted_block.number;
1563        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1564            self.config.persistence_threshold()
1565    }
1566
1567    /// Returns a batch of consecutive canonical blocks to persist in the range
1568    /// `(last_persisted_number .. canonical_head - threshold]`. The expected
1569    /// order is oldest -> newest.
1570    ///
1571    /// For those blocks that didn't have the trie updates calculated, runs the state root
1572    /// calculation, and saves the trie updates.
1573    ///
1574    /// Returns an error if the state root calculation fails.
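    ///
    /// For example, with a last persisted block of 90, a canonical head of 120 and a memory block
    /// buffer target of 16 (all numbers made up), the returned batch covers blocks 91..=104,
    /// oldest first:
    ///
    /// ```
    /// let last_persisted: u64 = 90;
    /// let canonical_head: u64 = 120;
    /// let memory_block_buffer_target: u64 = 16;
    /// let target_number = canonical_head.saturating_sub(memory_block_buffer_target);
    /// // blocks in `(last_persisted, target_number]` are persisted, oldest first
    /// let batch: Vec<u64> = (last_persisted + 1..=target_number).collect();
    /// assert_eq!(batch, (91..=104).collect::<Vec<u64>>());
    /// ```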
1575    fn get_canonical_blocks_to_persist(
1576        &mut self,
1577    ) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
1578        // We will calculate the state root using the database, so we need to be sure there are no
1579        // concurrent changes from an in-progress persistence task
1580        debug_assert!(!self.persistence_state.in_progress());
1581
1582        let mut blocks_to_persist = Vec::new();
1583        let mut current_hash = self.state.tree_state.canonical_block_hash();
1584        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1585
1586        let canonical_head_number = self.state.tree_state.canonical_block_number();
1587
1588        let target_number =
1589            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1590
1591        debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1592        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1593            if block.recovered_block().number() <= last_persisted_number {
1594                break;
1595            }
1596
1597            if block.recovered_block().number() <= target_number {
1598                blocks_to_persist.push(block.clone());
1599            }
1600
1601            current_hash = block.recovered_block().parent_hash();
1602        }
1603
1604        // Reverse the order so that the oldest block comes first
1605        blocks_to_persist.reverse();
1606
1607        // Calculate missing trie updates
1608        for block in &mut blocks_to_persist {
1609            if block.trie.is_present() {
1610                continue
1611            }
1612
1613            debug!(
1614                target: "engine::tree",
1615                block = ?block.recovered_block().num_hash(),
1616                "Calculating trie updates before persisting"
1617            );
1618
1619            let provider = self
1620                .state_provider_builder(block.recovered_block().parent_hash())?
1621                .ok_or(AdvancePersistenceError::MissingAncestor(
1622                    block.recovered_block().parent_hash(),
1623                ))?
1624                .build()?;
1625
1626            let mut trie_input = self.compute_trie_input(
1627                self.persisting_kind_for(block.recovered_block.block_with_parent()),
1628                self.provider.database_provider_ro()?,
1629                block.recovered_block().parent_hash(),
1630                None,
1631            )?;
1632            // Extend with block we are generating trie updates for.
1633            trie_input.append_ref(block.hashed_state());
1634            let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
1635            debug_assert_eq!(_root, block.recovered_block().state_root());
1636
1637            // Update trie updates in both tree state and blocks to persist that we return
1638            let trie_updates = Arc::new(updates);
1639            let tree_state_block = self
1640                .state
1641                .tree_state
1642                .blocks_by_hash
1643                .get_mut(&block.recovered_block().hash())
1644                .expect("blocks to persist are constructed from tree state blocks");
1645            tree_state_block.trie.set_present(trie_updates.clone());
1646            block.trie.set_present(trie_updates);
1647        }
1648
1649        Ok(blocks_to_persist)
1650    }
1651
1652    /// This clears the blocks from the in-memory tree state that have been persisted to the
1653    /// database.
1654    ///
1655    /// This also updates the canonical in-memory state to reflect the newest persisted block
1656    /// height.
1657    ///
1658    /// Assumes that `finish` has been called on the `persistence_state` at least once
1659    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1660        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1661        // state.
1662        if let Some(remove_above) = self.find_disk_reorg()? {
1663            self.remove_blocks(remove_above);
1664            return Ok(())
1665        }
1666
1667        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1668        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1669        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1670            number: self.persistence_state.last_persisted_block.number,
1671            hash: self.persistence_state.last_persisted_block.hash,
1672        });
1673        Ok(())
1674    }
1675
1676    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1677    ///
1678    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
1679    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1680    /// has in memory.
1681    ///
1682    /// For finalized blocks, this will return `None`.
1683    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1684        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1685        // check memory first
1686        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1687            return Ok(Some(block.block.clone()))
1688        }
1689
1690        let (block, senders) = self
1691            .provider
1692            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1693            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1694            .split_sealed();
1695        let execution_output = self
1696            .provider
1697            .get_state(block.header().number())?
1698            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1699        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1700
1701        Ok(Some(ExecutedBlock {
1702            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1703            execution_output: Arc::new(execution_output),
1704            hashed_state: Arc::new(hashed_state),
1705        }))
1706    }
1707
1708    /// Return sealed block header from in-memory state or database by hash.
1709    fn sealed_header_by_hash(
1710        &self,
1711        hash: B256,
1712    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1713        // check memory first
1714        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1715
1716        if header.is_some() {
1717            Ok(header)
1718        } else {
1719            self.provider.sealed_header_by_hash(hash)
1720        }
1721    }
1722
1723    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1724    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1725    /// not exist in the buffer, this returns the hash that is passed in.
1726    ///
1727    /// Returns the parent hash of the block itself if the block is buffered and has no other
1728    /// buffered ancestors.
1729    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1730        self.state
1731            .buffer
1732            .lowest_ancestor(&hash)
1733            .map(|block| block.parent_hash())
1734            .unwrap_or_else(|| hash)
1735    }
1736
1737    /// If validation fails, the response MUST contain the latest valid hash:
1738    ///
1739    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1740    ///     conditions:
1741    ///     - It is fully validated and deemed VALID
1742    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1743    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1744    ///     conditions are satisfied by a `PoW` block.
1745    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1746    ///     the above conditions.
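    ///
    /// A small sketch of the ancestor walk over a hypothetical invalid-headers map, with string
    /// stand-ins for block hashes (the real lookup also checks the tree and database for the
    /// first hash that is not in the cache):
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// // invalid block hash -> its parent hash (hypothetical values)
    /// let invalid: HashMap<&str, &str> =
    ///     [("bad3", "bad2"), ("bad2", "bad1"), ("bad1", "ok0")].into_iter().collect();
    ///
    /// let mut current = "bad3";
    /// while let Some(&parent) = invalid.get(current) {
    ///     current = parent;
    /// }
    /// // `ok0` is not in the invalid cache, so it is the latest valid hash candidate
    /// assert_eq!(current, "ok0");
    /// ```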
1747    fn latest_valid_hash_for_invalid_payload(
1748        &mut self,
1749        parent_hash: B256,
1750    ) -> ProviderResult<Option<B256>> {
1751        // Check if parent exists in side chain or in canonical chain.
1752        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1753            return Ok(Some(parent_hash))
1754        }
1755
1756        // iterate over ancestors in the invalid cache
1757        // until we encounter the first valid ancestor
1758        let mut current_hash = parent_hash;
1759        let mut current_block = self.state.invalid_headers.get(&current_hash);
1760        while let Some(block_with_parent) = current_block {
1761            current_hash = block_with_parent.parent;
1762            current_block = self.state.invalid_headers.get(&current_hash);
1763
1764            // If `current_block` is None, then `current_hash` has no invalid ancestor in the
1765            // cache, so check for its presence in the blockchain tree
1766            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1767                return Ok(Some(current_hash))
1768            }
1769        }
1770        Ok(None)
1771    }
1772
1773    /// Prepares the invalid payload response for the given hash, checking the
1774    /// database for the parent hash and populating the payload status with the latest valid hash
1775    /// according to the engine api spec.
1776    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1777        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1778        // PoW block, which we need to identify by looking at the parent's block difficulty
1779        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? {
1780            if !parent.difficulty().is_zero() {
1781                parent_hash = B256::ZERO;
1782            }
1783        }
1784
1785        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1786        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1787            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1788        })
1789        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1790    }
1791
1792    /// Returns true if the given hash is the last received sync target block.
1793    ///
1794    /// See [`ForkchoiceStateTracker::sync_target_state`]
1795    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1796        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1797            return target.head_block_hash == block_hash
1798        }
1799        false
1800    }
1801
1802    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1803    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1804    ///
1805    /// Returns a payload status response according to the engine API spec if the block is known to
1806    /// be invalid.
1807    fn check_invalid_ancestor_with_head(
1808        &mut self,
1809        check: B256,
1810        head: &SealedBlock<N::Block>,
1811    ) -> ProviderResult<Option<PayloadStatus>> {
1812        // check if the check hash was previously marked as invalid
1813        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1814
1815        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1816    }
1817
1818    /// Invoked when a new payload received is invalid.
1819    fn on_invalid_new_payload(
1820        &mut self,
1821        head: SealedBlock<N::Block>,
1822        invalid: BlockWithParent,
1823    ) -> ProviderResult<PayloadStatus> {
1824        // populate the latest valid hash field
1825        let status = self.prepare_invalid_response(invalid.parent)?;
1826
1827        // insert the head block into the invalid header cache
1828        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1829        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1830
1831        Ok(status)
1832    }
1833
1834    /// Checks if the given `head` points to an invalid header, which requires a specific response
1835    /// to a forkchoice update.
1836    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1837        // check if the head was previously marked as invalid
1838        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1839        // populate the latest valid hash field
1840        Ok(Some(self.prepare_invalid_response(header.parent)?))
1841    }
1842
1843    /// Validate if block is correct and satisfies all the consensus rules that concern the header
1844    /// Validates that the block is correct and satisfies all consensus rules concerning the header
1845    /// and block body itself.
1846        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1847            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1848            return Err(e)
1849        }
1850
1851        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1852            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1853            return Err(e)
1854        }
1855
1856        Ok(())
1857    }
1858
1859    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
1860    #[instrument(level = "trace", skip(self), target = "engine::tree")]
1861    fn try_connect_buffered_blocks(
1862        &mut self,
1863        parent: BlockNumHash,
1864    ) -> Result<(), InsertBlockFatalError> {
1865        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1866
1867        if blocks.is_empty() {
1868            // nothing to append
1869            return Ok(())
1870        }
1871
1872        let now = Instant::now();
1873        let block_count = blocks.len();
1874        for child in blocks {
1875            let child_num_hash = child.num_hash();
1876            match self.insert_block(child) {
1877                Ok(res) => {
1878                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1879                    if self.is_sync_target_head(child_num_hash.hash) &&
1880                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1881                    {
1882                        self.make_canonical(child_num_hash.hash)?;
1883                    }
1884                }
1885                Err(err) => {
1886                    if let InsertPayloadError::Block(err) = err {
1887                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1888                        if let Err(fatal) = self.on_insert_block_error(err) {
1889                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1890                            return Err(fatal)
1891                        }
1892                    }
1893                }
1894            }
1895        }
1896
1897        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1898        Ok(())
1899    }
1900
1901    /// Pre-validates the block and inserts it into the buffer.
1902    fn buffer_block(
1903        &mut self,
1904        block: RecoveredBlock<N::Block>,
1905    ) -> Result<(), InsertBlockError<N::Block>> {
1906        if let Err(err) = self.validate_block(&block) {
1907            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1908        }
1909        self.state.buffer.insert_block(block);
1910        Ok(())
1911    }
1912
1913    /// Returns true if the distance from the local tip to the block is greater than the configured
1914    /// threshold.
1915    ///
1916    /// If the `local_tip` is greater than the `block`, then this will return false.
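    ///
    /// Sketch with an assumed threshold of 1_024 blocks purely for illustration; the real
    /// constant is `MIN_BLOCKS_FOR_PIPELINE_RUN`:
    ///
    /// ```
    /// let threshold: u64 = 1_024; // assumed value, not the actual constant
    /// let local_tip: u64 = 10_000;
    /// let block: u64 = 12_000;
    /// assert!(block > local_tip && block - local_tip > threshold);
    /// ```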
1917    #[inline]
1918    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1919        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1920    }
1921
1922    /// Returns how far the local tip is from the given block. If the local tip is at the same
1923    /// height or its block number is greater than the given block, this returns None.
1924    #[inline]
1925    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1926        if block > local_tip {
1927            Some(block - local_tip)
1928        } else {
1929            None
1930        }
1931    }
1932
1933    /// Returns the target hash to sync to if the distance from the local tip to the block is
1934    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
1935    /// that block already).
1936    ///
1937    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
1938    /// (missing) finalized block.
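    ///
    /// Rough sketch of how the target is chosen once the distance check passes (hypothetical
    /// values; the real check queries the provider for the finalized header):
    ///
    /// ```
    /// // hypothetical forkchoice state hashes
    /// let finalized_block_hash = "0xfinal";
    /// let head_block_hash = "0xhead";
    /// let finalized_is_known_locally = false; // finalized header not yet in the database
    /// let finalized_is_zero = false; // the CL sent a real finalized hash
    ///
    /// let target = if finalized_is_known_locally {
    ///     None // already synced to the finalized block, no backfill needed
    /// } else if !finalized_is_zero {
    ///     Some(finalized_block_hash)
    /// } else {
    ///     // optimistic sync: the CL has no finalized hash yet, so aim for the head
    ///     Some(head_block_hash)
    /// };
    /// assert_eq!(target, Some("0xfinal"));
    /// ```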
1939    fn backfill_sync_target(
1940        &self,
1941        canonical_tip_num: u64,
1942        target_block_number: u64,
1943        downloaded_block: Option<BlockNumHash>,
1944    ) -> Option<B256> {
1945        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1946
1947        // check if the downloaded block is the tracked finalized block
1948        let mut exceeds_backfill_threshold = if let Some(buffered_finalized) = sync_target_state
1949            .as_ref()
1950            .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1951        {
1952            // if we have buffered the finalized block, we should check how far
1953            // we're off
1954            self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number())
1955        } else {
1956            // check if the distance exceeds the threshold for backfill sync
1957            self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
1958        };
1959
1960        // If this is invoked after we downloaded a block we can check if this block is the
1961        // finalized block
1962        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1963            if downloaded_block.hash == state.finalized_block_hash {
1964                // we downloaded the finalized block and can now check how far we're off
1965                exceeds_backfill_threshold =
1966                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1967            }
1968        }
1969
1970        // if the number of missing blocks is greater than the max, trigger backfill
1971        if exceeds_backfill_threshold {
1972            if let Some(state) = sync_target_state {
1973                // if we have already canonicalized the finalized block, we should skip backfill
1974                match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
1975                    Err(err) => {
1976                        warn!(target: "engine::tree", %err, "Failed to get finalized block header");
1977                    }
1978                    Ok(None) => {
1979                        // ensure the finalized block is known (not the zero hash)
1980                        if !state.finalized_block_hash.is_zero() {
1981                            // we don't have the block yet and the distance exceeds the allowed
1982                            // threshold
1983                            return Some(state.finalized_block_hash)
1984                        }
1985
1986                        // OPTIMISTIC SYNCING
1987                        //
1988                        // It can happen when the node is doing an
1989                        // optimistic sync, where the CL has no knowledge of the finalized hash,
1990                        // but is expecting the EL to sync as high
1991                        // as possible before finalizing.
1992                        //
1993                        // This usually doesn't happen on ETH mainnet since CLs use the more
1994                        // secure checkpoint syncing.
1995                        //
1996                        // However, optimism chains will do this. The risk of a reorg is however
1997                        // low.
1998                        debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
1999                        return Some(state.head_block_hash)
2000                    }
2001                    Ok(Some(_)) => {
2002                        // we're fully synced to the finalized block
2003                    }
2004                }
2005            }
2006        }
2007
2008        None
2009    }
2010
2011    /// This method tries to detect whether the on-disk and in-memory states have diverged. This can
2012    /// happen if a reorg occurs while we are persisting a block.
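    ///
    /// A self-contained sketch of the final walk-back to the common ancestor, with string
    /// stand-ins for block hashes and a plain map instead of the provider (all names are
    /// hypothetical):
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// // block hash -> parent hash (made-up chain data)
    /// let parent: HashMap<&str, &str> =
    ///     [("c2", "a1"), ("p2", "a1"), ("a1", "a0")].into_iter().collect();
    ///
    /// // canonical and persisted tips are at the same height but differ: an on-disk reorg
    /// let (mut canonical, mut persisted) = ("c2", "p2");
    /// while canonical != persisted {
    ///     canonical = parent[canonical];
    ///     persisted = parent[persisted];
    /// }
    /// // everything above the common ancestor must be removed from disk
    /// assert_eq!(canonical, "a1");
    /// ```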
2013    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2014        let mut canonical = self.state.tree_state.current_canonical_head;
2015        let mut persisted = self.persistence_state.last_persisted_block;
2016
2017        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2018            Ok(self
2019                .sealed_header_by_hash(num_hash.hash)?
2020                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2021                .parent_num_hash())
2022        };
2023
2024        // Happy path, canonical chain is ahead or equal to persisted chain.
2025        // Walk canonical chain back to make sure that it connects to persisted chain.
2026        while canonical.number > persisted.number {
2027            canonical = parent_num_hash(canonical)?;
2028        }
2029
2030        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2031        if canonical == persisted {
2032            return Ok(None);
2033        }
2034
2035        // At this point, we know that `persisted` block can't be reached by walking the canonical
2036        // chain back. In this case we need to truncate it to the first canonical block it connects
2037        // to.
2038
2039        // Firstly, walk back until we reach the same height as `canonical`.
2040        while persisted.number > canonical.number {
2041            persisted = parent_num_hash(persisted)?;
2042        }
2043
2044        debug_assert_eq!(persisted.number, canonical.number);
2045
2046        // Now walk both chains back until we find a common ancestor.
2047        while persisted.hash != canonical.hash {
2048            canonical = parent_num_hash(canonical)?;
2049            persisted = parent_num_hash(persisted)?;
2050        }
2051
2052        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2053
2054        Ok(Some(persisted.number))
2055    }
2056
2057    /// Invoked when the canonical chain has been updated.
2058    ///
2059    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2060    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2061        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2062        let start = Instant::now();
2063
2064        // update the tracked canonical head
2065        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2066
2067        let tip = chain_update.tip().clone_sealed_header();
2068        let notification = chain_update.to_chain_notification();
2069
2070        // reinsert any missing reorged blocks
2071        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2072            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2073            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2074            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2075
2076            self.update_reorg_metrics(old.len());
2077            self.reinsert_reorged_blocks(new.clone());
2078            // Try reinserting the reorged canonical chain. This is only possible if we have
2079            // `persisted_trie_updates` for those blocks.
2080            let old = old
2081                .iter()
2082                .filter_map(|block| {
2083                    let trie = self
2084                        .state
2085                        .tree_state
2086                        .persisted_trie_updates
2087                        .get(&block.recovered_block.hash())?
2088                        .1
2089                        .clone();
2090                    Some(ExecutedBlockWithTrieUpdates {
2091                        block: block.clone(),
2092                        trie: ExecutedTrieUpdates::Present(trie),
2093                    })
2094                })
2095                .collect::<Vec<_>>();
2096            self.reinsert_reorged_blocks(old);
2097        }
2098
2099        // update the tracked in-memory state with the new chain
2100        self.canonical_in_memory_state.update_chain(chain_update);
2101        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2102
2103        // Update metrics based on new tip
2104        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2105
2106        // sends an event to all active listeners about the new canonical chain
2107        self.canonical_in_memory_state.notify_canon_state(notification);
2108
2109        // emit event
2110        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2111            Box::new(tip),
2112            start.elapsed(),
2113        ));
2114    }
2115
2116    /// This updates metrics based on the given reorg length.
2117    fn update_reorg_metrics(&self, old_chain_length: usize) {
2118        self.metrics.tree.reorgs.increment(1);
2119        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2120    }
2121
2122    /// This reinserts any blocks in the new chain that do not already exist in the tree
2123    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2124        for block in new_chain {
2125            if self
2126                .state
2127                .tree_state
2128                .executed_block_by_hash(block.recovered_block().hash())
2129                .is_none()
2130            {
2131                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2132                self.state.tree_state.insert_executed(block);
2133            }
2134        }
2135    }
2136
2137    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2138    ///
2139    /// This mainly compares the missing parent of the downloaded block with the current canonical
2140    /// tip, and decides whether or not backfill sync should be triggered.
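    ///
    /// Rough numeric sketch of that decision with an assumed backfill threshold (the real logic
    /// lives in `backfill_sync_target` and `distance_from_local_tip`):
    ///
    /// ```
    /// // made-up numbers: canonical head and the missing parent of the downloaded block
    /// let head: u64 = 10_000;
    /// let missing_parent: u64 = 10_040;
    /// let backfill_threshold: u64 = 1_024; // assumed for illustration
    ///
    /// let gap = missing_parent.saturating_sub(head);
    /// if gap > backfill_threshold {
    ///     // trigger backfill sync towards the sync target
    /// } else if gap > 0 {
    ///     // download the missing range of `gap` blocks
    /// } else {
    ///     // missing parent is on a shorter side chain: download just that block
    /// }
    /// assert_eq!(gap, 40);
    /// ```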
2141    fn on_disconnected_downloaded_block(
2142        &self,
2143        downloaded_block: BlockNumHash,
2144        missing_parent: BlockNumHash,
2145        head: BlockNumHash,
2146    ) -> Option<TreeEvent> {
2147        // compare the missing parent with the canonical tip
2148        if let Some(target) =
2149            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2150        {
2151            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2152            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2153        }
2154
2155        // continue downloading the missing parent
2156        //
2157        // this happens if either:
2158        //  * the missing parent block num < canonical tip num
2159        //    * this case represents a missing block on a fork that is shorter than the canonical
2160        //      chain
2161        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2162        //    less than the backfill threshold
2163        //    * this case represents a potentially long range of blocks to download and execute
2164        let request = if let Some(distance) =
2165            self.distance_from_local_tip(head.number, missing_parent.number)
2166        {
2167            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2168            DownloadRequest::BlockRange(missing_parent.hash, distance)
2169        } else {
2170            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2171            // This happens when the missing parent is on an outdated
2172            // sidechain and we can only download the missing block itself
2173            DownloadRequest::single_block(missing_parent.hash)
2174        };
2175
2176        Some(TreeEvent::Download(request))
2177    }
2178
2179    /// Invoked with a block downloaded from the network
2180    ///
2181    /// Returns an event with the appropriate action to take, such as:
2182    ///  - download more missing blocks
2183    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2184    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2185    fn on_downloaded_block(
2186        &mut self,
2187        block: RecoveredBlock<N::Block>,
2188    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2189        let block_num_hash = block.num_hash();
2190        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2191        if self
2192            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2193            .is_some()
2194        {
2195            return Ok(None)
2196        }
2197
2198        if !self.backfill_sync_state.is_idle() {
2199            return Ok(None)
2200        }
2201
2202        // try to append the block
2203        match self.insert_block(block) {
2204            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2205                if self.is_sync_target_head(block_num_hash.hash) {
2206                    trace!(target: "engine::tree", "appended downloaded sync target block");
2207
2208                    // we just inserted the current sync target block, we can try to make it
2209                    // canonical
2210                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2211                        sync_target_head: block_num_hash.hash,
2212                    })))
2213                }
2214                trace!(target: "engine::tree", "appended downloaded block");
2215                self.try_connect_buffered_blocks(block_num_hash)?;
2216            }
2217            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2218                // block is not connected to the canonical head, we need to download
2219                // its missing branch first
2220                return Ok(self.on_disconnected_downloaded_block(
2221                    block_num_hash,
2222                    missing_ancestor,
2223                    head,
2224                ))
2225            }
2226            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2227                trace!(target: "engine::tree", "downloaded block already executed");
2228            }
2229            Err(err) => {
2230                if let InsertPayloadError::Block(err) = err {
2231                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2232                    if let Err(fatal) = self.on_insert_block_error(err) {
2233                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2234                        return Err(fatal)
2235                    }
2236                }
2237            }
2238        }
2239        Ok(None)
2240    }
2241
2242    fn insert_payload(
2243        &mut self,
2244        payload: T::ExecutionData,
2245    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2246        self.insert_block_or_payload(
2247            payload.block_with_parent(),
2248            payload,
2249            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2250            |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2251        )
2252    }
2253
2254    fn insert_block(
2255        &mut self,
2256        block: RecoveredBlock<N::Block>,
2257    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2258        self.insert_block_or_payload(
2259            block.block_with_parent(),
2260            block,
2261            |validator, block, ctx| validator.validate_block(block, ctx),
2262            |_, block| Ok(block),
2263        )
2264    }
2265
2266    fn insert_block_or_payload<Input, Err>(
2267        &mut self,
2268        block_id: BlockWithParent,
2269        input: Input,
2270        execute: impl FnOnce(
2271            &mut V,
2272            Input,
2273            TreeCtx<'_, N>,
2274        ) -> Result<ExecutedBlockWithTrieUpdates<N>, Err>,
2275        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
2276    ) -> Result<InsertPayloadOk, Err>
2277    where
2278        Err: From<InsertBlockError<N::Block>>,
2279    {
2280        let block_insert_start = Instant::now();
2281        let block_num_hash = block_id.block;
2282        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2283
2284        match self.sealed_header_by_hash(block_num_hash.hash) {
2285            Err(err) => {
2286                let block = convert_to_block(self, input)?;
2287                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2288            }
2289            Ok(Some(_)) => {
2290                // We now assume that we already have this block in the tree. However, we need to
2291                // run the conversion to ensure that the block hash is valid.
2292                convert_to_block(self, input)?;
2293                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2294            }
2295            _ => {}
2296        };
2297
2298        // Ensure that the parent state is available.
2299        match self.state_provider_builder(block_id.parent) {
2300            Err(err) => {
2301                let block = convert_to_block(self, input)?;
2302                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2303            }
2304            Ok(None) => {
2305                let block = convert_to_block(self, input)?;
2306
2307                // we don't have the state required to execute this block, so buffer it and find the
2308                // missing parent block
2309                let missing_ancestor = self
2310                    .state
2311                    .buffer
2312                    .lowest_ancestor(&block.parent_hash())
2313                    .map(|block| block.parent_num_hash())
2314                    .unwrap_or_else(|| block.parent_num_hash());
2315
2316                self.state.buffer.insert_block(block);
2317
2318                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2319                    head: self.state.tree_state.current_canonical_head,
2320                    missing_ancestor,
2321                }))
2322            }
2323            Ok(Some(_)) => {}
2324        }
2325
2326        // determine whether we are on a fork chain
2327        let is_fork = match self.is_fork(block_id) {
2328            Err(err) => {
2329                let block = convert_to_block(self, input)?;
2330                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2331            }
2332            Ok(is_fork) => is_fork,
2333        };
2334
2335        let ctx = TreeCtx::new(
2336            &mut self.state,
2337            &self.persistence_state,
2338            &self.canonical_in_memory_state,
2339            is_fork,
2340        );
2341
2342        let start = Instant::now();
2343
2344        let executed = execute(&mut self.payload_validator, input, ctx)?;
2345
2346        // if the parent is the canonical head, we can insert the block as the pending block
2347        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2348        {
2349            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2350            self.canonical_in_memory_state.set_pending_block(executed.clone());
2351        }
2352
2353        self.state.tree_state.insert_executed(executed.clone());
2354        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2355
2356        // emit insert event
2357        let elapsed = start.elapsed();
2358        let engine_event = if is_fork {
2359            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2360        } else {
2361            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2362        };
2363        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2364
2365        self.metrics
2366            .engine
2367            .block_insert_total_duration
2368            .record(block_insert_start.elapsed().as_secs_f64());
2369        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2370        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2371    }
2372
2373    /// Computes the trie input at the provided parent hash.
2374    ///
2375    /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that
2376    /// serves as an overlay to the database blocks.
2377    ///
2378    /// It works as follows:
2379    /// 1. Collect in-memory blocks that are descendants of the provided parent hash using
2380    ///    [`TreeState::blocks_by_hash`].
2381    /// 2. If persistence is in progress and the block we're computing the trie input for is a
2382    ///    descendant of the currently persisting blocks, we need to be sure that the in-memory
2383    ///    blocks do not overlap with the database blocks that may already have been persisted.
2384    ///    To do that, we're filtering out in-memory blocks that are lower than the highest database
2385    ///    block.
2386    /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
2387    ///    [`HashedPostState`] from them.
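    ///
    /// A small illustration of the filtering in step 2, on plain block numbers (the real code
    /// works on in-memory executed blocks ordered newest first):
    ///
    /// ```
    /// // in-memory chain, newest first (made-up numbers); the database best block is 102
    /// let mut blocks: Vec<u64> = vec![105, 104, 103, 102, 101];
    /// let best_block_number: u64 = 102;
    /// while let Some(number) = blocks.last().copied() {
    ///     if number <= best_block_number {
    ///         blocks.pop();
    ///     } else {
    ///         break
    ///     }
    /// }
    /// // only blocks that are not yet persisted remain as the in-memory overlay
    /// assert_eq!(blocks, vec![105, 104, 103]);
    /// ```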
2388    fn compute_trie_input<TP: DBProvider + BlockNumReader>(
2389        &self,
2390        persisting_kind: PersistingKind,
2391        provider: TP,
2392        parent_hash: B256,
2393        allocated_trie_input: Option<TrieInput>,
2394    ) -> ProviderResult<TrieInput> {
2395        // get allocated trie input or use a default trie input
2396        let mut input = allocated_trie_input.unwrap_or_default();
2397
2398        let best_block_number = provider.best_block_number()?;
2399
2400        let (mut historical, mut blocks) = self
2401            .state
2402            .tree_state
2403            .blocks_by_hash(parent_hash)
2404            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2405
2406        // If the current block is a descendant of the currently persisting blocks, then we need to
2407        // filter in-memory blocks, so that none of them are already persisted in the database.
2408        if persisting_kind.is_descendant() {
2409            // Iterate over the blocks from oldest to newest.
2410            while let Some(block) = blocks.last() {
2411                let recovered_block = block.recovered_block();
2412                if recovered_block.number() <= best_block_number {
2413                    // Remove blocks that are lower than or equal to the highest database
2414                    // block.
2415                    blocks.pop();
2416                } else {
2417                    // If the block is higher than the best block number, stop filtering, as it's
2418                    // the first block that's not in the database.
2419                    break
2420                }
2421            }
2422
2423            historical = if let Some(block) = blocks.last() {
2424                // If there are any in-memory blocks left after filtering, set the anchor to the
2425                // parent of the oldest block.
2426                (block.recovered_block().number() - 1).into()
2427            } else {
2428                // Otherwise, set the anchor to the original provided parent hash.
2429                parent_hash.into()
2430            };
2431        }
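
        // Illustrative example (hypothetical numbers): if the database best block is 100 and the
        // in-memory chain leading to `parent_hash` spans blocks 99..=103, the loop above drops
        // blocks 99 and 100 (already persisted) and the anchor becomes block 100, i.e. the parent
        // of the oldest remaining in-memory block (101).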
2432
2433        if blocks.is_empty() {
2434            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2435        } else {
2436            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2437        }
2438
2439        // Resolve the historical anchor (a hash or number) to a block number.
2440        let block_number = provider
2441            .convert_hash_or_number(historical)?
2442            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2443
2444        // Retrieve revert state for historical block.
2445        let revert_state = if block_number == best_block_number {
2446            // We do not check against the `last_block_number` here because
2447            // `HashedPostState::from_reverts` only uses the database tables, and not static files.
2448            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2449            HashedPostState::default()
2450        } else {
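            // Collecting reverts from `block_number + 1` onwards yields the hashed state needed
            // to roll the database view back to `block_number`, which is the anchor the in-memory
            // overlay is built on.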
2451            let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
2452                provider.tx_ref(),
2453                block_number + 1,
2454            )
2455            .map_err(ProviderError::from)?;
2456            debug!(
2457                target: "engine::tree",
2458                block_number,
2459                best_block_number,
2460                accounts = revert_state.accounts.len(),
2461                storages = revert_state.storages.len(),
2462                "Non-empty revert state"
2463            );
2464            revert_state
2465        };
2466        input.append(revert_state);
2467
2468        // Extend with contents of parent in-memory blocks.
2469        input.extend_with_blocks(
2470            blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
2471        );
2472
2473        Ok(input)
2474    }
2475
2476    /// Handles an error that occurred while inserting a block.
2477    ///
2478    /// If this is a validation error this will mark the block as invalid.
2479    ///
2480    /// Returns the proper payload status response if the block is invalid.
2481    fn on_insert_block_error(
2482        &mut self,
2483        error: InsertBlockError<N::Block>,
2484    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2485        let (block, error) = error.split();
2486
2487        // If the block is invalid, extract the validation error; otherwise return the
2488        // fatal error.
2489        let validation_err = error.ensure_validation_error()?;
2490
2491        // If the error was due to an invalid payload, the payload is added to the
2492        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2493        // returned.
2494        warn!(
2495            target: "engine::tree",
2496            invalid_hash=%block.hash(),
2497            invalid_number=block.number(),
2498            %validation_err,
2499            "Invalid block error on new payload",
2500        );
2501        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2502
2503        // keep track of the invalid header
2504        self.state.invalid_headers.insert(block.block_with_parent());
2505        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2506            Box::new(block),
2507        )));
2508        Ok(PayloadStatus::new(
2509            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2510            latest_valid_hash,
2511        ))
2512    }
2513
2514    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2515    fn on_new_payload_error(
2516        &mut self,
2517        error: NewPayloadError,
2518        parent_hash: B256,
2519    ) -> ProviderResult<PayloadStatus> {
2520        error!(target: "engine::tree", %error, "Invalid payload");
2521        // we need to convert the error to a payload status (response to the CL)
2522
2523        let latest_valid_hash =
2524            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2525                // Engine-API rules:
2526                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2527                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2528                None
2529            } else {
2530                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2531            };
2532
2533        let status = PayloadStatusEnum::from(error);
2534        Ok(PayloadStatus::new(status, latest_valid_hash))
2535    }
2536
2537    /// Attempts to find the header for the given block hash if it is canonical.
2538    pub fn find_canonical_header(
2539        &self,
2540        hash: B256,
2541    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2542        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2543
2544        if canonical.is_none() {
2545            canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2546        }
2547
2548        Ok(canonical)
2549    }
2550
2551    /// Updates the tracked finalized block if we have it.
2552    fn update_finalized_block(
2553        &self,
2554        finalized_block_hash: B256,
2555    ) -> Result<(), OnForkChoiceUpdated> {
2556        if finalized_block_hash.is_zero() {
2557            return Ok(())
2558        }
2559
2560        match self.find_canonical_header(finalized_block_hash) {
2561            Ok(None) => {
2562                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2563                // if the finalized block is not known, we can't update the finalized block
2564                return Err(OnForkChoiceUpdated::invalid_state())
2565            }
2566            Ok(Some(finalized)) => {
2567                if Some(finalized.num_hash()) !=
2568                    self.canonical_in_memory_state.get_finalized_num_hash()
2569                {
2570                    // We also persist the finalized block on disk so we can reload it on
2571                    // restart. This is required by optimism, which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2572                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2573                    self.canonical_in_memory_state.set_finalized(finalized);
2574                }
2575            }
2576            Err(err) => {
2577                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2578            }
2579        }
2580
2581        Ok(())
2582    }
2583
2584    /// Updates the tracked safe block if we have it
2585    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2586        if safe_block_hash.is_zero() {
2587            return Ok(())
2588        }
2589
2590        match self.find_canonical_header(safe_block_hash) {
2591            Ok(None) => {
2592                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2593                // if the safe block is not known, we can't update the safe block
2594                return Err(OnForkChoiceUpdated::invalid_state())
2595            }
2596            Ok(Some(safe)) => {
2597                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2598                    // We also persist the safe block on disk so we can reload it on
2599                    // restart. This is required by optimism, which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2600                    let _ = self.persistence.save_safe_block_number(safe.number());
2601                    self.canonical_in_memory_state.set_safe(safe);
2602                }
2603            }
2604            Err(err) => {
2605                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2606            }
2607        }
2608
2609        Ok(())
2610    }
2611
2612    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2613    /// made canonical.
2614    ///
2615    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2616    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2617    ///
2618    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2619    /// are consistent with the head block.
2620    fn ensure_consistent_forkchoice_state(
2621        &self,
2622        state: ForkchoiceState,
2623    ) -> Result<(), OnForkChoiceUpdated> {
2624        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2625        // after the head block is canonicalized.
2626        //
2627        // This ensures that the finalized block is consistent with the head block, i.e. the
2628        // finalized block is an ancestor of the head block.
2629        self.update_finalized_block(state.finalized_block_hash)?;
2630
2631        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2632        // after the head block is canonicalized.
2633        //
2634        // This ensures that the safe block is consistent with the head block, i.e. the safe
2635        // block is an ancestor of the head block.
2636        self.update_safe_block(state.safe_block_hash)
2637    }
2638
2639    /// Pre-validate forkchoice update and check whether it can be processed.
2640    ///
2641    /// This method returns the update outcome if validation fails or
2642    /// the node is syncing and the update cannot be processed at the moment.
2643    fn pre_validate_forkchoice_update(
2644        &mut self,
2645        state: ForkchoiceState,
2646    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2647        if state.head_block_hash.is_zero() {
2648            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2649        }
2650
2651        // check if the new head hash is connected to any ancestor that we previously marked as
2652        // invalid
2653        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2654        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2655            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2656        }
2657
2658        if !self.backfill_sync_state.is_idle() {
2659            // We can only process new forkchoice updates if the pipeline is idle, since the
2660            // pipeline requires exclusive access to the database.
2661            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2662            return Ok(Some(OnForkChoiceUpdated::syncing()))
2663        }
2664
2665        Ok(None)
2666    }
2667
2668    /// Validates the payload attributes with respect to the header and fork choice state.
2669    ///
2670    /// Note: At this point, the fork choice update is considered to be VALID; however, we can
2671    /// still return an error if the payload attributes are invalid.
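    ///
    /// Illustrative flow (simplified from the body below; error handling elided):
    ///
    /// ```ignore
    /// // 1. Check the attributes against the head header.
    /// self.payload_validator.validate_payload_attributes_against_header(&attrs, head)?;
    /// // 2. Convert the attributes into builder attributes and kick off a payload build job.
    /// let attrs = T::PayloadBuilderAttributes::try_new(state.head_block_hash, attrs, version as u8)?;
    /// let payload_id = self.payload_builder.send_new_payload(attrs);
    /// ```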
2672    fn process_payload_attributes(
2673        &self,
2674        attrs: T::PayloadAttributes,
2675        head: &N::BlockHeader,
2676        state: ForkchoiceState,
2677        version: EngineApiMessageVersion,
2678    ) -> OnForkChoiceUpdated {
2679        if let Err(err) =
2680            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2681        {
2682            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2683            return OnForkChoiceUpdated::invalid_payload_attributes()
2684        }
2685
2686        // 8. Client software MUST begin a payload build process building on top of
2687        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2688        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2689        //    The build process is specified in the Payload building section.
2690        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2691            state.head_block_hash,
2692            attrs,
2693            version as u8,
2694        ) {
2695            Ok(attributes) => {
2696                // Send the attributes to the payload builder and return the receiver for the
2697                // pending payload id; initiating the payload job is handled asynchronously.
2698                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2699
2700                // Client software MUST respond to this method call in the following way:
2701                // {
2702                //      payloadStatus: {
2703                //          status: VALID,
2704                //          latestValidHash: forkchoiceState.headBlockHash,
2705                //          validationError: null
2706                //      },
2707                //      payloadId: buildProcessId
2708                // }
2709                //
2710                // if the payload is deemed VALID and the build process has begun.
2711                OnForkChoiceUpdated::updated_with_pending_payload_id(
2712                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2713                    pending_payload_id,
2714                )
2715            }
2716            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2717        }
2718    }
2719
2720    /// Remove all blocks up to __and including__ the given block number.
2721    ///
2722    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2723    /// those which have a fork point at or below the finalized hash.
2724    ///
2725    /// Canonical blocks below the upper bound will still be removed.
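    ///
    /// Illustrative call (hypothetical values): after persisting up to block 100, drop everything
    /// at or below it, pruning non-canonical forks only where they branch off at or below the
    /// finalized block:
    ///
    /// ```ignore
    /// self.remove_before(BlockNumHash { number: 100, hash: persisted_hash }, Some(finalized_hash))?;
    /// ```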
2726    pub(crate) fn remove_before(
2727        &mut self,
2728        upper_bound: BlockNumHash,
2729        finalized_hash: Option<B256>,
2730    ) -> ProviderResult<()> {
2731        // First fetch the finalized block number, then call the `remove_until` method on
2732        // `tree_state`.
2733        let num = if let Some(hash) = finalized_hash {
2734            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2735        } else {
2736            None
2737        };
2738
2739        self.state.tree_state.remove_until(
2740            upper_bound,
2741            self.persistence_state.last_persisted_block.hash,
2742            num,
2743        );
2744        Ok(())
2745    }
2746
2747    /// Returns a builder for creating state providers for the given hash.
2748    ///
2749    /// This is an optimization for parallel execution contexts where we want to avoid
2750    /// creating state providers in the critical path.
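    ///
    /// A minimal usage sketch (assumes the builder exposes a `build` method; `hash` is a
    /// placeholder):
    ///
    /// ```ignore
    /// if let Some(builder) = self.state_provider_builder(hash)? {
    ///     // Construction is deferred: the builder can be handed to a worker and only turned
    ///     // into an actual state provider there, keeping it off the critical path.
    ///     let state_provider = builder.build()?;
    /// }
    /// ```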
2751    pub fn state_provider_builder(
2752        &self,
2753        hash: B256,
2754    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2755    where
2756        P: BlockReader + StateProviderFactory + StateReader + Clone,
2757    {
2758        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2759            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2760            // the block leads back to the canonical chain
2761            return Ok(Some(StateProviderBuilder::new(
2762                self.provider.clone(),
2763                historical,
2764                Some(blocks),
2765            )))
2766        }
2767
2768        // Check if the block is persisted
2769        if let Some(header) = self.provider.header(&hash)? {
2770            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2771            // For persisted blocks, we create a builder that will fetch state directly from the
2772            // database
2773            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2774        }
2775
2776        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2777        Ok(None)
2778    }
2779}
2780
2781/// Block inclusion can be valid, disconnected, or invalid. Invalid blocks are returned as an
2782/// error variant.
2783///
2784/// If we don't know the block's parent, we return `Disconnected`, as we can't determine whether
2785/// the block is valid.
2786#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2787pub enum BlockStatus {
2788    /// The block is valid and extends the canonical chain.
2789    Valid,
2790    /// The block may be valid and has an unknown missing ancestor.
2791    Disconnected {
2792        /// Current canonical head.
2793        head: BlockNumHash,
2794        /// The lowest ancestor block that is not connected to the canonical chain.
2795        missing_ancestor: BlockNumHash,
2796    },
2797}
2798
2799/// How a payload was inserted if it was valid.
2800///
2801/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
2802/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
2803#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2804pub enum InsertPayloadOk {
2805    /// The payload was valid, but we have already seen it.
2806    AlreadySeen(BlockStatus),
2807    /// The payload was valid and inserted into the tree.
2808    Inserted(BlockStatus),
2809}
2810
2811/// Whether or not the blocks are currently persisting and the input block is a descendant.
2812#[derive(Debug, Clone, Copy)]
2813pub enum PersistingKind {
2814    /// The blocks are not currently persisting.
2815    NotPersisting,
2816    /// The blocks are currently persisting but the input block is not a descendant.
2817    PersistingNotDescendant,
2818    /// The blocks are currently persisting and the input block is a descendant.
2819    PersistingDescendant,
2820}
2821
2822impl PersistingKind {
2823    /// Returns true if the parallel state root can be run.
2824    ///
2825    /// We only run the parallel state root if we are not currently persisting any blocks or
2826    /// persisting blocks that are all ancestors of the one we are calculating the state root for.
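    ///
    /// Spelled out against the variants below:
    ///
    /// ```ignore
    /// assert!(PersistingKind::NotPersisting.can_run_parallel_state_root());
    /// assert!(PersistingKind::PersistingDescendant.can_run_parallel_state_root());
    /// assert!(!PersistingKind::PersistingNotDescendant.can_run_parallel_state_root());
    /// ```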
2827    pub const fn can_run_parallel_state_root(&self) -> bool {
2828        matches!(self, Self::NotPersisting | Self::PersistingDescendant)
2829    }
2830
2831    /// Returns true if the blocks are currently being persisted and the input block is a
2832    /// descendant.
2833    pub const fn is_descendant(&self) -> bool {
2834        matches!(self, Self::PersistingDescendant)
2835    }
2836}