reth_engine_tree/tree/mod.rs

use crate::{
    backfill::{BackfillAction, BackfillSyncState},
    chain::FromOrchestrator,
    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
    persistence::PersistenceHandle,
    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
};
use alloy_consensus::BlockHeader;
use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{
    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use error::{InsertBlockError, InsertBlockFatalError};
use persistence_state::CurrentPersistenceAction;
use reth_chain_state::{
    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
    MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, FullConsensus};
use reth_engine_primitives::{
    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
    ForkchoiceStateTracker, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::{ConfigureEvm, OnStateHook};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
    HashedPostStateProvider, ProviderError, StateProviderBox, StateProviderFactory, StateReader,
    StateRootProvider, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{HashedPostState, TrieInput};
use reth_trie_db::DatabaseHashedPostState;
use revm::state::EvmState;
use state::TreeState;
use std::{
    fmt::Debug,
    sync::{
        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
        Arc,
    },
    time::Instant,
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot::{self, error::TryRecvError},
};
use tracing::*;

mod block_buffer;
mod cached_state;
pub mod error;
mod instrumented_state;
mod invalid_headers;
mod metrics;
mod payload_processor;
pub mod payload_validator;
mod persistence_state;
pub mod precompile_cache;
#[cfg(test)]
mod tests;
// TODO(alexey): compare trie updates in `insert_block_inner`
#[expect(unused)]
mod trie_updates;

use crate::tree::error::AdvancePersistenceError;
pub use block_buffer::BlockBuffer;
pub use invalid_headers::InvalidHeaderCache;
pub use payload_processor::*;
pub use payload_validator::{BasicEngineValidator, EngineValidator};
pub use persistence_state::PersistenceState;
pub use reth_engine_primitives::TreeConfig;
use reth_trie::KeccakKeyHasher;

pub mod state;

/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local
/// head exceeds this threshold, the pipeline is used to backfill the gap more efficiently.
///
/// E.g.: if the local head `block.number` is 100 and the forkchoice head `block.number` is 133,
/// the gap (33 blocks) is larger than `EPOCH_SLOTS` (32), so the pipeline should be used to
/// backfill it.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

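// A minimal sketch of how this threshold is typically applied when deciding between live sync
// and backfill. The `local_head` / `target` values are hypothetical and the check below is
// illustrative, not the exact logic used elsewhere in this module:
//
//     let local_head: u64 = 100;
//     let target: u64 = 133;
//     let exceeds_threshold = target.saturating_sub(local_head) > MIN_BLOCKS_FOR_PIPELINE_RUN;
//     // exceeds_threshold == true, so the gap is closed via pipeline backfill rather than by
//     // downloading and executing the blocks one by one.
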
/// A builder for creating state providers that can be used across threads.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new state provider from the provider factory, historical block hash and optional
    /// overlaid blocks.
    pub const fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
where
    P: BlockReader + StateProviderFactory + StateReader + Clone,
{
    /// Creates a new state provider from this builder.
    pub fn build(&self) -> ProviderResult<StateProviderBox> {
        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
        if let Some(overlay) = self.overlay.clone() {
            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
        }
        Ok(provider)
    }
}
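
// A minimal usage sketch for the builder above. The `factory`, `historical_hash` and
// `overlay_blocks` values are hypothetical placeholders for values satisfying the bounds on
// `build`, not items defined in this module:
//
//     let builder = StateProviderBuilder::new(factory, historical_hash, Some(overlay_blocks));
//     // the builder can be cloned and moved to another thread (given a cloneable factory):
//     let provider: StateProviderBox = builder.build()?;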

/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the headers of payloads that were rejected by the engine because they are invalid.
    invalid_headers: InvalidHeaderCache,
}

impl<N: NodePrimitives> EngineApiTreeState<N> {
    fn new(
        block_buffer_limit: u32,
        max_invalid_header_cache_length: u32,
        canonical_block: BlockNumHash,
        engine_kind: EngineApiKind,
    ) -> Self {
        Self {
            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
            buffer: BlockBuffer::new(block_buffer_limit),
            tree_state: TreeState::new(canonical_block, engine_kind),
            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
        }
    }
}

/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
}

impl<T> TreeOutcome<T> {
    /// Create new tree outcome.
    pub const fn new(outcome: T) -> Self {
        Self { outcome, event: None }
    }

    /// Set event on the outcome.
    pub fn with_event(mut self, event: TreeEvent) -> Self {
        self.event = Some(event);
        self
    }
}
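
// A minimal sketch of how callers typically thread an event through a `TreeOutcome`
// (the `status` and `head_hash` values are hypothetical placeholders):
//
//     let outcome = TreeOutcome::new(status).with_event(TreeEvent::TreeAction(
//         TreeAction::MakeCanonical { sync_target_head: head_hash },
//     ));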

/// Events that are triggered by the tree.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}

impl TreeEvent {
    /// Returns true if the event is a backfill action.
    const fn is_backfill_action(&self) -> bool {
        matches!(self, Self::BackfillAction(_))
    }
}

/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}

/// Wrapper struct that combines metrics and a state hook.
struct MeteredStateHook {
    metrics: reth_evm::metrics::ExecutorMetrics,
    inner_hook: Box<dyn OnStateHook>,
}

impl OnStateHook for MeteredStateHook {
    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
        // Update the metrics for the number of accounts, storage slots and bytecodes loaded
        let accounts = state.keys().len();
        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();

        self.metrics.accounts_loaded_histogram.record(accounts as f64);
        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);

        // Call the original state hook
        self.inner_hook.on_state(source, state);
    }
}
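
// A minimal sketch of how this wrapper is meant to be used: any existing `OnStateHook` is boxed
// as the inner hook, and the wrapper records metrics before delegating to it. `existing_hook`
// and `executor_metrics` are hypothetical placeholders:
//
//     let metered = MeteredStateHook {
//         metrics: executor_metrics,
//         inner_hook: Box::new(existing_hook),
//     };
//     // `metered` can now be passed anywhere a `Box<dyn OnStateHook>` is expected.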

/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state
/// and emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    provider: P,
    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue in messages to ourselves that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API requests in between and don't
    /// become unresponsive. This can happen during live sync transition where we're trying to
    /// close the gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
}

impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("engine_kind", &self.engine_kind)
            .field("evm_config", &self.evm_config)
            .finish()
    }
}

impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    P: DatabaseProviderFactory
        + BlockReader<Block = N::Block, Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader<Receipt = N::Receipt>
        + HashedPostStateProvider
        + Clone
        + 'static,
    <P as DatabaseProviderFactory>::Provider:
        BlockReader<Block = N::Block, Header = N::BlockHeader>,
    C: ConfigureEvm<Primitives = N> + 'static,
    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
    V: EngineValidator<T>,
{
    /// Creates a new [`EngineApiTreeHandler`].
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        outgoing: UnboundedSender<EngineApiEvent<N>>,
        state: EngineApiTreeState<N>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        persistence: PersistenceHandle<N>,
        persistence_state: PersistenceState,
        payload_builder: PayloadBuilderHandle<T>,
        config: TreeConfig,
        engine_kind: EngineApiKind,
        evm_config: C,
    ) -> Self {
        let (incoming_tx, incoming) = std::sync::mpsc::channel();

        Self {
            provider,
            consensus,
            payload_validator,
            incoming,
            outgoing,
            persistence,
            persistence_state,
            backfill_sync_state: BackfillSyncState::Idle,
            state,
            canonical_in_memory_state,
            payload_builder,
            config,
            metrics: Default::default(),
            incoming_tx,
            engine_kind,
            evm_config,
        }
    }

    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
        );
        let incoming = task.incoming_tx.clone();
        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
        (incoming, outgoing)
    }
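
    // A minimal sketch of wiring this handler up. All arguments (`provider`, `consensus`,
    // `validator`, `persistence`, `payload_builder`, `canonical_state`, `config`, `kind`,
    // `evm_config`) are hypothetical values satisfying the trait bounds above:
    //
    //     let (to_tree, mut events) = EngineApiTreeHandler::spawn_new(
    //         provider, consensus, validator, persistence, payload_builder,
    //         canonical_state, config, kind, evm_config,
    //     );
    //     // Requests are sent to the spawned thread ...
    //     to_tree.send(FromEngine::DownloadedBlocks(Vec::new())).ok();
    //     // ... and emitted `EngineApiEvent`s are consumed from the unbounded receiver.
    //     while let Some(event) = events.blocking_recv() { /* handle event */ }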

    /// Returns a new [`Sender`] to send messages to this type.
    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
        self.incoming_tx.clone()
    }

    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages.
    pub fn run(mut self) {
        loop {
            match self.try_recv_engine_message() {
                Ok(Some(msg)) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    if let Err(fatal) = self.on_engine_message(msg) {
                        error!(target: "engine::tree", %fatal, "insert block fatal error");
                        return
                    }
                }
                Ok(None) => {
                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
                }
                Err(_err) => {
                    error!(target: "engine::tree", "Engine channel disconnected");
                    return
                }
            }

            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }

    /// Invoked when previously requested blocks were downloaded.
    ///
    /// If the block count exceeds the configured batch size that we're allowed to execute at
    /// once, this will execute the first batch and send the remaining blocks back through the
    /// channel so that block request processing isn't blocked for a long time.
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<RecoveredBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            // nothing to execute
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    // can exit early if backfill is needed
                    return Ok(None)
                }
            }
        }

        // if we still have blocks to execute, send them as a followup request
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }

    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`], for example
    /// [`ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
    /// Execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to the Consensus layer, which adds the block to the head
    /// of its own blockchain and attests to it. The block is then broadcast over the consensus
    /// p2p network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
    /// returns an error if an internal error occurred.
    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()), target = "engine::tree")]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");
        self.metrics.engine.new_payload_messages.increment(1);

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.

        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Check for invalid ancestors
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        let status = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?
        } else {
            self.try_buffer_payload(payload)?
        };

        let mut outcome = TreeOutcome::new(status);
        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // Only create the canonical event if this block isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }

    /// Processes a payload during normal sync operation.
    ///
    /// Returns:
    /// - `Valid`: Payload successfully validated and inserted
    /// - `Syncing`: Parent missing, payload buffered for later
    /// - Error status: Payload is invalid
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let status = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        self.try_connect_buffered_blocks(num_hash)?;
                        PayloadStatusEnum::Valid
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        PayloadStatusEnum::Valid
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        // not known to be invalid, but we don't know anything else
                        PayloadStatusEnum::Syncing
                    }
                };

                Ok(PayloadStatus::new(status, latest_valid_hash))
            }
            Err(error) => match error {
                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
                InsertPayloadError::Payload(error) => {
                    Ok(self.on_new_payload_error(error, parent_hash)?)
                }
            },
        }
    }

    /// Stores a payload for later processing during backfill sync.
    ///
    /// During backfill, the node lacks the state needed to validate payloads,
    /// so they are buffered (stored in memory) until their parent blocks are synced.
    ///
    /// Returns:
    /// - `Syncing`: Payload successfully buffered
    /// - Error status: Payload is malformed or invalid
    fn try_buffer_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let parent_hash = payload.parent_hash();

        match self.payload_validator.ensure_well_formed_payload(payload) {
            // if the block is well-formed, buffer it for later
            Ok(block) => {
                if let Err(error) = self.buffer_block(block) {
                    Ok(self.on_insert_block_error(error)?)
                } else {
                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
                }
            }
            Err(error) => Ok(self.on_new_payload_error(error, parent_hash)?),
        }
    }

    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None)
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain,
        // gather all blocks until new head number.
        while current_canonical_number > current_number {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_chain.push(block.clone());
                old_hash = block.recovered_block().parent_hash();
                current_canonical_number -= 1;
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
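
    // Illustrative sketch of the two outcomes above (block letters are hypothetical blocks,
    // `H` is the current canonical head):
    //
    //     Commit:   ... - H - A - B      walking B back reaches H, so the result is
    //                                    `Commit { new: [A, B] }`
    //
    //     Reorg:    ... - F - H          walking B back reaches F (not H), so the result is
    //                       \ A - B      `Reorg { new: [A, B], old: [H] }`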

    /// Updates the latest block state to the specified canonical ancestor.
    ///
    /// This method ensures that the latest block tracks the given canonical header by resetting
    /// the tree state and in-memory canonical state to it.
    ///
    /// # Arguments
    /// * `canonical_header` - The canonical header to set as the new head
    ///
    /// # Returns
    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
    ///
    /// Caution: This unwinds the canonical chain
    fn update_latest_block_to_canonical_ancestor(
        &mut self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
        let current_head_number = self.state.tree_state.canonical_block_number();
        let new_head_number = canonical_header.number();
        let new_head_hash = canonical_header.hash();

        // Update tree state with the new canonical head
        self.state.tree_state.set_canonical_head(canonical_header.num_hash());

        // Handle the state update based on whether this is an unwind scenario
        if new_head_number < current_head_number {
            debug!(
                target: "engine::tree",
                current_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "FCU unwind detected: reverting to canonical ancestor"
            );

            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
        } else {
            debug!(
                target: "engine::tree",
                previous_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "Advancing latest block to canonical ancestor"
            );
            self.handle_chain_advance_or_same_height(canonical_header)
        }
    }

    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
    /// to the canonical header
    fn handle_canonical_chain_unwind(
        &self,
        current_head_number: u64,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        let new_head_number = canonical_header.number();
        debug!(
            target: "engine::tree",
            from = current_head_number,
            to = new_head_number,
            "Handling unwind: collecting blocks to remove from in-memory state"
        );

        // Collect blocks that need to be removed from memory
        let old_blocks =
            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);

        // Load and apply the canonical ancestor block
        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
    }

    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
    fn collect_blocks_for_canonical_unwind(
        &self,
        new_head_number: u64,
        current_head_number: u64,
    ) -> Vec<ExecutedBlock<N>> {
        let mut old_blocks = Vec::new();

        for block_num in (new_head_number + 1)..=current_head_number {
            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
                let executed_block = block_state.block_ref().block.clone();
                old_blocks.push(executed_block);
                debug!(
                    target: "engine::tree",
                    block_number = block_num,
                    "Collected block for removal from in-memory state"
                );
            }
        }

        if old_blocks.is_empty() {
            debug!(
                target: "engine::tree",
                "No blocks found in memory to remove, will clear and reset state"
            );
        }

        old_blocks
    }

    /// Applies the canonical ancestor block via a reorg operation.
    fn apply_canonical_ancestor_via_reorg(
        &self,
        canonical_header: &SealedHeader<N::BlockHeader>,
        old_blocks: Vec<ExecutedBlock<N>>,
    ) -> ProviderResult<()> {
        let new_head_hash = canonical_header.hash();
        let new_head_number = canonical_header.number();

        // Try to load the canonical ancestor's block
        match self.canonical_block_by_hash(new_head_hash)? {
            Some(executed_block) => {
                let block_with_trie = ExecutedBlockWithTrieUpdates {
                    block: executed_block,
                    trie: ExecutedTrieUpdates::Missing,
                };

                // Perform the reorg to properly handle the unwind
                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
                    new: vec![block_with_trie],
                    old: old_blocks,
                });

                // CRITICAL: Update the canonical head after the reorg
                // This ensures get_canonical_head() returns the correct block
                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());

                debug!(
                    target: "engine::tree",
                    block_number = new_head_number,
                    block_hash = ?new_head_hash,
                    "Successfully loaded canonical ancestor into memory via reorg"
                );
            }
            None => {
                // Fallback: update header only if block cannot be found
                warn!(
                    target: "engine::tree",
                    block_hash = ?new_head_hash,
                    "Could not find canonical ancestor block, updating header only"
                );
                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
            }
        }

        Ok(())
    }

    /// Handles chain advance or same height scenarios.
    fn handle_chain_advance_or_same_height(
        &self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        let new_head_number = canonical_header.number();
        let new_head_hash = canonical_header.hash();

        // Update the canonical head header
        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());

        // Load the block into memory if it's not already present
        self.ensure_block_in_memory(new_head_number, new_head_hash)
    }

    /// Ensures a block is loaded into memory if not already present.
    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
        // Check if block is already in memory
        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
            return Ok(());
        }

        // Try to load the block from storage
        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
            let block_with_trie = ExecutedBlockWithTrieUpdates {
                block: executed_block,
                trie: ExecutedTrieUpdates::Missing,
            };

            self.canonical_in_memory_state
                .update_chain(NewCanonicalChain::Commit { new: vec![block_with_trie] });

            debug!(
                target: "engine::tree",
                block_number,
                block_hash = ?block_hash,
                "Added canonical block to in-memory state"
            );
        }

        Ok(())
    }

    /// Determines if the given block is part of a fork by checking that these
    /// conditions are true:
    /// * walking back from the target hash to verify that the target hash is not part of an
    ///   extension of the canonical chain.
    /// * walking back from the current head to verify that the target hash is not already part of
    ///   the canonical chain.
    ///
    /// The header is required as an arg, because we might be checking that the header is a fork
    /// block before it's in the tree state and before it's in the database.
    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
        let target_hash = target.block.hash;
        // verify that the given hash is not part of an extension of the canon chain.
        let canonical_head = self.state.tree_state.canonical_head();
        let mut current_hash;
        let mut current_block = target;
        loop {
            if current_block.block.hash == canonical_head.hash {
                return Ok(false)
            }
            // We already passed the canonical head
            if current_block.block.number <= canonical_head.number {
                break
            }
            current_hash = current_block.parent;

            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
            current_block = next_block.block_with_parent();
        }

        // verify that the given hash is not already part of canonical chain stored in memory
        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
            return Ok(false)
        }

        // verify that the given hash is not already part of persisted canonical chain
        if self.provider.block_number(target_hash)?.is_some() {
            return Ok(false)
        }

        Ok(true)
    }
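
    // Illustrative sketch of the checks above (hypothetical chain, `H` is the canonical head):
    //
    //     canonical:   ... - P - H
    //     fork:                \ X - Y
    //
    //     is_fork(Y) == true   walking Y back never hits H, and Y is neither in the in-memory
    //                          canonical chain nor persisted.
    //     is_fork(H) == false  the target is the canonical head itself.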

    /// Returns the persisting kind for the input block.
    fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
        // Check that we're currently persisting.
        let Some(action) = self.persistence_state.current_action() else {
            return PersistingKind::NotPersisting
        };
        // Check that the persistence action is saving blocks, not removing them.
        let CurrentPersistenceAction::SavingBlocks { highest } = action else {
            return PersistingKind::PersistingNotDescendant
        };

        // The block being validated can only be a descendant if its number is higher than
        // the highest block persisting. Otherwise, it's likely a fork of a lower block.
        if block.block.number > highest.number &&
            self.state.tree_state.is_descendant(*highest, block)
        {
            return PersistingKind::PersistingDescendant
        }

        // In all other cases, the block is not a descendant.
        PersistingKind::PersistingNotDescendant
    }
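
    // Summary of the mapping above (a descriptive sketch, not additional logic):
    //
    //     no persistence task running                 -> PersistingKind::NotPersisting
    //     persistence task is removing blocks         -> PersistingKind::PersistingNotDescendant
    //     saving blocks, block descends from highest  -> PersistingKind::PersistingDescendant
    //     saving blocks, any other block              -> PersistingKind::PersistingNotDescendant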

    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
    /// chain.
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
    ///
    /// Returns an error if an internal error occurred, like a database error.
    #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        // Record metrics
        self.record_forkchoice_metrics(&attrs);

        // Pre-validation of forkchoice state
        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        // Return early if we are on the correct fork
        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
            return Ok(result);
        }

        // Attempt to apply a chain update when the head differs from our canonical chain.
        // This handles reorgs and chain extensions by making the specified head canonical.
        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
            return Ok(result);
        }

        // Fallback to ensure we catch up with the network's state.
        self.handle_missing_block(state)
    }

    /// Records metrics for forkchoice updated calls
    fn record_forkchoice_metrics(&self, attrs: &Option<T::PayloadAttributes>) {
        self.metrics.engine.forkchoice_updated_messages.increment(1);
        if attrs.is_some() {
            self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
        }
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }

    /// Pre-validates the forkchoice state and returns early if validation fails.
    ///
    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
    /// returned. Returns `None` if validation passes and processing should continue.
    fn validate_forkchoice_state(
        &mut self,
        state: ForkchoiceState,
    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
        if state.head_block_hash.is_zero() {
            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
        }

        // Check if the new head hash is connected to any ancestor that we previously marked as
        // invalid
        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
        }

        if !self.backfill_sync_state.is_idle() {
            // We can only process new forkchoice updates if the pipeline is idle, since it
            // requires exclusive access to the database
            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
            return Ok(Some(OnForkChoiceUpdated::syncing()));
        }

        Ok(None)
    }

    /// Handles the case where the forkchoice head is already canonical.
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
    /// processing is complete. Returns `None` if the head is not canonical and processing
    /// should continue.
    fn handle_canonical_head(
        &self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>, // Changed to reference
        version: EngineApiMessageVersion,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Process the forkchoice update by trying to make the head block canonical
        //
        // We can only process this forkchoice update if:
        // - we have the `head` block
        // - the head block is part of a chain that is connected to the canonical chain. This
        //   includes reorgs.
        //
        // Performing a FCU involves:
        // - marking the FCU's head block as canonical
        // - updating in memory state to reflect the new canonical chain
        // - updating canonical state trackers
        // - emitting a canonicalization event for the new chain (including reorg)
        // - if we have payload attributes, delegate them to the payload service

        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
            return Ok(None);
        }

        trace!(target: "engine::tree", "fcu head hash is already canonical");

        // Update the safe and finalized blocks and ensure their values are valid
        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
            // safe or finalized hashes are invalid
            return Ok(Some(TreeOutcome::new(outcome)));
        }

        // Process payload attributes if the head is already canonical
        if let Some(attr) = attrs {
            let tip = self
                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
                .ok_or_else(|| {
                    // If we can't find the canonical block, then something is wrong and we need
                    // to return an error
                    ProviderError::HeaderNotFound(state.head_block_hash.into())
                })?;
            // Clone only when we actually need to process the attributes
            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
            return Ok(Some(TreeOutcome::new(updated)));
        }

        // The head block is already canonical
        let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
            PayloadStatusEnum::Valid,
            Some(state.head_block_hash),
        )));
        Ok(Some(outcome))
    }

    /// Applies chain update for the new head block and processes payload attributes.
    ///
    /// This method handles the case where the forkchoice head differs from our current canonical
    /// head. It attempts to make the specified head block canonical by:
    /// - Checking if the head is already part of the canonical chain
    /// - Applying chain reorganizations (reorgs) if necessary
    /// - Processing payload attributes if provided
    /// - Returning the appropriate forkchoice update response
    ///
    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
    /// Returns `None` if no chain update was needed or possible.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // Check if the head is already part of the canonical chain
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
            // own chain at will, so we need to always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    // Clone only when we actually need to process the attributes
                    let updated = self.process_payload_attributes(
                        attr.clone(),
                        &canonical_header,
                        state,
                        version,
                    );
                    return Ok(Some(TreeOutcome::new(updated)));
                }

                // At this point, no alternative block has been triggered, so we effectively need
                // to unwind the _canonical_ chain to the FCU's head, which is part of the
                // canonical chain. We need to update the latest block state to reflect the
                // canonical ancestor. This ensures that state providers and the
                // transaction pool operate with the correct chain state after
                // forkchoice update processing.

                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }
            }

            // According to the Engine API specification, client software MAY skip an update of the
            // forkchoice state and MUST NOT begin a payload build process if
            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
            // of canonical chain, i.e. the ancestor passed payload validation process
            // and deemed `VALID`. In the case of such an event, client software MUST
            // return `{payloadStatus: {status: VALID, latestValidHash:
            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // The head block is already canonical and we're not processing payload attributes,
            // so we're not triggering a payload job and can return right away

            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
                PayloadStatusEnum::Valid,
                Some(state.head_block_hash),
            )));
            return Ok(Some(outcome));
        }

        // Ensure we can apply a new chain update for the head block
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            // Update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                // Clone only when we actually need to process the attributes
                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
                PayloadStatusEnum::Valid,
                Some(state.head_block_hash),
            )));
            return Ok(Some(outcome));
        }

        Ok(None)
    }

    /// Handles the case where the head block is missing and needs to be downloaded.
    ///
    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
    /// Returns a `TreeOutcome` with syncing status and download event.
    fn handle_missing_block(
        &self,
        state: ForkchoiceState,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        // We don't have the block to perform the forkchoice update
        // We assume the FCU is valid and at least the head is missing,
        // so we need to start syncing to it
        //
        // find the appropriate target to sync to, if we don't have the safe block hash then we
        // start syncing to the safe block via backfill first
        let target = if self.state.forkchoice_state_tracker.is_empty() &&
            // check that safe block is valid and missing
            !state.safe_block_hash.is_zero() &&
            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
        {
            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
            state.safe_block_hash
        } else {
            state.head_block_hash
        };

        let target = self.lowest_buffered_ancestor_or(target);
        trace!(target: "engine::tree", %target, "downloading missing block");

        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
            PayloadStatusEnum::Syncing,
        )))
        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
    }
1267
1268    /// Attempts to receive the next engine request.
1269    ///
1270    /// If there's currently no persistence action in progress, this will block until a new request
1271    /// is received. If there's a persistence action in progress, this will try to receive the
1272    /// next request with a timeout so it does not block indefinitely, returning `Ok(None)` if no
1273    /// request is received in time.
1274    ///
1275    /// Returns an error if the engine channel is disconnected.
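    ///
    /// Below is a minimal, self-contained sketch of the timeout-based receive pattern used
    /// here, built on [`std::sync::mpsc`] purely for illustration (it is not this type's API):
    ///
    /// ```ignore
    /// use std::{sync::mpsc, time::Duration};
    ///
    /// let (tx, rx) = mpsc::channel::<u64>();
    /// tx.send(1).unwrap();
    ///
    /// // While a persistence task is pending, poll with a timeout instead of blocking forever.
    /// match rx.recv_timeout(Duration::from_millis(500)) {
    ///     Ok(msg) => println!("received request {msg}"),
    ///     Err(mpsc::RecvTimeoutError::Timeout) => println!("no request yet, keep looping"),
    ///     Err(mpsc::RecvTimeoutError::Disconnected) => println!("engine channel closed"),
    /// }
    /// ```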
1276    #[expect(clippy::type_complexity)]
1277    fn try_recv_engine_message(
1278        &self,
1279    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1280        if self.persistence_state.in_progress() {
1281            // try to receive the next request with a timeout to not block indefinitely
1282            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1283                Ok(msg) => Ok(Some(msg)),
1284                Err(err) => match err {
1285                    RecvTimeoutError::Timeout => Ok(None),
1286                    RecvTimeoutError::Disconnected => Err(RecvError),
1287                },
1288            }
1289        } else {
1290            self.incoming.recv().map(Some)
1291        }
1292    }
1293
1294    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1295    /// the current persistence action while we're removing blocks.
1296    fn remove_blocks(&mut self, new_tip_num: u64) {
1297        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1298        if new_tip_num < self.persistence_state.last_persisted_block.number {
1299            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1300            let (tx, rx) = oneshot::channel();
1301            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1302            self.persistence_state.start_remove(new_tip_num, rx);
1303        }
1304    }
1305
1306    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1307    /// the current persistence action while we're saving blocks.
1308    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1309        if blocks_to_persist.is_empty() {
1310            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1311            return
1312        }
1313
1314        // NOTE: checked non-empty above
1315        let highest_num_hash = blocks_to_persist
1316            .iter()
1317            .max_by_key(|block| block.recovered_block().number())
1318            .map(|b| b.recovered_block().num_hash())
1319            .expect("Checked non-empty persisting blocks");
1320
1321        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1322        let (tx, rx) = oneshot::channel();
1323        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1324
1325        self.persistence_state.start_save(highest_num_hash, rx);
1326    }
1327
1328    /// Attempts to advance the persistence state.
1329    ///
1330    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1331    /// or send a new persistence action if necessary.
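    ///
    /// A minimal sketch of the non-blocking poll this relies on, using [`tokio::sync::oneshot`]
    /// directly (illustrative only, not this type's API):
    ///
    /// ```ignore
    /// use tokio::sync::oneshot::{self, error::TryRecvError};
    ///
    /// let (tx, mut rx) = oneshot::channel::<u64>();
    ///
    /// // Nothing has been sent yet: the poll reports `Empty` and we keep the receiver around
    /// // to try again later, much like re-storing `persistence_state.rx` below.
    /// assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
    ///
    /// // Once the persistence task responds, the next poll yields the value.
    /// tx.send(42).unwrap();
    /// assert_eq!(rx.try_recv().unwrap(), 42);
    /// ```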
1332    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1333        if self.persistence_state.in_progress() {
1334            let (mut rx, start_time, current_action) = self
1335                .persistence_state
1336                .rx
1337                .take()
1338                .expect("if a persistence task is in progress Receiver must be Some");
1339            // Check if persistence has completed
1340            match rx.try_recv() {
1341                Ok(last_persisted_hash_num) => {
1342                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1343                    let Some(BlockNumHash {
1344                        hash: last_persisted_block_hash,
1345                        number: last_persisted_block_number,
1346                    }) = last_persisted_hash_num
1347                    else {
1348                        // if this happened, then we persisted no blocks because we sent an
1349                        // empty vec of blocks
1350                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1351                        return Ok(())
1352                    };
1353
1354                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1355                    self.persistence_state
1356                        .finish(last_persisted_block_hash, last_persisted_block_number);
1357                    self.on_new_persisted_block()?;
1358                }
1359                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1360                Err(TryRecvError::Empty) => {
1361                    self.persistence_state.rx = Some((rx, start_time, current_action))
1362                }
1363            }
1364        }
1365
1366        if !self.persistence_state.in_progress() {
1367            if let Some(new_tip_num) = self.find_disk_reorg()? {
1368                self.remove_blocks(new_tip_num)
1369            } else if self.should_persist() {
1370                let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1371                self.persist_blocks(blocks_to_persist);
1372            }
1373        }
1374
1375        Ok(())
1376    }
1377
1378    /// Handles a message from the engine.
1379    fn on_engine_message(
1380        &mut self,
1381        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1382    ) -> Result<(), InsertBlockFatalError> {
1383        match msg {
1384            FromEngine::Event(event) => match event {
1385                FromOrchestrator::BackfillSyncStarted => {
1386                    debug!(target: "engine::tree", "received backfill sync started event");
1387                    self.backfill_sync_state = BackfillSyncState::Active;
1388                }
1389                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1390                    self.on_backfill_sync_finished(ctrl)?;
1391                }
1392            },
1393            FromEngine::Request(request) => {
1394                match request {
1395                    EngineApiRequest::InsertExecutedBlock(block) => {
1396                        let block_num_hash = block.recovered_block().num_hash();
1397                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1398                            // outdated block that can be skipped
1399                            return Ok(())
1400                        }
1401
1402                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1403                        let now = Instant::now();
1404
1405                        // if the parent is the canonical head, we can insert the block as the
1406                        // pending block
1407                        if self.state.tree_state.canonical_block_hash() ==
1408                            block.recovered_block().parent_hash()
1409                        {
1410                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1411                            self.canonical_in_memory_state.set_pending_block(block.clone());
1412                        }
1413
1414                        self.state.tree_state.insert_executed(block.clone());
1415                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1416                        self.emit_event(EngineApiEvent::BeaconConsensus(
1417                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1418                        ));
1419                    }
1420                    EngineApiRequest::Beacon(request) => {
1421                        match request {
1422                            BeaconEngineMessage::ForkchoiceUpdated {
1423                                state,
1424                                payload_attrs,
1425                                tx,
1426                                version,
1427                            } => {
1428                                let mut output =
1429                                    self.on_forkchoice_updated(state, payload_attrs, version);
1430
1431                                if let Ok(res) = &mut output {
1432                                    // track last received forkchoice state
1433                                    self.state
1434                                        .forkchoice_state_tracker
1435                                        .set_latest(state, res.outcome.forkchoice_status());
1436
1437                                    // emit an event about the handled FCU
1438                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1439                                        state,
1440                                        res.outcome.forkchoice_status(),
1441                                    ));
1442
1443                                    // handle the event if any
1444                                    self.on_maybe_tree_event(res.event.take())?;
1445                                }
1446
1447                                if let Err(err) =
1448                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1449                                {
1450                                    self.metrics
1451                                        .engine
1452                                        .failed_forkchoice_updated_response_deliveries
1453                                        .increment(1);
1454                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1455                                }
1456                            }
1457                            BeaconEngineMessage::NewPayload { payload, tx } => {
1458                                let mut output = self.on_new_payload(payload);
1459
1460                                let maybe_event =
1461                                    output.as_mut().ok().and_then(|out| out.event.take());
1462
1463                                // emit response
1464                                if let Err(err) =
1465                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1466                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1467                                    }))
1468                                {
1469                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1470                                    self.metrics
1471                                        .engine
1472                                        .failed_new_payload_response_deliveries
1473                                        .increment(1);
1474                                }
1475
1476                                // handle the event if any
1477                                self.on_maybe_tree_event(maybe_event)?;
1478                            }
1479                        }
1480                    }
1481                }
1482            }
1483            FromEngine::DownloadedBlocks(blocks) => {
1484                if let Some(event) = self.on_downloaded(blocks)? {
1485                    self.on_tree_event(event)?;
1486                }
1487            }
1488        }
1489        Ok(())
1490    }
1491
1492    /// Invoked when the backfill sync has finished syncing to its target.
1493    ///
1494    /// At this point we consider the chain synced up to the backfill target.
1495    ///
1496    /// Checks the tracked finalized block against the block on disk and requests another backfill
1497    /// run if the distance to the tip still exceeds the backfill threshold.
1498    ///
1499    /// This will also do the necessary housekeeping of the tree state, which includes:
1500    ///  - removing all blocks below the backfill height
1501    ///  - resetting the canonical in-memory state
1502    ///
1503    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1504    /// target block.
1505    fn on_backfill_sync_finished(
1506        &mut self,
1507        ctrl: ControlFlow,
1508    ) -> Result<(), InsertBlockFatalError> {
1509        debug!(target: "engine::tree", "received backfill sync finished event");
1510        self.backfill_sync_state = BackfillSyncState::Idle;
1511
1512        // Pipeline unwound; memorize the invalid block and wait for the CL to provide the next sync target.
1513        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1514            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1515            // update the `invalid_headers` cache with the new invalid header
1516            self.state.invalid_headers.insert(**bad_block);
1517
1518            // if this was an unwind then the target is the new height
1519            Some(*target)
1520        } else {
1521            // backfill height is the block number that the backfill finished at
1522            ctrl.block_number()
1523        };
1524
1525        // without a backfill height there's nothing left to clean up
1526        let Some(backfill_height) = backfill_height else { return Ok(()) };
1527
1528        // state housekeeping after backfill sync
1529        // remove all executed blocks below the backfill height
1530        //
1531        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1532        // before that
1533        let Some(backfill_num_hash) = self
1534            .provider
1535            .block_hash(backfill_height)?
1536            .map(|hash| BlockNumHash { hash, number: backfill_height })
1537        else {
1538            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1539            return Ok(())
1540        };
1541
1542        if ctrl.is_unwind() {
1543            // the pipeline unwound, so we clear everything above that height so that the
1544            // backfill height becomes the new canonical head.
1545            self.state.tree_state.reset(backfill_num_hash)
1546        } else {
1547            self.state.tree_state.remove_until(
1548                backfill_num_hash,
1549                self.persistence_state.last_persisted_block.hash,
1550                Some(backfill_num_hash),
1551            );
1552        }
1553
1554        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1555        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1556
1557        // remove all buffered blocks below the backfill height
1558        self.state.buffer.remove_old_blocks(backfill_height);
1559        // we remove all entries because now we're synced to the backfill target and consider this
1560        // the canonical chain
1561        self.canonical_in_memory_state.clear_state();
1562
1563        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1564            // update the tracked chain height, after backfill sync both the canonical height and
1565            // persisted height are the same
1566            self.state.tree_state.set_canonical_head(new_head.num_hash());
1567            self.persistence_state.finish(new_head.hash(), new_head.number());
1568
1569            // update the tracked canonical head
1570            self.canonical_in_memory_state.set_canonical_head(new_head);
1571        }
1572
1573        // check if we need to run backfill again by comparing the most recent finalized height to
1574        // the backfill height
1575        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1576        else {
1577            return Ok(())
1578        };
1579        if sync_target_state.finalized_block_hash.is_zero() {
1580            // no finalized block, can't check distance
1581            return Ok(())
1582        }
1583        // get the block number of the finalized block, if we have it
1584        let newest_finalized = self
1585            .state
1586            .buffer
1587            .block(&sync_target_state.finalized_block_hash)
1588            .map(|block| block.number());
1589
1590        // The block number that the backfill finished at - if the progress or newest
1591        // finalized is None then we can't check the distance anyway.
1592        //
1593        // If both are Some, we perform another distance check and return the desired
1594        // backfill target
1595        if let Some(backfill_target) =
1596            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1597                // Determines whether we should run backfill again, i.e. whether the
1598                // remaining gap is still large enough to require another backfill run
1599                self.backfill_sync_target(progress, finalized_number, None)
1600            })
1601        {
1602            // request another backfill run
1603            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1604                backfill_target.into(),
1605            )));
1606            return Ok(())
1607        };
1608
1609        // try to close the gap by executing buffered blocks that are child blocks of the new head
1610        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1611    }
1612
1613    /// Attempts to make the given target canonical.
1614    ///
1615    /// This will update the tracked canonical in-memory state and do the necessary housekeeping.
1616    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1617        if let Some(chain_update) = self.on_new_head(target)? {
1618            self.on_canonical_chain_update(chain_update);
1619        }
1620
1621        Ok(())
1622    }
1623
1624    /// Convenience function to handle an optional tree event.
1625    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1626        if let Some(event) = event {
1627            self.on_tree_event(event)?;
1628        }
1629
1630        Ok(())
1631    }
1632
1633    /// Handles a tree event.
1634    ///
1635    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1636    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1637        match event {
1638            TreeEvent::TreeAction(action) => match action {
1639                TreeAction::MakeCanonical { sync_target_head } => {
1640                    self.make_canonical(sync_target_head)?;
1641                }
1642            },
1643            TreeEvent::BackfillAction(action) => {
1644                self.emit_event(EngineApiEvent::BackfillAction(action));
1645            }
1646            TreeEvent::Download(action) => {
1647                self.emit_event(EngineApiEvent::Download(action));
1648            }
1649        }
1650
1651        Ok(())
1652    }
1653
1654    /// Emits an outgoing event to the engine.
1655    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1656        let event = event.into();
1657
1658        if event.is_backfill_action() {
1659            debug_assert_eq!(
1660                self.backfill_sync_state,
1661                BackfillSyncState::Idle,
1662                "backfill action should only be emitted when backfill is idle"
1663            );
1664
1665            if self.persistence_state.in_progress() {
1666                // backfill sync and persisting data are mutually exclusive, so we can't start
1667                // backfill while we're still persisting
1668                debug!(target: "engine::tree", "skipping backfill action while persistence task is active");
1669                return
1670            }
1671
1672            self.backfill_sync_state = BackfillSyncState::Pending;
1673            self.metrics.engine.pipeline_runs.increment(1);
1674            debug!(target: "engine::tree", "emitting backfill action event");
1675        }
1676
1677        let _ = self.outgoing.send(event).inspect_err(
1678            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1679        );
1680    }
1681
1682    /// Returns true if the number of canonical blocks above the last persisted
1683    /// block strictly exceeds the persistence threshold and backfill is not
1684    /// running.
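    ///
    /// A minimal numeric sketch of the check, using hypothetical values and a threshold of 2:
    ///
    /// ```ignore
    /// let last_persisted = 10u64;
    /// let canonical_head = 13u64;
    /// let persistence_threshold = 2u64;
    /// // 13 - 10 = 3 > 2, so a persistence run should be triggered.
    /// assert!(canonical_head.saturating_sub(last_persisted) > persistence_threshold);
    /// ```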
1685    pub const fn should_persist(&self) -> bool {
1686        if !self.backfill_sync_state.is_idle() {
1687            // can't persist if backfill is running
1688            return false
1689        }
1690
1691        let min_block = self.persistence_state.last_persisted_block.number;
1692        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1693            self.config.persistence_threshold()
1694    }
1695
1696    /// Returns a batch of consecutive canonical blocks to persist in the range
1697    /// `(last_persisted_number .. canonical_head - threshold]`. The expected
1698    /// order is oldest -> newest.
1699    ///
1700    /// If any blocks are missing trie updates, all blocks are persisted, not taking `threshold`
1701    /// into account.
1702    ///
1703    /// For those blocks that didn't have the trie updates calculated, runs the state root
1704    /// calculation, and saves the trie updates.
1705    ///
1706    /// Returns an error if the state root calculation fails.
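    ///
    /// A minimal sketch of the range arithmetic, assuming all blocks already have trie updates
    /// and using hypothetical numbers (in-memory buffer target of 2):
    ///
    /// ```ignore
    /// let last_persisted = 90u64;
    /// let canonical_head = 100u64;
    /// let memory_block_buffer_target = 2u64;
    /// let target = canonical_head.saturating_sub(memory_block_buffer_target);
    /// // Blocks 91..=98 are persisted (oldest first); 99 and 100 stay in memory.
    /// let batch: Vec<u64> = ((last_persisted + 1)..=target).collect();
    /// assert_eq!(batch, (91..=98).collect::<Vec<_>>());
    /// ```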
1707    fn get_canonical_blocks_to_persist(
1708        &mut self,
1709    ) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
1710        // We will calculate the state root using the database, so we need to be sure there are
1711        // no in-flight database changes (i.e. no persistence task is currently running)
1712        debug_assert!(!self.persistence_state.in_progress());
1713
1714        let mut blocks_to_persist = Vec::new();
1715        let mut current_hash = self.state.tree_state.canonical_block_hash();
1716        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1717        let canonical_head_number = self.state.tree_state.canonical_block_number();
1718        let all_blocks_have_trie_updates = self
1719            .state
1720            .tree_state
1721            .blocks_by_hash
1722            .values()
1723            .all(|block| block.trie_updates().is_some());
1724
1725        let target_number = if all_blocks_have_trie_updates {
1726            // Persist only up to block buffer target if all blocks have trie updates
1727            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1728        } else {
1729            // Persist all blocks if any block is missing trie updates
1730            canonical_head_number
1731        };
1732
1733        debug!(
1734            target: "engine::tree",
1735            ?current_hash,
1736            ?last_persisted_number,
1737            ?canonical_head_number,
1738            ?all_blocks_have_trie_updates,
1739            ?target_number,
1740            "Returning canonical blocks to persist"
1741        );
1742        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1743            if block.recovered_block().number() <= last_persisted_number {
1744                break;
1745            }
1746
1747            if block.recovered_block().number() <= target_number {
1748                blocks_to_persist.push(block.clone());
1749            }
1750
1751            current_hash = block.recovered_block().parent_hash();
1752        }
1753
1754        // Reverse the order so that the oldest block comes first
1755        blocks_to_persist.reverse();
1756
1757        // Calculate missing trie updates
1758        for block in &mut blocks_to_persist {
1759            if block.trie.is_present() {
1760                continue
1761            }
1762
1763            debug!(
1764                target: "engine::tree",
1765                block = ?block.recovered_block().num_hash(),
1766                "Calculating trie updates before persisting"
1767            );
1768
1769            let provider = self
1770                .state_provider_builder(block.recovered_block().parent_hash())?
1771                .ok_or(AdvancePersistenceError::MissingAncestor(
1772                    block.recovered_block().parent_hash(),
1773                ))?
1774                .build()?;
1775
1776            let mut trie_input = self.compute_trie_input(
1777                self.persisting_kind_for(block.recovered_block.block_with_parent()),
1778                self.provider.database_provider_ro()?,
1779                block.recovered_block().parent_hash(),
1780                None,
1781            )?;
1782            // Extend with the block we are generating trie updates for.
1783            trie_input.append_ref(block.hashed_state());
1784            let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
1785            debug_assert_eq!(_root, block.recovered_block().state_root());
1786
1787            // Update trie updates in both tree state and blocks to persist that we return
1788            let trie_updates = Arc::new(updates);
1789            let tree_state_block = self
1790                .state
1791                .tree_state
1792                .blocks_by_hash
1793                .get_mut(&block.recovered_block().hash())
1794                .expect("blocks to persist are constructed from tree state blocks");
1795            tree_state_block.trie.set_present(trie_updates.clone());
1796            block.trie.set_present(trie_updates);
1797        }
1798
1799        Ok(blocks_to_persist)
1800    }
1801
1802    /// This clears the blocks from the in-memory tree state that have been persisted to the
1803    /// database.
1804    ///
1805    /// This also updates the canonical in-memory state to reflect the newest persisted block
1806    /// height.
1807    ///
1808    /// Assumes that `finish` has been called on the `persistence_state` at least once
1809    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1810        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1811        // state.
1812        if let Some(remove_above) = self.find_disk_reorg()? {
1813            self.remove_blocks(remove_above);
1814            return Ok(())
1815        }
1816
1817        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1818        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1819        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1820            number: self.persistence_state.last_persisted_block.number,
1821            hash: self.persistence_state.last_persisted_block.hash,
1822        });
1823        Ok(())
1824    }
1825
1826    /// Returns an [`ExecutedBlock`] from the database or in-memory state by hash.
1827    ///
1828    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks; instead it can only
1829    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1830    /// has in memory.
1831    ///
1832    /// For finalized blocks, this will return `None`.
1833    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1834        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1835        // check memory first
1836        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1837            return Ok(Some(block.block.clone()))
1838        }
1839
1840        let (block, senders) = self
1841            .provider
1842            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1843            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1844            .split_sealed();
1845        let execution_output = self
1846            .provider
1847            .get_state(block.header().number())?
1848            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1849        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1850
1851        Ok(Some(ExecutedBlock {
1852            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1853            execution_output: Arc::new(execution_output),
1854            hashed_state: Arc::new(hashed_state),
1855        }))
1856    }
1857
1858    /// Returns the sealed block header from in-memory state or database by hash.
1859    fn sealed_header_by_hash(
1860        &self,
1861        hash: B256,
1862    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1863        // check memory first
1864        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1865
1866        if header.is_some() {
1867            Ok(header)
1868        } else {
1869            self.provider.sealed_header_by_hash(hash)
1870        }
1871    }
1872
1873    /// Returns the parent hash of the lowest buffered ancestor for the requested block, if there
1874    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1875    /// not exist in the buffer, this returns the hash that is passed in.
1876    ///
1877    /// Returns the parent hash of the block itself if the block is buffered and has no other
1878    /// buffered ancestors.
1879    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1880        self.state
1881            .buffer
1882            .lowest_ancestor(&hash)
1883            .map(|block| block.parent_hash())
1884            .unwrap_or(hash)
1885    }
1886
1887    /// If validation fails, the response MUST contain the latest valid hash:
1888    ///
1889    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1890    ///     conditions:
1891    ///     - It is fully validated and deemed VALID
1892    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1893    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1894    ///     conditions are satisfied by a `PoW` block.
1895    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1896    ///     the above conditions.
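    ///
    /// A minimal, self-contained sketch of the ancestor walk, using plain maps with hypothetical
    /// hashes in place of the invalid-header cache and the provider:
    ///
    /// ```ignore
    /// use std::collections::{HashMap, HashSet};
    ///
    /// // "C" and "B" are known-invalid ancestors; "A" is a fully validated block.
    /// let invalid_parent_of: HashMap<&str, &str> = HashMap::from([("C", "B"), ("B", "A")]);
    /// let known_valid: HashSet<&str> = HashSet::from(["A"]);
    ///
    /// // Start from the invalid payload's parent and walk up until we leave the invalid set.
    /// let mut current = "C";
    /// while let Some(&parent) = invalid_parent_of.get(current) {
    ///     current = parent;
    /// }
    /// assert!(known_valid.contains(current)); // "A" is the latest valid hash
    /// ```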
1897    fn latest_valid_hash_for_invalid_payload(
1898        &mut self,
1899        parent_hash: B256,
1900    ) -> ProviderResult<Option<B256>> {
1901        // Check if parent exists in side chain or in canonical chain.
1902        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1903            return Ok(Some(parent_hash))
1904        }
1905
1906        // iterate over ancestors in the invalid cache
1907        // until we encounter the first valid ancestor
1908        let mut current_hash = parent_hash;
1909        let mut current_block = self.state.invalid_headers.get(&current_hash);
1910        while let Some(block_with_parent) = current_block {
1911            current_hash = block_with_parent.parent;
1912            current_block = self.state.invalid_headers.get(&current_hash);
1913
1914            // If current_block is None, then current_hash does not have an invalid
1915            // ancestor in the cache, so check its presence in the blockchain tree
1916            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1917                return Ok(Some(current_hash))
1918            }
1919        }
1920        Ok(None)
1921    }
1922
1923    /// Prepares the invalid payload response for the given hash, checking the
1924    /// database for the parent hash and populating the payload status with the latest valid hash
1925    /// according to the engine API spec.
1926    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1927        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1928        // PoW block, which we need to identify by looking at the parent's block difficulty
1929        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1930            !parent.difficulty().is_zero()
1931        {
1932            parent_hash = B256::ZERO;
1933        }
1934
1935        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1936        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1937            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1938        })
1939        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1940    }
1941
1942    /// Returns true if the given hash is the last received sync target block.
1943    ///
1944    /// See [`ForkchoiceStateTracker::sync_target_state`]
1945    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1946        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1947            return target.head_block_hash == block_hash
1948        }
1949        false
1950    }
1951
1952    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1953    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1954    ///
1955    /// Returns a payload status response according to the engine API spec if the block is known to
1956    /// be invalid.
1957    fn check_invalid_ancestor_with_head(
1958        &mut self,
1959        check: B256,
1960        head: &SealedBlock<N::Block>,
1961    ) -> ProviderResult<Option<PayloadStatus>> {
1962        // check if the check hash was previously marked as invalid
1963        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1964
1965        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1966    }
1967
1968    /// Invoked when a newly received payload is invalid.
1969    fn on_invalid_new_payload(
1970        &mut self,
1971        head: SealedBlock<N::Block>,
1972        invalid: BlockWithParent,
1973    ) -> ProviderResult<PayloadStatus> {
1974        // populate the latest valid hash field
1975        let status = self.prepare_invalid_response(invalid.parent)?;
1976
1977        // insert the head block into the invalid header cache
1978        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1979        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1980
1981        Ok(status)
1982    }
1983
1984    /// Finds any invalid ancestor for the given payload.
1985    ///
1986    /// This function walks up the chain of buffered ancestors from the payload's block
1987    /// hash and checks if any ancestor is marked as invalid in the tree state.
1988    ///
1989    /// The check works by:
1990    /// 1. Finding the lowest buffered ancestor for the given block hash
1991    /// 2. If the ancestor is the same as the block hash itself, using the parent hash instead
1992    /// 3. Checking if this ancestor is in the `invalid_headers` map
1993    ///
1994    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
1995    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1996        let parent_hash = payload.parent_hash();
1997        let block_hash = payload.block_hash();
1998        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1999        if lowest_buffered_ancestor == block_hash {
2000            lowest_buffered_ancestor = parent_hash;
2001        }
2002
2003        // Check if the block has an invalid ancestor
2004        self.state.invalid_headers.get(&lowest_buffered_ancestor)
2005    }
2006
2007    /// Handles a payload that has an invalid ancestor.
2008    ///
2009    /// This function validates the payload and processes it according to whether it's
2010    /// well-formed or malformed:
2011    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
2012    ///    known-bad block, which violates consensus rules
2013    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
2014    ///    validated due to its own structural issues
2015    fn handle_invalid_ancestor_payload(
2016        &mut self,
2017        payload: T::ExecutionData,
2018        invalid: BlockWithParent,
2019    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2020        let parent_hash = payload.parent_hash();
2021
2022        // Here we might have two cases
2023        // 1. the block is well-formed and indeed links to an invalid header, meaning we should
2024        //    remember it as invalid
2025        // 2. the block is not well-formed (i.e. its hash is incorrect), and we should just return
2026        //    an error and forget it
2027        let block = match self.payload_validator.ensure_well_formed_payload(payload) {
2028            Ok(block) => block,
2029            Err(error) => return Ok(self.on_new_payload_error(error, parent_hash)?),
2030        };
2031
2032        Ok(self.on_invalid_new_payload(block.into_sealed_block(), invalid)?)
2033    }
2034
2035    /// Checks if the given `head` points to an invalid header, which requires a specific response
2036    /// to a forkchoice update.
2037    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2038        // check if the head was previously marked as invalid
2039        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2040
2041        // Try to prepare invalid response, but handle errors gracefully
2042        match self.prepare_invalid_response(header.parent) {
2043            Ok(status) => Ok(Some(status)),
2044            Err(err) => {
2045                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2046                // Return a basic invalid status without latest valid hash
2047                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2048                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2049                })))
2050            }
2051        }
2052    }
2053
2054    /// Validates that the block is correct and satisfies all the consensus rules that concern
2055    /// the header and the block body itself.
2056    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
2057        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2058            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2059            return Err(e)
2060        }
2061
2062        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
2063            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2064            return Err(e)
2065        }
2066
2067        Ok(())
2068    }
2069
2070    /// Attempts to connect any buffered blocks that descend from the given parent hash.
2071    #[instrument(level = "trace", skip(self), target = "engine::tree")]
2072    fn try_connect_buffered_blocks(
2073        &mut self,
2074        parent: BlockNumHash,
2075    ) -> Result<(), InsertBlockFatalError> {
2076        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2077
2078        if blocks.is_empty() {
2079            // nothing to append
2080            return Ok(())
2081        }
2082
2083        let now = Instant::now();
2084        let block_count = blocks.len();
2085        for child in blocks {
2086            let child_num_hash = child.num_hash();
2087            match self.insert_block(child) {
2088                Ok(res) => {
2089                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2090                    if self.is_sync_target_head(child_num_hash.hash) &&
2091                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2092                    {
2093                        self.make_canonical(child_num_hash.hash)?;
2094                    }
2095                }
2096                Err(err) => {
2097                    if let InsertPayloadError::Block(err) = err {
2098                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2099                        if let Err(fatal) = self.on_insert_block_error(err) {
2100                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2101                            return Err(fatal)
2102                        }
2103                    }
2104                }
2105            }
2106        }
2107
2108        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2109        Ok(())
2110    }
2111
2112    /// Pre-validates the block and inserts it into the buffer.
2113    fn buffer_block(
2114        &mut self,
2115        block: RecoveredBlock<N::Block>,
2116    ) -> Result<(), InsertBlockError<N::Block>> {
2117        if let Err(err) = self.validate_block(&block) {
2118            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
2119        }
2120        self.state.buffer.insert_block(block);
2121        Ok(())
2122    }
2123
2124    /// Returns true if the distance from the local tip to the block is greater than the configured
2125    /// threshold.
2126    ///
2127    /// If the `local_tip` is greater than the `block`, then this will return false.
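    ///
    /// A minimal numeric sketch, assuming a threshold of 32 blocks purely for illustration:
    ///
    /// ```ignore
    /// let threshold = 32u64; // stand-in for `MIN_BLOCKS_FOR_PIPELINE_RUN`
    /// let (local_tip, block) = (100u64, 133u64);
    /// // 133 - 100 = 33 > 32, so the gap is large enough to warrant a backfill run.
    /// assert!(block > local_tip && block - local_tip > threshold);
    /// ```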
2128    #[inline]
2129    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2130        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2131    }
2132
2133    /// Returns how far the local tip is from the given block. If the local tip is at the same
2134    /// height or its block number is greater than the given block, this returns None.
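    ///
    /// A minimal sketch with hypothetical numbers:
    ///
    /// ```ignore
    /// let (local_tip, block) = (100u64, 133u64);
    /// let distance = (block > local_tip).then(|| block - local_tip);
    /// assert_eq!(distance, Some(33)); // a block at or below the tip would yield `None`
    /// ```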
2135    #[inline]
2136    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2137        if block > local_tip {
2138            Some(block - local_tip)
2139        } else {
2140            None
2141        }
2142    }
2143
2144    /// Returns the target hash to sync to if the distance from the local tip to the block is
2145    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2146    /// that block already).
2147    ///
2148    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2149    /// (missing) finalized block.
2150    fn backfill_sync_target(
2151        &self,
2152        canonical_tip_num: u64,
2153        target_block_number: u64,
2154        downloaded_block: Option<BlockNumHash>,
2155    ) -> Option<B256> {
2156        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2157
2158        // check if the downloaded block is the tracked finalized block
2159        let exceeds_backfill_threshold =
2160            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2161                // if we downloaded the finalized block we can now check how far we're off
2162                (Some(downloaded_block), Some(state))
2163                    if downloaded_block.hash == state.finalized_block_hash =>
2164                {
2165                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2166                }
2167                _ => match sync_target_state
2168                    .as_ref()
2169                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2170                {
2171                    Some(buffered_finalized) => {
2172                        // if we have buffered the finalized block, we should check how far we're
2173                        // off
2174                        self.exceeds_backfill_run_threshold(
2175                            canonical_tip_num,
2176                            buffered_finalized.number(),
2177                        )
2178                    }
2179                    None => {
2180                        // check if the distance exceeds the threshold for backfill sync
2181                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2182                    }
2183                },
2184            };
2185
2186        // if the number of missing blocks is greater than the max, trigger backfill
2187        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2188            // if we have already canonicalized the finalized block, we should skip backfill
2189            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2190                Err(err) => {
2191                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2192                }
2193                Ok(None) => {
2194                    // ensure the finalized block is known (not the zero hash)
2195                    if !state.finalized_block_hash.is_zero() {
2196                        // we don't have the block yet and the distance exceeds the allowed
2197                        // threshold
2198                        return Some(state.finalized_block_hash)
2199                    }
2200
2201                    // OPTIMISTIC SYNCING
2202                    //
2203                    // This can happen when the node is doing an optimistic sync, where the
2204                    // CL has no knowledge of the finalized hash but expects the EL to sync
2205                    // as high as possible before finalizing.
2206                    //
2207                    // This usually doesn't happen on ETH mainnet since CLs use the more
2208                    // secure checkpoint syncing.
2209                    //
2210                    // However, optimism chains will do this, and the risk of a reorg is
2211                    // low.
2213                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2214                    return Some(state.head_block_hash)
2215                }
2216                Ok(Some(_)) => {
2217                    // we're fully synced to the finalized block
2218                }
2219            }
2220        }
2221
2222        None
2223    }
2224
2225    /// This method tries to detect whether the on-disk and in-memory states have diverged. This
2226    /// can happen if a reorg occurs while we are persisting a block.
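    ///
    /// A self-contained sketch of the walk-back, using a plain parent map with hypothetical
    /// hashes instead of headers fetched from the provider:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// // (number, hash) of each block's parent: D3 reorged out C3, and both build on B2 -> A1.
    /// let parent: HashMap<&str, (u64, &str)> =
    ///     HashMap::from([("D3", (2, "B2")), ("C3", (2, "B2")), ("B2", (1, "A1"))]);
    ///
    /// // The canonical tip is D3 at height 3, but C3 at height 3 was already persisted.
    /// let (mut canonical, mut persisted) = ((3u64, "D3"), (3u64, "C3"));
    /// while canonical.1 != persisted.1 {
    ///     canonical = parent[canonical.1];
    ///     persisted = parent[persisted.1];
    /// }
    /// // Everything on disk above the common ancestor's height must be removed.
    /// assert_eq!(persisted, (2, "B2"));
    /// ```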
2227    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2228        let mut canonical = self.state.tree_state.current_canonical_head;
2229        let mut persisted = self.persistence_state.last_persisted_block;
2230
2231        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2232            Ok(self
2233                .sealed_header_by_hash(num_hash.hash)?
2234                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2235                .parent_num_hash())
2236        };
2237
2238        // Happy path, canonical chain is ahead or equal to persisted chain.
2239        // Walk canonical chain back to make sure that it connects to persisted chain.
2240        while canonical.number > persisted.number {
2241            canonical = parent_num_hash(canonical)?;
2242        }
2243
2244        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2245        if canonical == persisted {
2246            return Ok(None);
2247        }
2248
2249        // At this point, we know that `persisted` block can't be reached by walking the canonical
2250        // chain back. In this case we need to truncate it to the first canonical block it connects
2251        // to.
2252
2253        // Firstly, walk back until we reach the same height as `canonical`.
2254        while persisted.number > canonical.number {
2255            persisted = parent_num_hash(persisted)?;
2256        }
2257
2258        debug_assert_eq!(persisted.number, canonical.number);
2259
2260        // Now walk both chains back until we find a common ancestor.
2261        while persisted.hash != canonical.hash {
2262            canonical = parent_num_hash(canonical)?;
2263            persisted = parent_num_hash(persisted)?;
2264        }
2265
2266        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2267
2268        Ok(Some(persisted.number))
2269    }
2270
2271    /// Invoked when the canonical chain has been updated.
2272    ///
2273    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2274    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2275        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2276        let start = Instant::now();
2277
2278        // update the tracked canonical head
2279        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2280
2281        let tip = chain_update.tip().clone_sealed_header();
2282        let notification = chain_update.to_chain_notification();
2283
2284        // reinsert any missing reorged blocks
2285        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2286            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2287            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2288            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2289
2290            self.update_reorg_metrics(old.len());
2291            self.reinsert_reorged_blocks(new.clone());
2292            // Try reinserting the reorged canonical chain. This is only possible if we have
2293            // `persisted_trie_updates` for those blocks.
2294            let old = old
2295                .iter()
2296                .filter_map(|block| {
2297                    let trie = self
2298                        .state
2299                        .tree_state
2300                        .persisted_trie_updates
2301                        .get(&block.recovered_block.hash())?
2302                        .1
2303                        .clone();
2304                    Some(ExecutedBlockWithTrieUpdates {
2305                        block: block.clone(),
2306                        trie: ExecutedTrieUpdates::Present(trie),
2307                    })
2308                })
2309                .collect::<Vec<_>>();
2310            self.reinsert_reorged_blocks(old);
2311        }
2312
2313        // update the tracked in-memory state with the new chain
2314        self.canonical_in_memory_state.update_chain(chain_update);
2315        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2316
2317        // Update metrics based on new tip
2318        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2319
2320        // sends an event to all active listeners about the new canonical chain
2321        self.canonical_in_memory_state.notify_canon_state(notification);
2322
2323        // emit event
2324        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2325            Box::new(tip),
2326            start.elapsed(),
2327        ));
2328    }
2329
2330    /// This updates metrics based on the given reorg length.
2331    fn update_reorg_metrics(&self, old_chain_length: usize) {
2332        self.metrics.tree.reorgs.increment(1);
2333        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2334    }
2335
2336    /// This reinserts any blocks in the new chain that do not already exist in the tree state.
2337    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2338        for block in new_chain {
2339            if self
2340                .state
2341                .tree_state
2342                .executed_block_by_hash(block.recovered_block().hash())
2343                .is_none()
2344            {
2345                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2346                self.state.tree_state.insert_executed(block);
2347            }
2348        }
2349    }
2350
2351    /// This handles downloaded blocks that turn out to be disconnected from the canonical chain.
2352    ///
2353    /// This mainly compares the missing parent of the downloaded block with the current canonical
2354    /// tip, and decides whether backfill sync should be triggered.
2355    fn on_disconnected_downloaded_block(
2356        &self,
2357        downloaded_block: BlockNumHash,
2358        missing_parent: BlockNumHash,
2359        head: BlockNumHash,
2360    ) -> Option<TreeEvent> {
2361        // compare the missing parent with the canonical tip
2362        if let Some(target) =
2363            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2364        {
2365            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2366            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2367        }
2368
2369        // continue downloading the missing parent
2370        //
2371        // this happens if either:
2372        //  * the missing parent block num < canonical tip num
2373        //    * this case represents a missing block on a fork that is shorter than the canonical
2374        //      chain
2375        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2376        //    less than the backfill threshold
2377        //    * this case represents a potentially long range of blocks to download and execute
2378        let request = if let Some(distance) =
2379            self.distance_from_local_tip(head.number, missing_parent.number)
2380        {
2381            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2382            DownloadRequest::BlockRange(missing_parent.hash, distance)
2383        } else {
2384            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2385            // This happens when the missing parent is on an outdated
2386            // sidechain and we can only download the missing block itself
2387            DownloadRequest::single_block(missing_parent.hash)
2388        };
2389
2390        Some(TreeEvent::Download(request))
2391    }
2392
2393    /// Invoked with a block downloaded from the network.
2394    ///
2395    /// Returns an event with the appropriate action to take, such as:
2396    ///  - download more missing blocks
2397    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2398    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2399    fn on_downloaded_block(
2400        &mut self,
2401        block: RecoveredBlock<N::Block>,
2402    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2403        let block_num_hash = block.num_hash();
2404        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2405        if self
2406            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2407            .is_some()
2408        {
2409            return Ok(None)
2410        }
2411
2412        if !self.backfill_sync_state.is_idle() {
2413            return Ok(None)
2414        }
2415
2416        // try to append the block
2417        match self.insert_block(block) {
2418            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2419                if self.is_sync_target_head(block_num_hash.hash) {
2420                    trace!(target: "engine::tree", "appended downloaded sync target block");
2421
2422                    // we just inserted the current sync target block, we can try to make it
2423                    // canonical
2424                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2425                        sync_target_head: block_num_hash.hash,
2426                    })))
2427                }
2428                trace!(target: "engine::tree", "appended downloaded block");
2429                self.try_connect_buffered_blocks(block_num_hash)?;
2430            }
2431            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2432                // the block is not connected to the canonical head, so we need to download
2433                // its missing branch first
2434                return Ok(self.on_disconnected_downloaded_block(
2435                    block_num_hash,
2436                    missing_ancestor,
2437                    head,
2438                ))
2439            }
2440            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2441                trace!(target: "engine::tree", "downloaded block already executed");
2442            }
2443            Err(err) => {
2444                if let InsertPayloadError::Block(err) = err {
2445                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2446                    if let Err(fatal) = self.on_insert_block_error(err) {
2447                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2448                        return Err(fatal)
2449                    }
2450                }
2451            }
2452        }
2453        Ok(None)
2454    }
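
    // Illustrative sketch (hypothetical helper, with booleans standing in for the real lookups):
    // `on_downloaded_block` applies two cheap guards before attempting execution. A known-invalid
    // ancestor short-circuits to "ignore", and so does an active backfill run, since downloaded
    // blocks are not inserted while the pipeline is syncing.
    fn _sketch_downloaded_block_guards(
        has_invalid_ancestor: bool,
        backfill_is_idle: bool,
    ) -> &'static str {
        if has_invalid_ancestor {
            // the block descends from a block we already know is invalid
            return "ignore"
        }
        if !backfill_is_idle {
            // backfill sync is running; the block will be covered by the pipeline
            return "ignore"
        }
        "insert and execute"
    }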
2455
2456    /// Inserts a payload into the tree and executes it.
2457    ///
2458    /// This function validates the payload's basic structure, then executes it using the
2459    /// payload validator. The execution includes running all transactions in the payload
2460    /// and validating the resulting state transitions.
2461    ///
2462    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2463    /// or `InsertPayloadError` if validation or execution failed.
2464    fn insert_payload(
2465        &mut self,
2466        payload: T::ExecutionData,
2467    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2468        self.insert_block_or_payload(
2469            payload.block_with_parent(),
2470            payload,
2471            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2472            |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2473        )
2474    }
2475
2476    fn insert_block(
2477        &mut self,
2478        block: RecoveredBlock<N::Block>,
2479    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2480        self.insert_block_or_payload(
2481            block.block_with_parent(),
2482            block,
2483            |validator, block, ctx| validator.validate_block(block, ctx),
2484            |_, block| Ok(block),
2485        )
2486    }
2487
2488    /// Inserts a block or payload into the blockchain tree with full execution.
2489    ///
2490    /// This is a generic function that handles both blocks and payloads by accepting
2491    /// a block identifier, input data, and execution/validation functions. It performs
2492    /// comprehensive checks and execution:
2493    ///
2494    /// - Validates that the block doesn't already exist in the tree
2495    /// - Ensures parent state is available, buffering if necessary
2496    /// - Executes the block/payload using the provided execute function
2497    /// - Handles both canonical and fork chain insertions
2498    /// - Updates pending block state when appropriate
2499    /// - Emits consensus engine events and records metrics
2500    ///
2501    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2502    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2503    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2504    fn insert_block_or_payload<Input, Err>(
2505        &mut self,
2506        block_id: BlockWithParent,
2507        input: Input,
2508        execute: impl FnOnce(
2509            &mut V,
2510            Input,
2511            TreeCtx<'_, N>,
2512        ) -> Result<ExecutedBlockWithTrieUpdates<N>, Err>,
2513        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
2514    ) -> Result<InsertPayloadOk, Err>
2515    where
2516        Err: From<InsertBlockError<N::Block>>,
2517    {
2518        let block_insert_start = Instant::now();
2519        let block_num_hash = block_id.block;
2520        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2521
2522        match self.sealed_header_by_hash(block_num_hash.hash) {
2523            Err(err) => {
2524                let block = convert_to_block(self, input)?;
2525                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2526            }
2527            Ok(Some(_)) => {
2528                // We already have this block in the tree. However, we still need to run the
2529                // conversion to ensure that the block hash is valid.
2530                convert_to_block(self, input)?;
2531                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2532            }
2533            _ => {}
2534        };
2535
2536        // Ensure that the parent state is available.
2537        match self.state_provider_builder(block_id.parent) {
2538            Err(err) => {
2539                let block = convert_to_block(self, input)?;
2540                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2541            }
2542            Ok(None) => {
2543                let block = convert_to_block(self, input)?;
2544
2545                // we don't have the state required to execute this block, so we buffer it
2546                // and find the missing parent block
2547                let missing_ancestor = self
2548                    .state
2549                    .buffer
2550                    .lowest_ancestor(&block.parent_hash())
2551                    .map(|block| block.parent_num_hash())
2552                    .unwrap_or_else(|| block.parent_num_hash());
2553
2554                self.state.buffer.insert_block(block);
2555
2556                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2557                    head: self.state.tree_state.current_canonical_head,
2558                    missing_ancestor,
2559                }))
2560            }
2561            Ok(Some(_)) => {}
2562        }
2563
2564        // determine whether we are on a fork chain
2565        let is_fork = match self.is_fork(block_id) {
2566            Err(err) => {
2567                let block = convert_to_block(self, input)?;
2568                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2569            }
2570            Ok(is_fork) => is_fork,
2571        };
2572
2573        let ctx =
2574            TreeCtx::new(&mut self.state, &self.persistence_state, &self.canonical_in_memory_state);
2575
2576        let start = Instant::now();
2577
2578        let executed = execute(&mut self.payload_validator, input, ctx)?;
2579
2580        // if the parent is the canonical head, we can insert the block as the pending block
2581        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2582        {
2583            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2584            self.canonical_in_memory_state.set_pending_block(executed.clone());
2585        }
2586
2587        self.state.tree_state.insert_executed(executed.clone());
2588        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2589
2590        // emit insert event
2591        let elapsed = start.elapsed();
2592        let engine_event = if is_fork {
2593            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2594        } else {
2595            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2596        };
2597        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2598
2599        self.metrics
2600            .engine
2601            .block_insert_total_duration
2602            .record(block_insert_start.elapsed().as_secs_f64());
2603        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2604        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2605    }
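
    // Illustrative sketch of the closure-based pattern behind `insert_block_or_payload`, using
    // hypothetical simplified types: one generic body serves both payloads (which must be
    // converted into a block on the paths that need one) and already-recovered blocks (where the
    // conversion is the identity). The real helper above only converts on the error, already-seen
    // and buffering paths; the happy path hands the raw input straight to `execute`.
    fn _sketch_shared_insert_path<Input, Block, Err>(
        input: Input,
        already_seen: bool,
        execute: impl FnOnce(Input) -> Result<Block, Err>,
        convert_to_block: impl FnOnce(Input) -> Result<Block, Err>,
    ) -> Result<Option<Block>, Err> {
        if already_seen {
            // conversion still runs so that an invalid hash is reported even for known blocks
            convert_to_block(input)?;
            return Ok(None)
        }
        execute(input).map(Some)
    }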
2606
2607    /// Computes the trie input at the provided parent hash.
2608    ///
2609    /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that
2610    /// serves as an overlay to the database blocks.
2611    ///
2612    /// It works as follows:
2613    /// 1. Collect in-memory blocks that are descendants of the provided parent hash using
2614    ///    [`TreeState::blocks_by_hash`].
2615    /// 2. If persistence is in progress and the block we're computing the trie input for is a
2616    ///    descendant of the currently persisting blocks, we need to make sure that the in-memory
2617    ///    blocks do not overlap with database blocks that may already have been persisted. To do
2618    ///    that, we filter out in-memory blocks whose number is at or below the highest database
2619    ///    block.
2620    /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
2621    ///    [`HashedPostState`] from them.
2622    fn compute_trie_input<TP: DBProvider + BlockNumReader>(
2623        &self,
2624        persisting_kind: PersistingKind,
2625        provider: TP,
2626        parent_hash: B256,
2627        allocated_trie_input: Option<TrieInput>,
2628    ) -> ProviderResult<TrieInput> {
2629        // get allocated trie input or use a default trie input
2630        let mut input = allocated_trie_input.unwrap_or_default();
2631
2632        let best_block_number = provider.best_block_number()?;
2633
2634        let (mut historical, mut blocks) = self
2635            .state
2636            .tree_state
2637            .blocks_by_hash(parent_hash)
2638            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2639
2640        // If the current block is a descendant of the currently persisting blocks, then we need to
2641        // filter in-memory blocks, so that none of them are already persisted in the database.
2642        if persisting_kind.is_descendant() {
2643            // Iterate over the blocks from oldest to newest.
2644            while let Some(block) = blocks.last() {
2645                let recovered_block = block.recovered_block();
2646                if recovered_block.number() <= best_block_number {
2646                    // Remove blocks that are lower than or equal to the highest database
2647                    // block.
2649                    blocks.pop();
2650                } else {
2651                    // If the block is higher than the best block number, stop filtering, as it's
2652                    // the first block that's not in the database.
2653                    break
2654                }
2655            }
2656
2657            historical = if let Some(block) = blocks.last() {
2658                // If there are any in-memory blocks left after filtering, set the anchor to the
2659                // parent of the oldest block.
2660                (block.recovered_block().number() - 1).into()
2661            } else {
2662                // Otherwise, set the anchor to the original provided parent hash.
2663                parent_hash.into()
2664            };
2665        }
2666
2667        if blocks.is_empty() {
2668            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2669        } else {
2670            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2671        }
2672
2673        // Convert the historical block hash or number into a block number.
2674        let block_number = provider
2675            .convert_hash_or_number(historical)?
2676            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2677
2678        // Retrieve revert state for historical block.
2679        let revert_state = if block_number == best_block_number {
2680            // We do not check against the `last_block_number` here because
2681            // `HashedPostState::from_reverts` only uses the database tables, and not static files.
2682            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2683            HashedPostState::default()
2684        } else {
2685            let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
2686                provider.tx_ref(),
2687                block_number + 1..,
2688            )
2689            .map_err(ProviderError::from)?;
2690            debug!(
2691                target: "engine::tree",
2692                block_number,
2693                best_block_number,
2694                accounts = revert_state.accounts.len(),
2695                storages = revert_state.storages.len(),
2696                "Non-empty revert state"
2697            );
2698            revert_state
2699        };
2700        input.append(revert_state);
2701
2702        // Extend with contents of parent in-memory blocks.
2703        input.extend_with_blocks(
2704            blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
2705        );
2706
2707        Ok(input)
2708    }
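
    // Illustrative sketch of step 2 above with plain block numbers, ordered newest -> oldest like
    // the vec returned by `TreeState::blocks_by_hash`: drop from the oldest end every block
    // already covered by the database, then anchor the overlay at the parent of the oldest
    // survivor (or leave the anchor to the caller if nothing survived). The inputs and return
    // type are simplified for illustration only.
    fn _sketch_filter_persisted_overlap(
        mut in_memory: Vec<u64>,
        best_db_block: u64,
    ) -> (Vec<u64>, Option<u64>) {
        while let Some(oldest) = in_memory.last().copied() {
            if oldest <= best_db_block {
                // at or below the highest database block: the database revert state covers it
                in_memory.pop();
            } else {
                break
            }
        }
        // new historical anchor: the parent of the oldest remaining in-memory block, if any
        let anchor = in_memory.last().map(|oldest| oldest - 1);
        (in_memory, anchor)
    }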
2709
2710    /// Handles an error that occurred while inserting a block.
2711    ///
2712    /// If this is a validation error this will mark the block as invalid.
2713    ///
2714    /// Returns the proper payload status response if the block is invalid.
2715    fn on_insert_block_error(
2716        &mut self,
2717        error: InsertBlockError<N::Block>,
2718    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2719        let (block, error) = error.split();
2720
2721        // if this is an invalid block error, we check the validation error; otherwise the
2722        // fatal error is propagated.
2723        let validation_err = error.ensure_validation_error()?;
2724
2725        // If the error was due to an invalid payload, the payload is added to the
2726        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2727        // returned.
2728        warn!(
2729            target: "engine::tree",
2730            invalid_hash=%block.hash(),
2731            invalid_number=block.number(),
2732            %validation_err,
2733            "Invalid block error on new payload",
2734        );
2735        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2736
2737        // keep track of the invalid header
2738        self.state.invalid_headers.insert(block.block_with_parent());
2739        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2740            Box::new(block),
2741        )));
2742
2743        Ok(PayloadStatus::new(
2744            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2745            latest_valid_hash,
2746        ))
2747    }
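
    // Illustrative sketch (hypothetical boolean in place of the real error inspection):
    // `on_insert_block_error` separates validation failures, which become an INVALID payload
    // status for the consensus layer, from internal failures, which must propagate as fatal
    // errors instead of being blamed on the block.
    fn _sketch_split_insert_error(
        is_validation_error: bool,
    ) -> Result<&'static str, &'static str> {
        if is_validation_error {
            // record the invalid header and answer with PayloadStatusEnum::Invalid
            Ok("respond with an INVALID payload status")
        } else {
            // e.g. a provider/database failure: surface it to the caller
            Err("fatal insert error")
        }
    }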
2748
2749    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
2750    fn on_new_payload_error(
2751        &mut self,
2752        error: NewPayloadError,
2753        parent_hash: B256,
2754    ) -> ProviderResult<PayloadStatus> {
2755        error!(target: "engine::tree", %error, "Invalid payload");
2756        // we need to convert the error to a payload status (response to the CL)
2757
2758        let latest_valid_hash =
2759            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2760                // Engine-API rules:
2761                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2762                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2763                None
2764            } else {
2765                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2766            };
2767
2768        let status = PayloadStatusEnum::from(error);
2769        Ok(PayloadStatus::new(status, latest_valid_hash))
2770    }
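
    // Illustrative sketch of the `latestValidHash` rule applied above, with plain flags in place
    // of inspecting `NewPayloadError`: block-hash mismatches and invalid versioned hashes must
    // report `latestValidHash: null`; other validation failures report the last valid ancestor
    // (passed in directly here, derived from the parent hash in the real code).
    fn _sketch_latest_valid_hash(
        block_hash_mismatch: bool,
        invalid_versioned_hashes: bool,
        last_valid_ancestor: B256,
    ) -> Option<B256> {
        if block_hash_mismatch || invalid_versioned_hashes {
            None
        } else {
            Some(last_valid_ancestor)
        }
    }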
2771
2772    /// Attempts to find the header for the given block hash if it is canonical.
2773    pub fn find_canonical_header(
2774        &self,
2775        hash: B256,
2776    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2777        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2778
2779        if canonical.is_none() {
2780            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2781        }
2782
2783        Ok(canonical)
2784    }
2785
2786    /// Updates the tracked finalized block if we have it.
2787    fn update_finalized_block(
2788        &self,
2789        finalized_block_hash: B256,
2790    ) -> Result<(), OnForkChoiceUpdated> {
2791        if finalized_block_hash.is_zero() {
2792            return Ok(())
2793        }
2794
2795        match self.find_canonical_header(finalized_block_hash) {
2796            Ok(None) => {
2797                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2798                // if the finalized block is not known, we can't update the finalized block
2799                return Err(OnForkChoiceUpdated::invalid_state())
2800            }
2801            Ok(Some(finalized)) => {
2802                if Some(finalized.num_hash()) !=
2803                    self.canonical_in_memory_state.get_finalized_num_hash()
2804                {
2805                    // we're also persisting the finalized block on disk so we can reload it on
2806                    // restart. This is required by optimism, which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2807                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2808                    self.canonical_in_memory_state.set_finalized(finalized);
2809                }
2810            }
2811            Err(err) => {
2812                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2813            }
2814        }
2815
2816        Ok(())
2817    }
2818
2819    /// Updates the tracked safe block if we have it
2820    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2821        if safe_block_hash.is_zero() {
2822            return Ok(())
2823        }
2824
2825        match self.find_canonical_header(safe_block_hash) {
2826            Ok(None) => {
2827                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2828                // if the safe block is not known, we can't update the safe block
2829                return Err(OnForkChoiceUpdated::invalid_state())
2830            }
2831            Ok(Some(safe)) => {
2832                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2833                    // we're also persisting the safe block on disk so we can reload it on
2834                    // restart. This is required by optimism, which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2835                    let _ = self.persistence.save_safe_block_number(safe.number());
2836                    self.canonical_in_memory_state.set_safe(safe);
2837                }
2838            }
2839            Err(err) => {
2840                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2841            }
2842        }
2843
2844        Ok(())
2845    }
2846
2847    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2848    /// made canonical.
2849    ///
2850    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2851    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2852    ///
2853    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2854    /// are consistent with the head block.
2855    fn ensure_consistent_forkchoice_state(
2856        &self,
2857        state: ForkchoiceState,
2858    ) -> Result<(), OnForkChoiceUpdated> {
2859        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2860        // after the head block is canonicalized.
2861        //
2862        // This ensures that the finalized block is consistent with the head block, i.e. the
2863        // finalized block is an ancestor of the head block.
2864        self.update_finalized_block(state.finalized_block_hash)?;
2865
2866        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2867        // after the head block is canonicalized.
2868        //
2869        // This ensures that the safe block is consistent with the head block, i.e. the safe
2870        // block is an ancestor of the head block.
2871        self.update_safe_block(state.safe_block_hash)
2872    }
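
    // Illustrative sketch (hypothetical lookup closure): the per-block consistency check used for
    // the finalized and safe hashes above. An all-zero hash means "not set by the CL yet" and is
    // skipped, a known canonical header updates the tracked value, and an unknown hash makes the
    // whole forkchoice update INVALID.
    fn _sketch_consistency_check(
        hash: B256,
        is_known_canonical: impl Fn(B256) -> bool,
    ) -> Result<&'static str, &'static str> {
        if hash.is_zero() {
            return Ok("skipped: not set")
        }
        if is_known_canonical(hash) {
            Ok("tracked block updated")
        } else {
            Err("invalid forkchoice state")
        }
    }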
2873
2874    /// Validates the payload attributes with respect to the header and fork choice state.
2875    ///
2876    /// Note: At this point, the fork choice update is considered to be VALID; however, we can still
2877    /// return an error if the payload attributes are invalid.
2878    fn process_payload_attributes(
2879        &self,
2880        attrs: T::PayloadAttributes,
2881        head: &N::BlockHeader,
2882        state: ForkchoiceState,
2883        version: EngineApiMessageVersion,
2884    ) -> OnForkChoiceUpdated {
2885        if let Err(err) =
2886            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2887        {
2888            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2889            return OnForkChoiceUpdated::invalid_payload_attributes()
2890        }
2891
2892        // 8. Client software MUST begin a payload build process building on top of
2893        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2894        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2895        //    The build process is specified in the Payload building section.
2896        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2897            state.head_block_hash,
2898            attrs,
2899            version as u8,
2900        ) {
2901            Ok(attributes) => {
2902                // send the attributes to the builder and return the receiver for the pending
2903                // payload id; initiating the payload job is handled asynchronously
2904                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2905
2906                // Client software MUST respond to this method call in the following way:
2907                // {
2908                //      payloadStatus: {
2909                //          status: VALID,
2910                //          latestValidHash: forkchoiceState.headBlockHash,
2911                //          validationError: null
2912                //      },
2913                //      payloadId: buildProcessId
2914                // }
2915                //
2916                // if the payload is deemed VALID and the build process has begun.
2917                OnForkChoiceUpdated::updated_with_pending_payload_id(
2918                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2919                    pending_payload_id,
2920                )
2921            }
2922            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2923        }
2924    }
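
    // Illustrative sketch: when the attributes are accepted, the forkchoice response carries a
    // VALID payload status whose `latestValidHash` is the head the build starts on. The payload
    // id is omitted here because it only exists once the builder has acknowledged the job; the
    // real code above pairs this status with the pending id receiver.
    fn _sketch_fcu_with_attributes_status(head_block_hash: B256) -> PayloadStatus {
        PayloadStatus::new(PayloadStatusEnum::Valid, Some(head_block_hash))
    }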
2925
2926    /// Remove all blocks up to __and including__ the given block number.
2927    ///
2928    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2929    /// those which have a fork point at or below the finalized hash.
2930    ///
2931    /// Canonical blocks below the upper bound will still be removed.
2932    pub(crate) fn remove_before(
2933        &mut self,
2934        upper_bound: BlockNumHash,
2935        finalized_hash: Option<B256>,
2936    ) -> ProviderResult<()> {
2937        // first fetch the finalized block number and then call the remove_until method on
2938        // tree_state
2939        let num = if let Some(hash) = finalized_hash {
2940            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2941        } else {
2942            None
2943        };
2944
2945        self.state.tree_state.remove_until(
2946            upper_bound,
2947            self.persistence_state.last_persisted_block.hash,
2948            num,
2949        );
2950        Ok(())
2951    }
2952
2953    /// Returns a builder for creating state providers for the given hash.
2954    ///
2955    /// This is an optimization for parallel execution contexts where we want to avoid
2956    /// creating state providers in the critical path.
2957    pub fn state_provider_builder(
2958        &self,
2959        hash: B256,
2960    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2961    where
2962        P: BlockReader + StateProviderFactory + StateReader + Clone,
2963    {
2964        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2965            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2966            // the block leads back to the canonical chain
2967            return Ok(Some(StateProviderBuilder::new(
2968                self.provider.clone(),
2969                historical,
2970                Some(blocks),
2971            )))
2972        }
2973
2974        // Check if the block is persisted
2975        if let Some(header) = self.provider.header(hash)? {
2976            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2977            // For persisted blocks, we create a builder that will fetch state directly from the
2978            // database
2979            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2980        }
2981
2982        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2983        Ok(None)
2984    }
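
    // Illustrative sketch (hypothetical flags and labels in place of the real provider builder):
    // the two-tier lookup above. Blocks found in the in-memory tree get a builder that layers the
    // in-memory overlay on top of their historical anchor, persisted blocks get plain database
    // state, and anything else has no state available.
    fn _sketch_state_source(in_memory: bool, persisted: bool) -> Option<&'static str> {
        if in_memory {
            Some("historical anchor + in-memory overlay")
        } else if persisted {
            Some("database state at the block hash")
        } else {
            None
        }
    }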
2985}
2986
2987/// Block inclusion can be valid or disconnected; invalid blocks are returned as an error
2988/// variant.
2989///
2990/// If we don't know the block's parent, we return `Disconnected`, as we can't determine whether
2991/// the block is valid or not.
2992#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2993pub enum BlockStatus {
2994    /// The block is valid and extends the canonical chain.
2995    Valid,
2996    /// The block may be valid and has an unknown missing ancestor.
2997    Disconnected {
2998        /// Current canonical head.
2999        head: BlockNumHash,
3000        /// The lowest ancestor block that is not connected to the canonical chain.
3001        missing_ancestor: BlockNumHash,
3002    },
3003}
3004
3005/// How a payload was inserted if it was valid.
3006///
3007/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
3008/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
3009#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3010pub enum InsertPayloadOk {
3011    /// The payload was valid, but we have already seen it.
3012    AlreadySeen(BlockStatus),
3013    /// The payload was valid and inserted into the tree.
3014    Inserted(BlockStatus),
3015}
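
// Illustrative sketch (free function, simplified return value): how a caller such as
// `on_downloaded_block` interprets the insertion outcome. A valid insert of the tracked sync
// target can be made canonical, a disconnected insert triggers downloading of the missing
// ancestor, and an already-seen block needs no further action.
fn _sketch_handle_insert_outcome(outcome: InsertPayloadOk, is_sync_target: bool) -> &'static str {
    match outcome {
        InsertPayloadOk::Inserted(BlockStatus::Valid) if is_sync_target => "make canonical",
        InsertPayloadOk::Inserted(BlockStatus::Valid) => "try to connect buffered children",
        InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
            "download the missing ancestor"
        }
        InsertPayloadOk::AlreadySeen(_) => "nothing to do",
    }
}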
3016
3017/// Whether or not the blocks are currently persisting and the input block is a descendant.
3018#[derive(Debug, Clone, Copy)]
3019pub enum PersistingKind {
3020    /// The blocks are not currently persisting.
3021    NotPersisting,
3022    /// The blocks are currently persisting but the input block is not a descendant.
3023    PersistingNotDescendant,
3024    /// The blocks are currently persisting and the input block is a descendant.
3025    PersistingDescendant,
3026}
3027
3028impl PersistingKind {
3029    /// Returns true if the parallel state root can be run.
3030    ///
3031    /// We only run the parallel state root if we are not currently persisting any blocks or
3032    /// persisting blocks that are all ancestors of the one we are calculating the state root for.
3033    pub const fn can_run_parallel_state_root(&self) -> bool {
3034        matches!(self, Self::NotPersisting | Self::PersistingDescendant)
3035    }
3036
3037    /// Returns true if the blocks are currently being persisted and the input block is a
3038    /// descendant.
3039    pub const fn is_descendant(&self) -> bool {
3040        matches!(self, Self::PersistingDescendant)
3041    }
3042}
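
// Illustrative sketch of the rule above: only `PersistingNotDescendant` blocks the parallel state
// root, since in that case the blocks being written to disk are not ancestors of the block whose
// root is being computed, so a consistent database view cannot be assumed.
fn _sketch_parallel_state_root_allowed() {
    assert!(PersistingKind::NotPersisting.can_run_parallel_state_root());
    assert!(PersistingKind::PersistingDescendant.can_run_parallel_state_root());
    assert!(!PersistingKind::PersistingNotDescendant.can_run_parallel_state_root());
}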