// reth_engine_tree/tree/mod.rs

use crate::{
    backfill::{BackfillAction, BackfillSyncState},
    chain::FromOrchestrator,
    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
    persistence::PersistenceHandle,
    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
};
use alloy_consensus::BlockHeader;
use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{
    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use error::{InsertBlockError, InsertBlockFatalError};
use reth_chain_state::{
    CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
    NewCanonicalChain,
};
use reth_consensus::{Consensus, FullConsensus};
use reth_engine_primitives::{
    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
    ForkchoiceStateTracker, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::{ConfigureEvm, OnStateHook};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
    BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
    StateProviderFactory, StateReader, TransactionVariant, TrieReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use revm::state::EvmState;
use state::TreeState;
use std::{
    fmt::Debug,
    ops,
    sync::{
        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
        Arc,
    },
    time::Instant,
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot::{self, error::TryRecvError},
};
use tracing::*;

mod block_buffer;
mod cached_state;
pub mod error;
pub mod instrumented_state;
mod invalid_headers;
mod metrics;
mod payload_processor;
pub mod payload_validator;
mod persistence_state;
pub mod precompile_cache;
#[cfg(test)]
mod tests;
#[expect(unused)]
mod trie_updates;

use crate::tree::error::AdvancePersistenceError;
pub use block_buffer::BlockBuffer;
pub use invalid_headers::InvalidHeaderCache;
pub use payload_processor::*;
pub use payload_validator::{BasicEngineValidator, EngineValidator};
pub use persistence_state::PersistenceState;
pub use reth_engine_primitives::TreeConfig;

pub mod state;

/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the distance from the local head
/// exceeds this threshold, the pipeline is used to backfill the gap more efficiently.
///
/// E.g.: if the local head `block.number` is 100 and the forkchoice head `block.number` is 133
/// (a gap of more than `EPOCH_SLOTS` blocks), the threshold is exceeded and the pipeline should
/// be used to backfill the gap.
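///
/// # Example
///
/// A minimal sketch of the distance check; `local_head` and `fcu_head` are illustrative block
/// numbers, not names used in this module:
///
/// ```ignore
/// let gap = fcu_head.saturating_sub(local_head);
/// if gap > MIN_BLOCKS_FOR_PIPELINE_RUN {
///     // hand the range off to the pipeline (backfill) instead of downloading block by block
/// }
/// ```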
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// A builder for creating state providers that can be used across threads.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new state provider builder from the provider factory, historical block hash and
    /// optional overlaid blocks.
    pub const fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlock<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
where
    P: BlockReader + StateProviderFactory + StateReader + Clone,
{
    /// Creates a new state provider from this builder.
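    ///
    /// # Example
    ///
    /// A hedged sketch, assuming a `factory` that implements the required provider traits and a
    /// known `historical` block hash (neither is constructed here):
    ///
    /// ```ignore
    /// let builder = StateProviderBuilder::new(factory, historical, None);
    /// // the builder is `Clone`, so it can be handed to another thread and built there
    /// let provider = builder.build()?;
    /// ```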
    pub fn build(&self) -> ProviderResult<StateProviderBox> {
        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
        if let Some(overlay) = self.overlay.clone() {
            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
        }
        Ok(provider)
    }
}

/// Tracks the state of the engine API internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the headers of payloads that were rejected by the engine as invalid.
    invalid_headers: InvalidHeaderCache,
}

impl<N: NodePrimitives> EngineApiTreeState<N> {
    fn new(
        block_buffer_limit: u32,
        max_invalid_header_cache_length: u32,
        canonical_block: BlockNumHash,
        engine_kind: EngineApiKind,
    ) -> Self {
        Self {
            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
            buffer: BlockBuffer::new(block_buffer_limit),
            tree_state: TreeState::new(canonical_block, engine_kind),
            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
        }
    }
}

/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
}

impl<T> TreeOutcome<T> {
    /// Creates a new tree outcome.
    pub const fn new(outcome: T) -> Self {
        Self { outcome, event: None }
    }

    /// Set event on the outcome.
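    ///
    /// # Example
    ///
    /// A sketch of chaining an event onto an outcome; `status` and `hash` are assumed values:
    ///
    /// ```ignore
    /// // wrap a payload status and ask the caller to make the block canonical
    /// let outcome = TreeOutcome::new(status)
    ///     .with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head: hash }));
    /// ```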
    pub fn with_event(mut self, event: TreeEvent) -> Self {
        self.event = Some(event);
        self
    }
}

/// Events that are triggered by the tree.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}

impl TreeEvent {
    /// Returns true if the event is a backfill action.
    const fn is_backfill_action(&self) -> bool {
        matches!(self, Self::BackfillAction(_))
    }
}

/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}

/// Wrapper struct that combines metrics and state hook.
struct MeteredStateHook {
    metrics: reth_evm::metrics::ExecutorMetrics,
    inner_hook: Box<dyn OnStateHook>,
}

impl OnStateHook for MeteredStateHook {
    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
        // Update the metrics for the number of accounts, storage slots and bytecodes loaded
        let accounts = state.keys().len();
        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();

        self.metrics.accounts_loaded_histogram.record(accounts as f64);
        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);

        // Call the original state hook
        self.inner_hook.on_state(source, state);
    }
}

/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state,
/// and emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    provider: P,
    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The sender half for messages to the engine.
    ///
    /// This is kept so that we can queue messages to ourselves that we can process later, for
    /// example distributing workload across multiple messages that would otherwise take too long
    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
    /// them one by one so that we can handle incoming engine API requests in between and don't
    /// become unresponsive. This can happen during live sync transition where we're trying to
    /// close the gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates.
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine API.
    metrics: EngineApiMetrics,
    /// The engine API variant of this handler.
    engine_kind: EngineApiKind,
    /// The EVM configuration.
    evm_config: C,
}

impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("engine_kind", &self.engine_kind)
            .field("evm_config", &self.evm_config)
            .finish()
    }
}

impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    P: DatabaseProviderFactory
        + BlockReader<Block = N::Block, Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader<Receipt = N::Receipt>
        + HashedPostStateProvider
        + TrieReader
        + Clone
        + 'static,
    <P as DatabaseProviderFactory>::Provider:
        BlockReader<Block = N::Block, Header = N::BlockHeader>,
    C: ConfigureEvm<Primitives = N> + 'static,
    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
    V: EngineValidator<T>,
{
    /// Creates a new [`EngineApiTreeHandler`].
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        outgoing: UnboundedSender<EngineApiEvent<N>>,
        state: EngineApiTreeState<N>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        persistence: PersistenceHandle<N>,
        persistence_state: PersistenceState,
        payload_builder: PayloadBuilderHandle<T>,
        config: TreeConfig,
        engine_kind: EngineApiKind,
        evm_config: C,
    ) -> Self {
        let (incoming_tx, incoming) = std::sync::mpsc::channel();

        Self {
            provider,
            consensus,
            payload_validator,
            incoming,
            outgoing,
            persistence,
            persistence_state,
            backfill_sync_state: BackfillSyncState::Idle,
            state,
            canonical_in_memory_state,
            payload_builder,
            config,
            metrics: Default::default(),
            incoming_tx,
            engine_kind,
            evm_config,
        }
    }

    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
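    ///
    /// # Example
    ///
    /// A hedged sketch of wiring the handler up; the `provider`, `consensus`, `payload_validator`,
    /// `persistence`, `payload_builder`, `canonical_in_memory_state` and `evm_config` values are
    /// assumed to exist and are not constructed here:
    ///
    /// ```ignore
    /// let (to_tree, mut engine_events) = EngineApiTreeHandler::spawn_new(
    ///     provider,
    ///     consensus,
    ///     payload_validator,
    ///     persistence,
    ///     payload_builder,
    ///     canonical_in_memory_state,
    ///     TreeConfig::default(),
    ///     EngineApiKind::Ethereum,
    ///     evm_config,
    /// );
    /// // `to_tree` forwards engine messages to the spawned task; `engine_events` yields its events
    /// ```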
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
        );
        let incoming = task.incoming_tx.clone();
        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
        (incoming, outgoing)
    }

    /// Returns a new [`Sender`] to send messages to this type.
    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
        self.incoming_tx.clone()
    }

    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages.
    pub fn run(mut self) {
        loop {
            match self.try_recv_engine_message() {
                Ok(Some(msg)) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                Ok(None) => {
                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
                }
                Err(_err) => {
                    error!(target: "engine::tree", "Engine channel disconnected");
                    return
                }
            }

            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }

    /// Invoked when previously requested blocks were downloaded.
    ///
    /// If the block count exceeds the configured batch size (the maximum number of blocks we're
    /// allowed to execute at once), this executes only the first batch and sends the remaining
    /// blocks back through the channel so that block request processing isn't blocked for too long.
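    ///
    /// For example, with a batch size of 32 and 100 downloaded blocks, the first 32 are executed
    /// here and the remaining 68 are re-sent to this task as a new `DownloadedBlocks` message.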
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<SealedBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            // nothing to execute
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    // can exit early if backfill is needed
                    return Ok(None)
                }
            }
        }

        // if we still have blocks to execute, send them as a followup request
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }

    /// When the Consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`]. The
    /// Execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to the Consensus layer, which adds the block to the head
    /// of its own blockchain and attests to it. The block is then broadcast over the consensus p2p
    /// network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of the processed new payload,
    /// or an error if an internal error occurred.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        // start timing for the new payload process
        let start = Instant::now();

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.

        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Check for invalid ancestors
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        // record pre-execution phase duration
        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        let status = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?
        } else {
            self.try_buffer_payload(payload)?
        };

        let mut outcome = TreeOutcome::new(status);
        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // Only create the canonical event if this block isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        // record total newPayload duration
        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }

    /// Processes a payload during normal sync operation.
    ///
    /// Returns:
    /// - `Valid`: Payload successfully validated and inserted
    /// - `Syncing`: Parent missing, payload buffered for later
    /// - Error status: Payload is invalid
    #[instrument(level = "debug", target = "engine::tree", skip_all)]
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let status = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        self.try_connect_buffered_blocks(num_hash)?;
                        PayloadStatusEnum::Valid
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        PayloadStatusEnum::Valid
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        // not known to be invalid, but we don't know anything else
                        PayloadStatusEnum::Syncing
                    }
                };

                Ok(PayloadStatus::new(status, latest_valid_hash))
            }
            Err(error) => match error {
                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
                InsertPayloadError::Payload(error) => {
                    Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
                }
            },
        }
    }

    /// Stores a payload for later processing during backfill sync.
    ///
    /// During backfill, the node lacks the state needed to validate payloads,
    /// so they are buffered (stored in memory) until their parent blocks are synced.
    ///
    /// Returns:
    /// - `Syncing`: Payload successfully buffered
    /// - Error status: Payload is malformed or invalid
    fn try_buffer_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let parent_hash = payload.parent_hash();
        let num_hash = payload.num_hash();

        match self.payload_validator.convert_payload_to_block(payload) {
            // if the block is well-formed, buffer it for later
            Ok(block) => {
                if let Err(error) = self.buffer_block(block) {
                    Ok(self.on_insert_block_error(error)?)
                } else {
                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
                }
            }
            Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
        }
    }

    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
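    ///
    /// For example, with canonical chain `A -> B -> C`: a new head `D` whose parent is `C` yields
    /// `NewCanonicalChain::Commit { new: [D] }`, while a new head `C'` whose parent is `B` yields
    /// `NewCanonicalChain::Reorg { new: [C'], old: [C] }`.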
662    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
663        // get the executed new head block
664        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
665            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
666            self.metrics.engine.executed_new_block_cache_miss.increment(1);
667            return Ok(None)
668        };
669
670        let new_head_number = new_head_block.recovered_block().number();
671        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
672
673        let mut new_chain = vec![new_head_block.clone()];
674        let mut current_hash = new_head_block.recovered_block().parent_hash();
675        let mut current_number = new_head_number - 1;
676
677        // Walk back the new chain until we reach a block we know about
678        //
679        // This is only done for in-memory blocks, because we should not have persisted any blocks
680        // that are _above_ the current canonical head.
681        while current_number > current_canonical_number {
682            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
683            {
684                current_hash = block.recovered_block().parent_hash();
685                current_number -= 1;
686                new_chain.push(block);
687            } else {
688                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
689                // This should never happen as we're walking back a chain that should connect to
690                // the canonical chain
691                return Ok(None)
692            }
693        }
694
695        // If we have reached the current canonical head by walking back from the target, then we
696        // know this represents an extension of the canonical chain.
697        if current_hash == self.state.tree_state.current_canonical_head.hash {
698            new_chain.reverse();
699
700            // Simple extension of the current chain
701            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
702        }
703
704        // We have a reorg. Walk back both chains to find the fork point.
705        let mut old_chain = Vec::new();
706        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
707
708        // If the canonical chain is ahead of the new chain,
709        // gather all blocks until new head number.
710        while current_canonical_number > current_number {
711            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
712                old_hash = block.recovered_block().parent_hash();
713                old_chain.push(block);
714                current_canonical_number -= 1;
715            } else {
716                // This shouldn't happen as we're walking back the canonical chain
717                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
718                return Ok(None)
719            }
720        }
721
722        // Both new and old chain pointers are now at the same height.
723        debug_assert_eq!(current_number, current_canonical_number);
724
725        // Walk both chains from specified hashes at same height until
726        // a common ancestor (fork block) is reached.
727        while old_hash != current_hash {
728            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
729                old_hash = block.recovered_block().parent_hash();
730                old_chain.push(block);
731            } else {
732                // This shouldn't happen as we're walking back the canonical chain
733                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
734                return Ok(None)
735            }
736
737            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
738            {
739                current_hash = block.recovered_block().parent_hash();
740                new_chain.push(block);
741            } else {
742                // This shouldn't happen as we've already walked this path
743                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
744                return Ok(None)
745            }
746        }
747        new_chain.reverse();
748        old_chain.reverse();
749
750        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
751    }
752
753    /// Updates the latest block state to the specified canonical ancestor.
754    ///
755    /// This method ensures that the latest block tracks the given canonical header by resetting
756    ///
757    /// # Arguments
758    /// * `canonical_header` - The canonical header to set as the new head
759    ///
760    /// # Returns
761    /// * `ProviderResult<()>` - Ok(()) on success, error if state update fails
762    ///
763    /// Caution: This unwinds the canonical chain
764    fn update_latest_block_to_canonical_ancestor(
765        &mut self,
766        canonical_header: &SealedHeader<N::BlockHeader>,
767    ) -> ProviderResult<()> {
768        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
769        let current_head_number = self.state.tree_state.canonical_block_number();
770        let new_head_number = canonical_header.number();
771        let new_head_hash = canonical_header.hash();
772
773        // Update tree state with the new canonical head
774        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
775
776        // Handle the state update based on whether this is an unwind scenario
777        if new_head_number < current_head_number {
778            debug!(
779                target: "engine::tree",
780                current_head = current_head_number,
781                new_head = new_head_number,
782                new_head_hash = ?new_head_hash,
783                "FCU unwind detected: reverting to canonical ancestor"
784            );
785
786            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
787        } else {
788            debug!(
789                target: "engine::tree",
790                previous_head = current_head_number,
791                new_head = new_head_number,
792                new_head_hash = ?new_head_hash,
793                "Advancing latest block to canonical ancestor"
794            );
795            self.handle_chain_advance_or_same_height(canonical_header)
796        }
797    }
798
799    /// Handles chain unwind scenarios by collecting blocks to remove and performing an unwind back
800    /// to the canonical header
801    fn handle_canonical_chain_unwind(
802        &self,
803        current_head_number: u64,
804        canonical_header: &SealedHeader<N::BlockHeader>,
805    ) -> ProviderResult<()> {
806        let new_head_number = canonical_header.number();
807        debug!(
808            target: "engine::tree",
809            from = current_head_number,
810            to = new_head_number,
811            "Handling unwind: collecting blocks to remove from in-memory state"
812        );
813
814        // Collect blocks that need to be removed from memory
815        let old_blocks =
816            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
817
818        // Load and apply the canonical ancestor block
819        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
820    }
821
822    /// Collects blocks from memory that need to be removed during an unwind to a canonical block.
823    fn collect_blocks_for_canonical_unwind(
824        &self,
825        new_head_number: u64,
826        current_head_number: u64,
827    ) -> Vec<ExecutedBlock<N>> {
828        let mut old_blocks =
829            Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
830
831        for block_num in (new_head_number + 1)..=current_head_number {
832            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
833                let executed_block = block_state.block_ref().clone();
834                old_blocks.push(executed_block);
835                debug!(
836                    target: "engine::tree",
837                    block_number = block_num,
838                    "Collected block for removal from in-memory state"
839                );
840            }
841        }
842
843        if old_blocks.is_empty() {
844            debug!(
845                target: "engine::tree",
846                "No blocks found in memory to remove, will clear and reset state"
847            );
848        }
849
850        old_blocks
851    }
852
853    /// Applies the canonical ancestor block via a reorg operation.
854    fn apply_canonical_ancestor_via_reorg(
855        &self,
856        canonical_header: &SealedHeader<N::BlockHeader>,
857        old_blocks: Vec<ExecutedBlock<N>>,
858    ) -> ProviderResult<()> {
859        let new_head_hash = canonical_header.hash();
860        let new_head_number = canonical_header.number();
861
862        // Try to load the canonical ancestor's block
863        match self.canonical_block_by_hash(new_head_hash)? {
864            Some(executed_block) => {
865                // Perform the reorg to properly handle the unwind
866                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
867                    new: vec![executed_block],
868                    old: old_blocks,
869                });
870
871                // CRITICAL: Update the canonical head after the reorg
872                // This ensures get_canonical_head() returns the correct block
873                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
874
875                debug!(
876                    target: "engine::tree",
877                    block_number = new_head_number,
878                    block_hash = ?new_head_hash,
879                    "Successfully loaded canonical ancestor into memory via reorg"
880                );
881            }
882            None => {
883                // Fallback: update header only if block cannot be found
884                warn!(
885                    target: "engine::tree",
886                    block_hash = ?new_head_hash,
887                    "Could not find canonical ancestor block, updating header only"
888                );
889                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
890            }
891        }
892
893        Ok(())
894    }
895
896    /// Handles chain advance or same height scenarios.
897    fn handle_chain_advance_or_same_height(
898        &self,
899        canonical_header: &SealedHeader<N::BlockHeader>,
900    ) -> ProviderResult<()> {
901        let new_head_number = canonical_header.number();
902        let new_head_hash = canonical_header.hash();
903
904        // Update the canonical head header
905        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
906
907        // Load the block into memory if it's not already present
908        self.ensure_block_in_memory(new_head_number, new_head_hash)
909    }
910
911    /// Ensures a block is loaded into memory if not already present.
912    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
913        // Check if block is already in memory
914        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
915            return Ok(());
916        }
917
918        // Try to load the block from storage
919        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
920            self.canonical_in_memory_state
921                .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
922
923            debug!(
924                target: "engine::tree",
925                block_number,
926                block_hash = ?block_hash,
927                "Added canonical block to in-memory state"
928            );
929        }
930
931        Ok(())
932    }
933
934    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
935    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
936    /// chain.
937    ///
938    /// These responses should adhere to the [Engine API Spec for
939    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
940    ///
941    /// Returns an error if an internal error occurred like a database error.
942    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
943    fn on_forkchoice_updated(
944        &mut self,
945        state: ForkchoiceState,
946        attrs: Option<T::PayloadAttributes>,
947        version: EngineApiMessageVersion,
948    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
949        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
950
951        // Record metrics
952        self.record_forkchoice_metrics();
953
954        // Pre-validation of forkchoice state
955        if let Some(early_result) = self.validate_forkchoice_state(state)? {
956            return Ok(TreeOutcome::new(early_result));
957        }
958
959        // Return early if we are on the correct fork
960        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
961            return Ok(result);
962        }
963
964        // Attempt to apply a chain update when the head differs from our canonical chain.
965        // This handles reorgs and chain extensions by making the specified head canonical.
966        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
967            return Ok(result);
968        }
969
970        // Fallback that ensures to catch up to the network's state.
971        self.handle_missing_block(state)
972    }
973
974    /// Records metrics for forkchoice updated calls
975    fn record_forkchoice_metrics(&self) {
976        self.canonical_in_memory_state.on_forkchoice_update_received();
977    }
978
979    /// Pre-validates the forkchoice state and returns early if validation fails.
980    ///
981    /// Returns `Some(OnForkChoiceUpdated)` if validation fails and an early response should be
982    /// returned. Returns `None` if validation passes and processing should continue.
983    fn validate_forkchoice_state(
984        &mut self,
985        state: ForkchoiceState,
986    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
987        if state.head_block_hash.is_zero() {
988            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
989        }
990
991        // Check if the new head hash is connected to any ancestor that we previously marked as
992        // invalid
993        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
994        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
995            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
996        }
997
998        if !self.backfill_sync_state.is_idle() {
999            // We can only process new forkchoice updates if the pipeline is idle, since it requires
1000            // exclusive access to the database
1001            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1002            return Ok(Some(OnForkChoiceUpdated::syncing()));
1003        }
1004
1005        Ok(None)
1006    }
1007
1008    /// Handles the case where the forkchoice head is already canonical.
1009    ///
1010    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if the head is already canonical and
1011    /// processing is complete. Returns `None` if the head is not canonical and processing
1012    /// should continue.
1013    fn handle_canonical_head(
1014        &self,
1015        state: ForkchoiceState,
1016        attrs: &Option<T::PayloadAttributes>, // Changed to reference
1017        version: EngineApiMessageVersion,
1018    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1019        // Process the forkchoice update by trying to make the head block canonical
1020        //
1021        // We can only process this forkchoice update if:
1022        // - we have the `head` block
1023        // - the head block is part of a chain that is connected to the canonical chain. This
1024        //   includes reorgs.
1025        //
1026        // Performing a FCU involves:
1027        // - marking the FCU's head block as canonical
1028        // - updating in memory state to reflect the new canonical chain
1029        // - updating canonical state trackers
1030        // - emitting a canonicalization event for the new chain (including reorg)
1031        // - if we have payload attributes, delegate them to the payload service
1032
1033        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1034            return Ok(None);
1035        }
1036
1037        trace!(target: "engine::tree", "fcu head hash is already canonical");
1038
1039        // Update the safe and finalized blocks and ensure their values are valid
1040        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1041            // safe or finalized hashes are invalid
1042            return Ok(Some(TreeOutcome::new(outcome)));
1043        }
1044
1045        // Process payload attributes if the head is already canonical
1046        if let Some(attr) = attrs {
1047            let tip = self
1048                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1049                .ok_or_else(|| {
1050                    // If we can't find the canonical block, then something is wrong and we need
1051                    // to return an error
1052                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1053                })?;
1054            // Clone only when we actually need to process the attributes
1055            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1056            return Ok(Some(TreeOutcome::new(updated)));
1057        }
1058
1059        // The head block is already canonical
1060        let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1061            PayloadStatusEnum::Valid,
1062            Some(state.head_block_hash),
1063        )));
1064        Ok(Some(outcome))
1065    }
1066
1067    /// Applies chain update for the new head block and processes payload attributes.
1068    ///
1069    /// This method handles the case where the forkchoice head differs from our current canonical
1070    /// head. It attempts to make the specified head block canonical by:
1071    /// - Checking if the head is already part of the canonical chain
1072    /// - Applying chain reorganizations (reorgs) if necessary
1073    /// - Processing payload attributes if provided
1074    /// - Returning the appropriate forkchoice update response
1075    ///
1076    /// Returns `Some(TreeOutcome<OnForkChoiceUpdated>)` if a chain update was successfully applied.
1077    /// Returns `None` if no chain update was needed or possible.
1078    fn apply_chain_update(
1079        &mut self,
1080        state: ForkchoiceState,
1081        attrs: &Option<T::PayloadAttributes>,
1082        version: EngineApiMessageVersion,
1083    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1084        // Check if the head is already part of the canonical chain
1085        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1086            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1087
1088            // For OpStack, or if explicitly configured, the proposers are allowed to reorg their
1089            // own chain at will, so we need to always trigger a new payload job if requested.
1090            if self.engine_kind.is_opstack() ||
1091                self.config.always_process_payload_attributes_on_canonical_head()
1092            {
1093                // We need to effectively unwind the _canonical_ chain to the FCU's head, which is
1094                // part of the canonical chain. We need to update the latest block state to reflect
1095                // the canonical ancestor. This ensures that state providers and the transaction
1096                // pool operate with the correct chain state after forkchoice update processing, and
1097                // new payloads built on the reorg'd head will be added to the tree immediately.
1098                if self.config.unwind_canonical_header() {
1099                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1100                }
1101
1102                if let Some(attr) = attrs {
1103                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1104                    // Clone only when we actually need to process the attributes
1105                    let updated = self.process_payload_attributes(
1106                        attr.clone(),
1107                        &canonical_header,
1108                        state,
1109                        version,
1110                    );
1111                    return Ok(Some(TreeOutcome::new(updated)));
1112                }
1113            }
1114
1115            // According to the Engine API specification, client software MAY skip an update of the
1116            // forkchoice state and MUST NOT begin a payload build process if
1117            // `forkchoiceState.headBlockHash` references a `VALID` ancestor of the head
1118            // of canonical chain, i.e. the ancestor passed payload validation process
1119            // and deemed `VALID`. In the case of such an event, client software MUST
1120            // return `{payloadStatus: {status: VALID, latestValidHash:
1121            // forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1122
1123            // The head block is already canonical and we're not processing payload attributes,
1124            // so we're not triggering a payload job and can return right away
1125
1126            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1127                PayloadStatusEnum::Valid,
1128                Some(state.head_block_hash),
1129            )));
1130            return Ok(Some(outcome));
1131        }
1132
1133        // Ensure we can apply a new chain update for the head block
1134        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1135            let tip = chain_update.tip().clone_sealed_header();
1136            self.on_canonical_chain_update(chain_update);
1137
1138            // Update the safe and finalized blocks and ensure their values are valid
1139            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1140                // safe or finalized hashes are invalid
1141                return Ok(Some(TreeOutcome::new(outcome)));
1142            }
1143
1144            if let Some(attr) = attrs {
1145                // Clone only when we actually need to process the attributes
1146                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1147                return Ok(Some(TreeOutcome::new(updated)));
1148            }
1149
1150            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1151                PayloadStatusEnum::Valid,
1152                Some(state.head_block_hash),
1153            )));
1154            return Ok(Some(outcome));
1155        }
1156
1157        Ok(None)
1158    }
1159
1160    /// Handles the case where the head block is missing and needs to be downloaded.
1161    ///
1162    /// This is the fallback case when all other forkchoice update scenarios have been exhausted.
1163    /// Returns a `TreeOutcome` with syncing status and download event.
1164    fn handle_missing_block(
1165        &self,
1166        state: ForkchoiceState,
1167    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1168        // We don't have the block to perform the forkchoice update
1169        // We assume the FCU is valid and at least the head is missing,
1170        // so we need to start syncing to it
1171        //
1172        // find the appropriate target to sync to, if we don't have the safe block hash then we
1173        // start syncing to the safe block via backfill first
1174        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1175        // check that safe block is valid and missing
1176        !state.safe_block_hash.is_zero() &&
1177        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1178        {
1179            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1180            state.safe_block_hash
1181        } else {
1182            state.head_block_hash
1183        };
1184
1185        let target = self.lowest_buffered_ancestor_or(target);
1186        trace!(target: "engine::tree", %target, "downloading missing block");
1187
1188        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1189            PayloadStatusEnum::Syncing,
1190        )))
1191        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1192    }
1193
1194    /// Attempts to receive the next engine request.
1195    ///
1196    /// If there's currently no persistence action in progress, this will block until a new request
1197    /// is received. If there's a persistence action in progress, this will try to receive the
1198    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1199    /// received in time.
1200    ///
1201    /// Returns an error if the engine channel is disconnected.
1202    #[expect(clippy::type_complexity)]
1203    fn try_recv_engine_message(
1204        &self,
1205    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1206        if self.persistence_state.in_progress() {
1207            // try to receive the next request with a timeout to not block indefinitely
1208            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1209                Ok(msg) => Ok(Some(msg)),
1210                Err(err) => match err {
1211                    RecvTimeoutError::Timeout => Ok(None),
1212                    RecvTimeoutError::Disconnected => Err(RecvError),
1213                },
1214            }
1215        } else {
1216            self.incoming.recv().map(Some)
1217        }
1218    }
1219
1220    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
1221    /// the current persistence action while we're removing blocks.
1222    fn remove_blocks(&mut self, new_tip_num: u64) {
1223        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1224        if new_tip_num < self.persistence_state.last_persisted_block.number {
1225            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1226            let (tx, rx) = oneshot::channel();
1227            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1228            self.persistence_state.start_remove(new_tip_num, rx);
1229        }
1230    }
1231
1232    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
1233    /// the current persistence action while we're saving blocks.
1234    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1235        if blocks_to_persist.is_empty() {
1236            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1237            return
1238        }
1239
1240        // NOTE: checked non-empty above
1241        let highest_num_hash = blocks_to_persist
1242            .iter()
1243            .max_by_key(|block| block.recovered_block().number())
1244            .map(|b| b.recovered_block().num_hash())
1245            .expect("Checked non-empty persisting blocks");
1246
1247        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1248        let (tx, rx) = oneshot::channel();
1249        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1250
1251        self.persistence_state.start_save(highest_num_hash, rx);
1252    }
1253
1254    /// Attempts to advance the persistence state.
1255    ///
1256    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1257    /// or send a new persistence action if necessary.
1258    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1259        if self.persistence_state.in_progress() {
1260            let (mut rx, start_time, current_action) = self
1261                .persistence_state
1262                .rx
1263                .take()
1264                .expect("if a persistence task is in progress Receiver must be Some");
1265            // Check if persistence has completed
1266            match rx.try_recv() {
1267                Ok(last_persisted_hash_num) => {
1268                    self.on_persistence_complete(last_persisted_hash_num, start_time)?;
1269                }
1270                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1271                Err(TryRecvError::Empty) => {
1272                    self.persistence_state.rx = Some((rx, start_time, current_action))
1273                }
1274            }
1275        }
1276
1277        if !self.persistence_state.in_progress() {
1278            if let Some(new_tip_num) = self.find_disk_reorg()? {
1279                self.remove_blocks(new_tip_num)
1280            } else if self.should_persist() {
1281                let blocks_to_persist =
1282                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1283                self.persist_blocks(blocks_to_persist);
1284            }
1285        }
1286
1287        Ok(())
1288    }
1289
1290    /// Finishes termination by persisting all remaining blocks and signaling completion.
1291    ///
1292    /// This blocks until all persistence is complete. Always signals completion,
1293    /// even if an error occurs.
1294    fn finish_termination(
1295        &mut self,
1296        pending_termination: oneshot::Sender<()>,
1297    ) -> Result<(), AdvancePersistenceError> {
1298        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1299        let result = self.persist_until_complete();
1300        let _ = pending_termination.send(());
1301        result
1302    }
1303
1304    /// Persists all remaining blocks until none are left.
1305    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1306        loop {
1307            // Wait for any in-progress persistence to complete (blocking)
1308            if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
1309                let result = rx.blocking_recv().map_err(|_| TryRecvError::Closed)?;
1310                self.on_persistence_complete(result, start_time)?;
1311            }
1312
1313            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1314
1315            if blocks_to_persist.is_empty() {
1316                debug!(target: "engine::tree", "persistence complete, signaling termination");
1317                return Ok(())
1318            }
1319
1320            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1321            self.persist_blocks(blocks_to_persist);
1322        }
1323    }
1324
1325    /// Handles a completed persistence task.
1326    fn on_persistence_complete(
1327        &mut self,
1328        last_persisted_hash_num: Option<BlockNumHash>,
1329        start_time: Instant,
1330    ) -> Result<(), AdvancePersistenceError> {
1331        self.metrics.engine.persistence_duration.record(start_time.elapsed());
1332
1333        let Some(BlockNumHash {
1334            hash: last_persisted_block_hash,
1335            number: last_persisted_block_number,
1336        }) = last_persisted_hash_num
1337        else {
1338            // if this happened, then we persisted no blocks because we sent an empty vec of blocks
1339            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1340            return Ok(())
1341        };
1342
1343        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1344        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1345        self.on_new_persisted_block()?;
1346        Ok(())
1347    }
1348
1349    /// Handles a message from the engine.
1350    ///
1351    /// Returns `ControlFlow::Break(())` if the engine should terminate.
1352    fn on_engine_message(
1353        &mut self,
1354        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1355    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
1356        match msg {
1357            FromEngine::Event(event) => match event {
1358                FromOrchestrator::BackfillSyncStarted => {
1359                    debug!(target: "engine::tree", "received backfill sync started event");
1360                    self.backfill_sync_state = BackfillSyncState::Active;
1361                }
1362                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1363                    self.on_backfill_sync_finished(ctrl)?;
1364                }
1365                FromOrchestrator::Terminate { tx } => {
1366                    debug!(target: "engine::tree", "received terminate request");
1367                    if let Err(err) = self.finish_termination(tx) {
1368                        error!(target: "engine::tree", %err, "Termination failed");
1369                    }
1370                    return Ok(ops::ControlFlow::Break(()))
1371                }
1372            },
1373            FromEngine::Request(request) => {
1374                match request {
1375                    EngineApiRequest::InsertExecutedBlock(block) => {
1376                        let block_num_hash = block.recovered_block().num_hash();
1377                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1378                            // outdated block that can be skipped
1379                            return Ok(ops::ControlFlow::Continue(()))
1380                        }
1381
1382                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1383                        let now = Instant::now();
1384
1385                        // if the parent is the canonical head, we can insert the block as the
1386                        // pending block
1387                        if self.state.tree_state.canonical_block_hash() ==
1388                            block.recovered_block().parent_hash()
1389                        {
1390                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1391                            self.canonical_in_memory_state.set_pending_block(block.clone());
1392                        }
1393
1394                        self.state.tree_state.insert_executed(block.clone());
1395                        self.payload_validator.on_inserted_executed_block(block.clone());
1396                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1397                        self.emit_event(EngineApiEvent::BeaconConsensus(
1398                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1399                        ));
1400                    }
1401                    EngineApiRequest::Beacon(request) => {
1402                        match request {
1403                            BeaconEngineMessage::ForkchoiceUpdated {
1404                                state,
1405                                payload_attrs,
1406                                tx,
1407                                version,
1408                            } => {
1409                                let has_attrs = payload_attrs.is_some();
1410
1411                                let start = Instant::now();
1412                                let mut output =
1413                                    self.on_forkchoice_updated(state, payload_attrs, version);
1414
1415                                if let Ok(res) = &mut output {
1416                                    // track last received forkchoice state
1417                                    self.state
1418                                        .forkchoice_state_tracker
1419                                        .set_latest(state, res.outcome.forkchoice_status());
1420
1421                                    // emit an event about the handled FCU
1422                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1423                                        state,
1424                                        res.outcome.forkchoice_status(),
1425                                    ));
1426
1427                                    // handle the event if any
1428                                    self.on_maybe_tree_event(res.event.take())?;
1429                                }
1430
1431                                self.metrics.engine.forkchoice_updated.update_response_metrics(
1432                                    start,
1433                                    &mut self.metrics.engine.new_payload.latest_at,
1434                                    has_attrs,
1435                                    &output,
1436                                );
1437
1438                                if let Err(err) =
1439                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1440                                {
1441                                    self.metrics
1442                                        .engine
1443                                        .failed_forkchoice_updated_response_deliveries
1444                                        .increment(1);
1445                                    error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1446                                }
1447                            }
1448                            BeaconEngineMessage::NewPayload { payload, tx } => {
1449                                let start = Instant::now();
1450                                let gas_used = payload.gas_used();
1451                                let num_hash = payload.num_hash();
1452                                let mut output = self.on_new_payload(payload);
1453                                self.metrics
1454                                    .engine
1455                                    .new_payload
1456                                    .update_response_metrics(start, &output, gas_used);
1457
1458                                let maybe_event =
1459                                    output.as_mut().ok().and_then(|out| out.event.take());
1460
1461                                // emit response
1462                                if let Err(err) =
1463                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1464                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1465                                    }))
1466                                {
1467                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1468                                    self.metrics
1469                                        .engine
1470                                        .failed_new_payload_response_deliveries
1471                                        .increment(1);
1472                                }
1473
1474                                // handle the event if any
1475                                self.on_maybe_tree_event(maybe_event)?;
1476                            }
1477                        }
1478                    }
1479                }
1480            }
1481            FromEngine::DownloadedBlocks(blocks) => {
1482                if let Some(event) = self.on_downloaded(blocks)? {
1483                    self.on_tree_event(event)?;
1484                }
1485            }
1486        }
1487        Ok(ops::ControlFlow::Continue(()))
1488    }
1489
1490    /// Invoked when the backfill sync has finished running to its target.
1491    ///
1492    /// At this point we consider the chain synced to the backfill target.
1493    ///
1494    /// Checks the tracked finalized block against the block on disk and requests another backfill
1495    /// run if the distance to the tip still exceeds the backfill threshold.
1496    ///
1497    /// This will also do the necessary housekeeping of the tree state, which includes:
1498    ///  - removing all blocks below the backfill height
1499    ///  - resetting the canonical in-memory state
1500    ///
1501    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1502    /// target block.
1503    fn on_backfill_sync_finished(
1504        &mut self,
1505        ctrl: ControlFlow,
1506    ) -> Result<(), InsertBlockFatalError> {
1507        debug!(target: "engine::tree", "received backfill sync finished event");
1508        self.backfill_sync_state = BackfillSyncState::Idle;
1509
1510        // Pipeline unwound: memorize the invalid block and wait for the CL to provide the next sync target.
1511        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1512            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1513            // update the `invalid_headers` cache with the new invalid header
1514            self.state.invalid_headers.insert(**bad_block);
1515
1516            // if this was an unwind then the target is the new height
1517            Some(*target)
1518        } else {
1519            // backfill height is the block number that the backfill finished at
1520            ctrl.block_number()
1521        };
1522
1523        // nothing to do if the backfill height is unknown
1524        let Some(backfill_height) = backfill_height else { return Ok(()) };
1525
1526        // state housekeeping after backfill sync
1527        // remove all executed blocks below the backfill height
1528        //
1529        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1530        // before that
1531        let Some(backfill_num_hash) = self
1532            .provider
1533            .block_hash(backfill_height)?
1534            .map(|hash| BlockNumHash { hash, number: backfill_height })
1535        else {
1536            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1537            return Ok(())
1538        };
1539
1540        if ctrl.is_unwind() {
1541            // the pipeline unwound, so we need to clear everything above that height so that
1542            // the backfill height becomes the new canonical block.
1543            self.state.tree_state.reset(backfill_num_hash)
1544        } else {
1545            self.state.tree_state.remove_until(
1546                backfill_num_hash,
1547                self.persistence_state.last_persisted_block.hash,
1548                Some(backfill_num_hash),
1549            );
1550        }
1551
1552        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1553        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1554
1555        // remove all buffered blocks below the backfill height
1556        self.state.buffer.remove_old_blocks(backfill_height);
1557        // we remove all entries because now we're synced to the backfill target and consider this
1558        // the canonical chain
1559        self.canonical_in_memory_state.clear_state();
1560
1561        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1562            // update the tracked chain height, after backfill sync both the canonical height and
1563            // persisted height are the same
1564            self.state.tree_state.set_canonical_head(new_head.num_hash());
1565            self.persistence_state.finish(new_head.hash(), new_head.number());
1566
1567            // update the tracked canonical head
1568            self.canonical_in_memory_state.set_canonical_head(new_head);
1569        }
1570
1571        // check if we need to run backfill again by comparing the most recent finalized height to
1572        // the backfill height
1573        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1574        else {
1575            return Ok(())
1576        };
1577        if sync_target_state.finalized_block_hash.is_zero() {
1578            // no finalized block, can't check distance
1579            return Ok(())
1580        }
1581        // get the block number of the finalized block, if we have it
1582        let newest_finalized = self
1583            .state
1584            .buffer
1585            .block(&sync_target_state.finalized_block_hash)
1586            .map(|block| block.number());
1587
1588        // The block number that the backfill finished at - if the progress or the newest
1589        // finalized block is None, then we can't check the distance anyway.
1590        //
1591        // If both are Some, we perform another distance check and return the desired
1592        // backfill target
1593        if let Some(backfill_target) =
1594            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1595                // Determines whether or not we should run backfill again, in case
1596                // the new gap is still large enough and requires running backfill again
1597                self.backfill_sync_target(progress, finalized_number, None)
1598            })
1599        {
1600            // request another backfill run
1601            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1602                backfill_target.into(),
1603            )));
1604            return Ok(())
1605        };
1606
1607        // Check if there are more blocks to sync between current head and FCU target
1608        if let Some(lowest_buffered) =
1609            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1610        {
1611            let current_head_num = self.state.tree_state.current_canonical_head.number;
1612            let target_head_num = lowest_buffered.number();
1613
1614            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1615            {
1616                // There are blocks between current head and FCU target, download them
1617                debug!(
1618                    target: "engine::tree",
1619                    %current_head_num,
1620                    %target_head_num,
1621                    %distance,
1622                    "Backfill complete, downloading remaining blocks to reach FCU target"
1623                );
1624
1625                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1626                    lowest_buffered.parent_hash(),
1627                    distance,
1628                )));
1629                return Ok(());
1630            }
1631        }
1632
1633        // try to close the gap by executing buffered blocks that are child blocks of the new head
1634        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1635    }
1636
1637    /// Attempts to make the given target canonical.
1638    ///
1639    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1640    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1641        if let Some(chain_update) = self.on_new_head(target)? {
1642            self.on_canonical_chain_update(chain_update);
1643        }
1644
1645        Ok(())
1646    }
1647
1648    /// Convenience function to handle an optional tree event.
1649    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1650        if let Some(event) = event {
1651            self.on_tree_event(event)?;
1652        }
1653
1654        Ok(())
1655    }
1656
1657    /// Handles a tree event.
1658    ///
1659    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
1660    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1661        match event {
1662            TreeEvent::TreeAction(action) => match action {
1663                TreeAction::MakeCanonical { sync_target_head } => {
1664                    self.make_canonical(sync_target_head)?;
1665                }
1666            },
1667            TreeEvent::BackfillAction(action) => {
1668                self.emit_event(EngineApiEvent::BackfillAction(action));
1669            }
1670            TreeEvent::Download(action) => {
1671                self.emit_event(EngineApiEvent::Download(action));
1672            }
1673        }
1674
1675        Ok(())
1676    }
1677
1678    /// Emits an outgoing event to the engine.
1679    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1680        let event = event.into();
1681
1682        if event.is_backfill_action() {
1683            debug_assert_eq!(
1684                self.backfill_sync_state,
1685                BackfillSyncState::Idle,
1686                "backfill action should only be emitted when backfill is idle"
1687            );
1688
1689            if self.persistence_state.in_progress() {
1690                // backfill sync and persisting data are mutually exclusive, so we can't start
1691                // backfill while we're still persisting
1692                debug!(target: "engine::tree", "skipping backfill action while persistence task is active");
1693                return
1694            }
1695
1696            self.backfill_sync_state = BackfillSyncState::Pending;
1697            self.metrics.engine.pipeline_runs.increment(1);
1698            debug!(target: "engine::tree", "emitting backfill action event");
1699        }
1700
1701        let _ = self.outgoing.send(event).inspect_err(
1702            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1703        );
1704    }
1705
1706    /// Returns true if the distance between the canonical block number and the last persisted
1707    /// block number is greater than the persistence threshold and
1708    /// backfill is not running.
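    ///
    /// A minimal standalone sketch of the distance check (hypothetical numbers, plain
    /// integers rather than the tree state):
    ///
    /// ```
    /// let canonical_head = 150u64;
    /// let last_persisted = 100u64;
    /// let persistence_threshold = 32u64; // assumed config value
    /// // 50 blocks since the last persist exceeds the threshold, so we should persist
    /// assert!(canonical_head.saturating_sub(last_persisted) > persistence_threshold);
    /// ```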
1709    pub const fn should_persist(&self) -> bool {
1710        if !self.backfill_sync_state.is_idle() {
1711            // can't persist if backfill is running
1712            return false
1713        }
1714
1715        let min_block = self.persistence_state.last_persisted_block.number;
1716        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1717            self.config.persistence_threshold()
1718    }
1719
1720    /// Returns a batch of consecutive canonical blocks to persist in the range
1721    /// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
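    ///
    /// For example (hypothetical numbers): with `last_persisted_number = 100` and a computed
    /// `target_number = 120`, the batch covers blocks `101..=120`, oldest first:
    ///
    /// ```
    /// let (last_persisted_number, target_number) = (100u64, 120u64);
    /// let to_persist: Vec<u64> = (last_persisted_number + 1..=target_number).collect();
    /// assert_eq!(to_persist.first(), Some(&101));
    /// assert_eq!(to_persist.last(), Some(&120));
    /// ```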
1722    fn get_canonical_blocks_to_persist(
1723        &self,
1724        target: PersistTarget,
1725    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1726        // We will calculate the state root using the database, so we need to be sure no
1727        // persistence task is currently writing changes to it
1728        debug_assert!(!self.persistence_state.in_progress());
1729
1730        let mut blocks_to_persist = Vec::new();
1731        let mut current_hash = self.state.tree_state.canonical_block_hash();
1732        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1733        let canonical_head_number = self.state.tree_state.canonical_block_number();
1734
1735        let target_number = match target {
1736            PersistTarget::Head => canonical_head_number,
1737            PersistTarget::Threshold => {
1738                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1739            }
1740        };
1741
1742        debug!(
1743            target: "engine::tree",
1744            ?current_hash,
1745            ?last_persisted_number,
1746            ?canonical_head_number,
1747            ?target_number,
1748            "Returning canonical blocks to persist"
1749        );
1750        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1751            if block.recovered_block().number() <= last_persisted_number {
1752                break;
1753            }
1754
1755            if block.recovered_block().number() <= target_number {
1756                blocks_to_persist.push(block.clone());
1757            }
1758
1759            current_hash = block.recovered_block().parent_hash();
1760        }
1761
1762        // Reverse the order so that the oldest block comes first
1763        blocks_to_persist.reverse();
1764
1765        Ok(blocks_to_persist)
1766    }
1767
1768    /// This clears the blocks from the in-memory tree state that have been persisted to the
1769    /// database.
1770    ///
1771    /// This also updates the canonical in-memory state to reflect the newest persisted block
1772    /// height.
1773    ///
1774    /// Assumes that `finish` has been called on the `persistence_state` at least once
1775    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1776        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
1777        // state.
1778        if let Some(remove_above) = self.find_disk_reorg()? {
1779            self.remove_blocks(remove_above);
1780            return Ok(())
1781        }
1782
1783        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1784        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1785        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1786            number: self.persistence_state.last_persisted_block.number,
1787            hash: self.persistence_state.last_persisted_block.hash,
1788        });
1789        Ok(())
1790    }
1791
1792    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1793    ///
1794    /// Note: This function attempts to fetch the `ExecutedBlock` from either in-memory state
1795    /// or the database. If the required historical data (such as trie change sets) has been
1796    /// pruned for a given block, this operation will return an error. On archive nodes, it
1797    /// can retrieve any block.
1798    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1799        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1800        // check memory first
1801        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1802            return Ok(Some(block.clone()))
1803        }
1804
1805        let (block, senders) = self
1806            .provider
1807            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1808            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1809            .split_sealed();
1810        let execution_output = self
1811            .provider
1812            .get_state(block.header().number())?
1813            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1814        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1815        let trie_updates = self.provider.get_block_trie_updates(block.number())?;
1816
1817        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
1818        let sorted_trie_updates = Arc::new(trie_updates);
1819        // Skip building trie input and anchor for DB-loaded blocks.
1820        let trie_data =
1821            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
1822
1823        Ok(Some(ExecutedBlock::new(
1824            Arc::new(RecoveredBlock::new_sealed(block, senders)),
1825            Arc::new(execution_output),
1826            trie_data,
1827        )))
1828    }
1829
1830    /// Return sealed block header from in-memory state or database by hash.
1831    fn sealed_header_by_hash(
1832        &self,
1833        hash: B256,
1834    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1835        // check memory first
1836        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1837
1838        if header.is_some() {
1839            Ok(header)
1840        } else {
1841            self.provider.sealed_header_by_hash(hash)
1842        }
1843    }
1844
1845    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1846    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1847    /// not exist in the buffer, this returns the hash that is passed in.
1848    ///
1849    /// Returns the parent hash of the block itself if the block is buffered and has no other
1850    /// buffered ancestors.
1851    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1852        self.state
1853            .buffer
1854            .lowest_ancestor(&hash)
1855            .map(|block| block.parent_hash())
1856            .unwrap_or_else(|| hash)
1857    }
1858
1859    /// If validation fails, the response MUST contain the latest valid hash:
1860    ///
1861    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1862    ///     conditions:
1863    ///     - It is fully validated and deemed VALID
1864    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1865    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1866    ///     conditions are satisfied by a `PoW` block.
1867    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1868    ///     the above conditions.
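    ///
    /// A toy sketch of the ancestor walk over a hypothetical invalid-header cache, using plain
    /// string hashes instead of `B256`:
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// // invalid block hash -> parent hash
    /// let invalid: HashMap<&str, &str> = [("c", "b"), ("b", "a")].into();
    /// let known_valid = ["a"]; // blocks present in the tree or database
    ///
    /// let mut current = "c"; // parent hash of the invalid payload
    /// let mut latest_valid = None;
    /// while let Some(&parent) = invalid.get(current) {
    ///     current = parent;
    ///     if !invalid.contains_key(current) && known_valid.contains(&current) {
    ///         latest_valid = Some(current);
    ///         break;
    ///     }
    /// }
    /// assert_eq!(latest_valid, Some("a"));
    /// ```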
1869    fn latest_valid_hash_for_invalid_payload(
1870        &mut self,
1871        parent_hash: B256,
1872    ) -> ProviderResult<Option<B256>> {
1873        // Check if parent exists in side chain or in canonical chain.
1874        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1875            return Ok(Some(parent_hash))
1876        }
1877
1878        // iterate over ancestors in the invalid cache
1879        // until we encounter the first valid ancestor
1880        let mut current_hash = parent_hash;
1881        let mut current_block = self.state.invalid_headers.get(&current_hash);
1882        while let Some(block_with_parent) = current_block {
1883            current_hash = block_with_parent.parent;
1884            current_block = self.state.invalid_headers.get(&current_hash);
1885
1886            // If current_block is None, then the current_hash does not have an invalid
1887            // ancestor in the cache; check its presence in the blockchain tree
1888            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1889                return Ok(Some(current_hash))
1890            }
1891        }
1892        Ok(None)
1893    }
1894
1895    /// Prepares the invalid payload response for the given hash, checking the
1896    /// database for the parent hash and populating the payload status with the latest valid hash
1897    /// according to the engine api spec.
1898    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1899        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1900        // PoW block, which we need to identify by looking at the parent's block difficulty
1901        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1902            !parent.difficulty().is_zero()
1903        {
1904            parent_hash = B256::ZERO;
1905        }
1906
1907        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1908        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1909            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1910        })
1911        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1912    }
1913
1914    /// Returns true if the given hash is the last received sync target block.
1915    ///
1916    /// See [`ForkchoiceStateTracker::sync_target_state`]
1917    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1918        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1919            return target.head_block_hash == block_hash
1920        }
1921        false
1922    }
1923
1924    /// Returns true if the given hash is part of the last received sync target fork choice update.
1925    ///
1926    /// See [`ForkchoiceStateTracker::sync_target_state`]
1927    fn is_any_sync_target(&self, block_hash: B256) -> bool {
1928        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1929            return target.contains(block_hash)
1930        }
1931        false
1932    }
1933
1934    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1935    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1936    ///
1937    /// Returns a payload status response according to the engine API spec if the block is known to
1938    /// be invalid.
1939    fn check_invalid_ancestor_with_head(
1940        &mut self,
1941        check: B256,
1942        head: &SealedBlock<N::Block>,
1943    ) -> ProviderResult<Option<PayloadStatus>> {
1944        // check if the check hash was previously marked as invalid
1945        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1946
1947        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1948    }
1949
1950    /// Invoked when a new payload received is invalid.
1951    fn on_invalid_new_payload(
1952        &mut self,
1953        head: SealedBlock<N::Block>,
1954        invalid: BlockWithParent,
1955    ) -> ProviderResult<PayloadStatus> {
1956        // populate the latest valid hash field
1957        let status = self.prepare_invalid_response(invalid.parent)?;
1958
1959        // insert the head block into the invalid header cache
1960        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1961        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1962
1963        Ok(status)
1964    }
1965
1966    /// Finds any invalid ancestor for the given payload.
1967    ///
1968    /// This function walks up the chain of buffered ancestors from the payload's block
1969    /// hash and checks if any ancestor is marked as invalid in the tree state.
1970    ///
1971    /// The check works by:
1972    /// 1. Finding the lowest buffered ancestor for the given block hash
1973    /// 2. If the ancestor is the same as the block hash itself, using the parent hash instead
1974    /// 3. Checking if this ancestor is in the `invalid_headers` map
1975    ///
1976    /// Returns the invalid ancestor block info if found, or None if no invalid ancestor exists.
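    ///
    /// A minimal sketch of the lookup with hypothetical string hashes:
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// let (block_hash, parent_hash) = ("0xblock", "0xparent");
    /// // hypothetical invalid-headers map: the parent was previously marked invalid
    /// let invalid_headers: HashMap<&str, &str> = [("0xparent", "0xgrandparent")].into();
    ///
    /// // steps 1-2: nothing older is buffered, so fall back to the parent hash
    /// let mut lowest_buffered_ancestor = block_hash;
    /// if lowest_buffered_ancestor == block_hash {
    ///     lowest_buffered_ancestor = parent_hash;
    /// }
    /// // step 3: look the ancestor up in the invalid-headers map
    /// assert!(invalid_headers.contains_key(lowest_buffered_ancestor));
    /// ```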
1977    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1978        let parent_hash = payload.parent_hash();
1979        let block_hash = payload.block_hash();
1980        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1981        if lowest_buffered_ancestor == block_hash {
1982            lowest_buffered_ancestor = parent_hash;
1983        }
1984
1985        // Check if the block has an invalid ancestor
1986        self.state.invalid_headers.get(&lowest_buffered_ancestor)
1987    }
1988
1989    /// Handles a payload that has an invalid ancestor.
1990    ///
1991    /// This function validates the payload and processes it according to whether it's
1992    /// well-formed or malformed:
1993    /// 1. **Well-formed payload**: The payload is marked as invalid since it descends from a
1994    ///    known-bad block, which violates consensus rules
1995    /// 2. **Malformed payload**: Returns an appropriate error status since the payload cannot be
1996    ///    validated due to its own structural issues
1997    fn handle_invalid_ancestor_payload(
1998        &mut self,
1999        payload: T::ExecutionData,
2000        invalid: BlockWithParent,
2001    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2002        let parent_hash = payload.parent_hash();
2003        let num_hash = payload.num_hash();
2004
2005        // There are two cases here:
2006        // 1. the block is well formed and indeed links to an invalid header, meaning we should
2007        //    remember it as invalid
2008        // 2. the block is not well formed (i.e., the block hash is incorrect), and we should
2009        //    just return an error and forget it
2010        let block = match self.payload_validator.convert_payload_to_block(payload) {
2011            Ok(block) => block,
2012            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2013        };
2014
2015        Ok(self.on_invalid_new_payload(block, invalid)?)
2016    }
2017
2018    /// Checks if the given `head` points to an invalid header, which requires a specific response
2019    /// to a forkchoice update.
2020    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2021        // check if the head was previously marked as invalid
2022        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2023
2024        // Try to prepare invalid response, but handle errors gracefully
2025        match self.prepare_invalid_response(header.parent) {
2026            Ok(status) => Ok(Some(status)),
2027            Err(err) => {
2028                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2029                // Return a basic invalid status without latest valid hash
2030                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2031                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2032                })))
2033            }
2034        }
2035    }
2036
2037    /// Validates that the block is correct and satisfies all the consensus rules that concern
2038    /// the header and block body itself.
2039    fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2040        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2041            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2042            return Err(e)
2043        }
2044
2045        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2046            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2047            return Err(e)
2048        }
2049
2050        Ok(())
2051    }
2052
2053    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
2054    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2055    fn try_connect_buffered_blocks(
2056        &mut self,
2057        parent: BlockNumHash,
2058    ) -> Result<(), InsertBlockFatalError> {
2059        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2060
2061        if blocks.is_empty() {
2062            // nothing to append
2063            return Ok(())
2064        }
2065
2066        let now = Instant::now();
2067        let block_count = blocks.len();
2068        for child in blocks {
2069            let child_num_hash = child.num_hash();
2070            match self.insert_block(child) {
2071                Ok(res) => {
2072                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2073                    if self.is_any_sync_target(child_num_hash.hash) &&
2074                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2075                    {
2076                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2077                        // we just inserted a block that we know is part of the canonical chain, so
2078                        // we can make it canonical
2079                        self.make_canonical(child_num_hash.hash)?;
2080                    }
2081                }
2082                Err(err) => {
2083                    if let InsertPayloadError::Block(err) = err {
2084                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2085                        if let Err(fatal) = self.on_insert_block_error(err) {
2086                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2087                            return Err(fatal)
2088                        }
2089                    }
2090                }
2091            }
2092        }
2093
2094        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2095        Ok(())
2096    }
2097
2098    /// Pre-validates the block and inserts it into the buffer.
2099    fn buffer_block(
2100        &mut self,
2101        block: SealedBlock<N::Block>,
2102    ) -> Result<(), InsertBlockError<N::Block>> {
2103        if let Err(err) = self.validate_block(&block) {
2104            return Err(InsertBlockError::consensus_error(err, block))
2105        }
2106        self.state.buffer.insert_block(block);
2107        Ok(())
2108    }
2109
2110    /// Returns true if the distance from the local tip to the block is greater than the configured
2111    /// threshold.
2112    ///
2113    /// If the `local_tip` is greater than or equal to the `block`, this returns false.
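    ///
    /// A standalone sketch with hypothetical numbers (a threshold of 32, one epoch of slots on
    /// mainnet):
    ///
    /// ```
    /// let (local_tip, block, threshold) = (100u64, 133u64, 32u64);
    /// let exceeds = block > local_tip && block - local_tip > threshold;
    /// assert!(exceeds); // a gap of 33 blocks exceeds the threshold
    /// ```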
2114    #[inline]
2115    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2116        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2117    }
2118
2119    /// Returns how far the local tip is from the given block. If the local tip is at the same
2120    /// height or its block number is greater than the given block, this returns None.
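    ///
    /// A standalone sketch of the same arithmetic (hypothetical numbers):
    ///
    /// ```
    /// let distance = |local_tip: u64, block: u64| (block > local_tip).then(|| block - local_tip);
    /// assert_eq!(distance(100, 133), Some(33));
    /// assert_eq!(distance(133, 100), None);
    /// assert_eq!(distance(100, 100), None);
    /// ```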
2121    #[inline]
2122    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2123        if block > local_tip {
2124            Some(block - local_tip)
2125        } else {
2126            None
2127        }
2128    }
2129
2130    /// Returns the target hash to sync to if the distance from the local tip to the block is
2131    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
2132    /// that block already).
2133    ///
2134    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
2135    /// (missing) finalized block.
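    ///
    /// A standalone sketch of the core decision with hypothetical numbers and hashes (threshold
    /// of 32 blocks):
    ///
    /// ```
    /// let (canonical_tip, finalized_number, threshold) = (100u64, 200u64, 32u64);
    /// let finalized_hash = Some("0xfinalized");
    /// let finalized_is_local = false; // the finalized block has not been canonicalized yet
    ///
    /// let target = (finalized_number > canonical_tip
    ///     && finalized_number - canonical_tip > threshold
    ///     && !finalized_is_local)
    ///     .then(|| finalized_hash)
    ///     .flatten();
    /// assert_eq!(target, Some("0xfinalized"));
    /// ```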
2136    fn backfill_sync_target(
2137        &self,
2138        canonical_tip_num: u64,
2139        target_block_number: u64,
2140        downloaded_block: Option<BlockNumHash>,
2141    ) -> Option<B256> {
2142        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2143
2144        // check if the downloaded block is the tracked finalized block
2145        let exceeds_backfill_threshold =
2146            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2147                // if we downloaded the finalized block we can now check how far we're off
2148                (Some(downloaded_block), Some(state))
2149                    if downloaded_block.hash == state.finalized_block_hash =>
2150                {
2151                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2152                }
2153                _ => match sync_target_state
2154                    .as_ref()
2155                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2156                {
2157                    Some(buffered_finalized) => {
2158                        // if we have buffered the finalized block, we should check how far we're
2159                        // off
2160                        self.exceeds_backfill_run_threshold(
2161                            canonical_tip_num,
2162                            buffered_finalized.number(),
2163                        )
2164                    }
2165                    None => {
2166                        // check if the distance exceeds the threshold for backfill sync
2167                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2168                    }
2169                },
2170            };
2171
2172        // if the number of missing blocks is greater than the max, trigger backfill
2173        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2174            // if we have already canonicalized the finalized block, we should skip backfill
2175            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2176                Err(err) => {
2177                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2178                }
2179                Ok(None) => {
2180                    // ensure the finalized block is known (not the zero hash)
2181                    if !state.finalized_block_hash.is_zero() {
2182                        // we don't have the block yet and the distance exceeds the allowed
2183                        // threshold
2184                        return Some(state.finalized_block_hash)
2185                    }
2186
2187                    // OPTIMISTIC SYNCING
2188                    //
2189                    // It can happen when the node is doing an
2190                    // optimistic sync, where the CL has no knowledge of the finalized hash,
2191                    // but is expecting the EL to sync as high
2192                    // as possible before finalizing.
2193                    //
2194                    // This usually doesn't happen on ETH mainnet since CLs use the more
2195                    // secure checkpoint syncing.
2196                    //
2197                    // However, optimism chains will do this. The risk of a reorg is however
2198                    // low.
2199                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2200                    return Some(state.head_block_hash)
2201                }
2202                Ok(Some(_)) => {
2203                    // we're fully synced to the finalized block
2204                }
2205            }
2206        }
2207
2208        None
2209    }
2210
2211    /// This method tries to detect whether the on-disk and in-memory states have diverged. This
2212    /// can happen if a reorg occurs while we are persisting a block.
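    ///
    /// A toy sketch of the walk-back on plain `(number, hash)` pairs with a hypothetical parent
    /// lookup, showing how the common ancestor (the number to truncate above) is found:
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// // hash -> (number, parent hash): a canonical chain a -> b -> c and a persisted fork a -> x
    /// let blocks: HashMap<&str, (u64, &str)> =
    ///     [("a", (1, "genesis")), ("b", (2, "a")), ("c", (3, "b")), ("x", (2, "a"))].into();
    /// let parent = |(num, hash): (u64, &str)| {
    ///     let (n, p) = blocks[hash];
    ///     assert_eq!(n, num);
    ///     (n - 1, p)
    /// };
    ///
    /// let mut canonical = (3u64, "c"); // in-memory canonical head
    /// let mut persisted = (2u64, "x"); // last persisted block, no longer canonical
    /// while canonical.0 > persisted.0 {
    ///     canonical = parent(canonical);
    /// }
    /// while persisted.0 > canonical.0 {
    ///     persisted = parent(persisted);
    /// }
    /// while persisted.1 != canonical.1 {
    ///     canonical = parent(canonical);
    ///     persisted = parent(persisted);
    /// }
    /// // everything above the common ancestor must be removed from disk
    /// assert_eq!(persisted, (1, "a"));
    /// ```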
2213    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2214        let mut canonical = self.state.tree_state.current_canonical_head;
2215        let mut persisted = self.persistence_state.last_persisted_block;
2216
2217        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2218            Ok(self
2219                .sealed_header_by_hash(num_hash.hash)?
2220                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2221                .parent_num_hash())
2222        };
2223
2224        // Happy path: the canonical chain is ahead of, or equal to, the persisted chain.
2225        // Walk the canonical chain back to make sure that it connects to the persisted chain.
2226        while canonical.number > persisted.number {
2227            canonical = parent_num_hash(canonical)?;
2228        }
2229
2230        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
2231        if canonical == persisted {
2232            return Ok(None);
2233        }
2234
2235        // At this point, we know that `persisted` block can't be reached by walking the canonical
2236        // chain back. In this case we need to truncate it to the first canonical block it connects
2237        // to.
2238
2239        // Firstly, walk back until we reach the same height as `canonical`.
2240        while persisted.number > canonical.number {
2241            persisted = parent_num_hash(persisted)?;
2242        }
2243
2244        debug_assert_eq!(persisted.number, canonical.number);
2245
2246        // Now walk both chains back until we find a common ancestor.
2247        while persisted.hash != canonical.hash {
2248            canonical = parent_num_hash(canonical)?;
2249            persisted = parent_num_hash(persisted)?;
2250        }
2251
2252        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2253
2254        Ok(Some(persisted.number))
2255    }
2256
2257    /// Invoked when the canonical chain has been updated.
2258    ///
2259    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2260    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2261        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2262        let start = Instant::now();
2263
2264        // update the tracked canonical head
2265        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2266
2267        let tip = chain_update.tip().clone_sealed_header();
2268        let notification = chain_update.to_chain_notification();
2269
2270        // reinsert any missing reorged blocks
2271        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2272            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2273            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2274            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2275
2276            self.update_reorg_metrics(old.len());
2277            self.reinsert_reorged_blocks(new.clone());
2278            self.reinsert_reorged_blocks(old.clone());
2279        }
2280
2281        // update the tracked in-memory state with the new chain
2282        self.canonical_in_memory_state.update_chain(chain_update);
2283        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2284
2285        // Update metrics based on new tip
2286        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2287
2288        // sends an event to all active listeners about the new canonical chain
2289        self.canonical_in_memory_state.notify_canon_state(notification);
2290
2291        // emit event
2292        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2293            Box::new(tip),
2294            start.elapsed(),
2295        ));
2296    }
2297
2298    /// This updates metrics based on the given reorg length.
2299    fn update_reorg_metrics(&self, old_chain_length: usize) {
2300        self.metrics.tree.reorgs.increment(1);
2301        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2302    }
2303
2304    /// Reinserts any blocks from the given chain that do not already exist in the tree state.
2305    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2306        for block in new_chain {
2307            if self
2308                .state
2309                .tree_state
2310                .executed_block_by_hash(block.recovered_block().hash())
2311                .is_none()
2312            {
2313                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2314                self.state.tree_state.insert_executed(block);
2315            }
2316        }
2317    }
2318
2319    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2320    ///
2321    /// This mainly compares the missing parent of the downloaded block with the current canonical
2322    /// tip, and decides whether or not backfill sync should be triggered.
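    ///
    /// A standalone sketch of the three-way decision with hypothetical numbers (backfill
    /// threshold of 32 blocks):
    ///
    /// ```
    /// let (head, missing_parent, threshold) = (100u64, 110u64, 32u64);
    /// let action = if missing_parent > head && missing_parent - head > threshold {
    ///     "start backfill"
    /// } else if missing_parent > head {
    ///     "download the missing block range"
    /// } else {
    ///     "download the single missing block"
    /// };
    /// // a gap of 10 blocks is below the threshold, so we keep downloading
    /// assert_eq!(action, "download the missing block range");
    /// ```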
2323    fn on_disconnected_downloaded_block(
2324        &self,
2325        downloaded_block: BlockNumHash,
2326        missing_parent: BlockNumHash,
2327        head: BlockNumHash,
2328    ) -> Option<TreeEvent> {
2329        // compare the missing parent with the canonical tip
2330        if let Some(target) =
2331            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2332        {
2333            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2334            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2335        }
2336
2337        // continue downloading the missing parent
2338        //
2339        // this happens if either:
2340        //  * the missing parent block num < canonical tip num
2341        //    * this case represents a missing block on a fork that is shorter than the canonical
2342        //      chain
2343        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2344        //    less than the backfill threshold
2345        //    * this case represents a potentially long range of blocks to download and execute
2346        let request = if let Some(distance) =
2347            self.distance_from_local_tip(head.number, missing_parent.number)
2348        {
2349            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2350            DownloadRequest::BlockRange(missing_parent.hash, distance)
2351        } else {
2352            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2353            // This happens when the missing parent is on an outdated
2354            // sidechain and we can only download the missing block itself
2355            DownloadRequest::single_block(missing_parent.hash)
2356        };
2357
2358        Some(TreeEvent::Download(request))
2359    }
2360
2361    /// Invoked with a block downloaded from the network
2362    ///
2363    /// Returns an event with the appropriate action to take, such as:
2364    ///  - download more missing blocks
2365    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2366    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2367    fn on_downloaded_block(
2368        &mut self,
2369        block: SealedBlock<N::Block>,
2370    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2371        let block_num_hash = block.num_hash();
2372        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2373        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2374            return Ok(None)
2375        }
2376
2377        if !self.backfill_sync_state.is_idle() {
2378            return Ok(None)
2379        }
2380
2381        // try to append the block
2382        match self.insert_block(block) {
2383            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2384                // check if we just inserted a block that's part of sync targets,
2385                // i.e. head, safe, or finalized
2386                if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2387                    sync_target.contains(block_num_hash.hash)
2388                {
2389                    debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2390
2391                    // we just inserted a block that we know is part of the canonical chain, so we
2392                    // can make it canonical
2393                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2394                        sync_target_head: block_num_hash.hash,
2395                    })))
2396                }
2397                trace!(target: "engine::tree", "appended downloaded block");
2398                self.try_connect_buffered_blocks(block_num_hash)?;
2399            }
2400            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2401                // block is not connected to the canonical head, we need to download
2402                // its missing branch first
2403                return Ok(self.on_disconnected_downloaded_block(
2404                    block_num_hash,
2405                    missing_ancestor,
2406                    head,
2407                ))
2408            }
2409            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2410                trace!(target: "engine::tree", "downloaded block already executed");
2411            }
2412            Err(err) => {
2413                if let InsertPayloadError::Block(err) = err {
2414                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2415                    if let Err(fatal) = self.on_insert_block_error(err) {
2416                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2417                        return Err(fatal)
2418                    }
2419                }
2420            }
2421        }
2422        Ok(None)
2423    }
2424
2425    /// Inserts a payload into the tree and executes it.
2426    ///
2427    /// This function validates the payload's basic structure, then executes it using the
2428    /// payload validator. The execution includes running all transactions in the payload
2429    /// and validating the resulting state transitions.
2430    ///
2431    /// Returns `InsertPayloadOk` if the payload was successfully inserted and executed,
2432    /// or `InsertPayloadError` if validation or execution failed.
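    ///
    /// # Outcome handling (illustrative)
    ///
    /// A hedged sketch of how a caller might map the result to an Engine API response; the
    /// actual `newPayload` handling elsewhere in this module differs in detail:
    ///
    /// ```ignore
    /// match self.insert_payload(payload) {
    ///     Ok(InsertPayloadOk::Inserted(BlockStatus::Valid) |
    ///        InsertPayloadOk::AlreadySeen(BlockStatus::Valid)) => {
    ///         // executed (or already known): respond VALID with the block hash
    ///     }
    ///     Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
    ///        InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. })) => {
    ///         // parent state is missing: the block was buffered, respond SYNCING
    ///     }
    ///     Err(err) => {
    ///         // validation failures become INVALID, fatal errors are propagated
    ///     }
    /// }
    /// ```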
2433    fn insert_payload(
2434        &mut self,
2435        payload: T::ExecutionData,
2436    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2437        self.insert_block_or_payload(
2438            payload.block_with_parent(),
2439            payload,
2440            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2441            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2442        )
2443    }
2444
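    /// Inserts a sealed block into the tree and executes it.
    ///
    /// This is the block counterpart to [`Self::insert_payload`]; both delegate to
    /// [`Self::insert_block_or_payload`] and differ only in how the input is validated and
    /// converted.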
2445    fn insert_block(
2446        &mut self,
2447        block: SealedBlock<N::Block>,
2448    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2449        self.insert_block_or_payload(
2450            block.block_with_parent(),
2451            block,
2452            |validator, block, ctx| validator.validate_block(block, ctx),
2453            |_, block| Ok(block),
2454        )
2455    }
2456
2457    /// Inserts a block or payload into the blockchain tree with full execution.
2458    ///
2459    /// This is a generic function that handles both blocks and payloads by accepting
2460    /// a block identifier, input data, and execution/validation functions. It performs
2461    /// comprehensive checks and execution:
2462    ///
2463    /// - Validates that the block doesn't already exist in the tree
2464    /// - Ensures parent state is available, buffering if necessary
2465    /// - Executes the block/payload using the provided execute function
2466    /// - Handles both canonical and fork chain insertions
2467    /// - Updates pending block state when appropriate
2468    /// - Emits consensus engine events and records metrics
2469    ///
2470    /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
2471    /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
2472    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
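    ///
    /// # Fork detection (illustrative)
    ///
    /// A minimal, self-contained sketch of the height comparison used below to label the
    /// emitted event; the numbers are made up:
    ///
    /// ```
    /// let canonical_head = 100u64;
    /// let is_fork = |block_number: u64| block_number <= canonical_head;
    /// assert!(is_fork(100)); // competes with an existing canonical block at the same height
    /// assert!(!is_fork(101)); // extends the canonical chain
    /// ```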
2473    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2474    fn insert_block_or_payload<Input, Err>(
2475        &mut self,
2476        block_id: BlockWithParent,
2477        input: Input,
2478        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2479        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2480    ) -> Result<InsertPayloadOk, Err>
2481    where
2482        Err: From<InsertBlockError<N::Block>>,
2483    {
2484        let block_insert_start = Instant::now();
2485        let block_num_hash = block_id.block;
2486        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2487
2488        match self.sealed_header_by_hash(block_num_hash.hash) {
2489            Err(err) => {
2490                let block = convert_to_block(self, input)?;
2491                return Err(InsertBlockError::new(block, err.into()).into());
2492            }
2493            Ok(Some(_)) => {
2494                // We already have this block in the tree. However, we still need to run the
2495                // conversion to ensure that the block hash is valid.
2496                convert_to_block(self, input)?;
2497                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2498            }
2499            _ => {}
2500        };
2501
2502        // Ensure that the parent state is available.
2503        match self.state_provider_builder(block_id.parent) {
2504            Err(err) => {
2505                let block = convert_to_block(self, input)?;
2506                return Err(InsertBlockError::new(block, err.into()).into());
2507            }
2508            Ok(None) => {
2509                let block = convert_to_block(self, input)?;
2510
2511                // we don't have the state required to execute this block, so buffer it and
2512                // find the missing parent block
2513                let missing_ancestor = self
2514                    .state
2515                    .buffer
2516                    .lowest_ancestor(&block.parent_hash())
2517                    .map(|block| block.parent_num_hash())
2518                    .unwrap_or_else(|| block.parent_num_hash());
2519
2520                self.state.buffer.insert_block(block);
2521
2522                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2523                    head: self.state.tree_state.current_canonical_head,
2524                    missing_ancestor,
2525                }))
2526            }
2527            Ok(Some(_)) => {}
2528        }
2529
2530        // determine whether we are on a fork chain by comparing the block number with the
2531        // canonical head. This is a simple check that is sufficient for the event emission below.
2532        // A block is considered a fork if its number is less than or equal to the canonical head,
2533        // as this indicates there's already a canonical block at that height.
2534        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2535
2536        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2537
2538        let start = Instant::now();
2539
2540        let executed = execute(&mut self.payload_validator, input, ctx)?;
2541
2542        // if the parent is the canonical head, we can insert the block as the pending block
2543        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2544        {
2545            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2546            self.canonical_in_memory_state.set_pending_block(executed.clone());
2547        }
2548
2549        self.state.tree_state.insert_executed(executed.clone());
2550        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2551
2552        // emit insert event
2553        let elapsed = start.elapsed();
2554        let engine_event = if is_fork {
2555            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2556        } else {
2557            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2558        };
2559        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2560
2561        self.metrics
2562            .engine
2563            .block_insert_total_duration
2564            .record(block_insert_start.elapsed().as_secs_f64());
2565        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2566        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2567    }
2568
2569    /// Handles an error that occurred while inserting a block.
2570    ///
2571    /// If this is a validation error, this will mark the block as invalid.
2572    ///
2573    /// Returns the proper payload status response if the block is invalid.
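    ///
    /// # Response shape (illustrative)
    ///
    /// A hedged sketch of the status built below for an invalid block; `validation_err` and
    /// `latest_valid_hash` stand in for the values resolved in the function body:
    ///
    /// ```ignore
    /// PayloadStatus::new(
    ///     PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
    ///     latest_valid_hash, // `Option<B256>`: the last ancestor known to be valid, if any
    /// )
    /// ```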
2574    fn on_insert_block_error(
2575        &mut self,
2576        error: InsertBlockError<N::Block>,
2577    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2578        let (block, error) = error.split();
2579
2580        // if this is a block validation error, extract it; otherwise propagate the
2581        // fatal error.
2582        let validation_err = error.ensure_validation_error()?;
2583
2584        // If the error was due to an invalid payload, the payload is added to the
2585        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2586        // returned.
2587        warn!(
2588            target: "engine::tree",
2589            invalid_hash=%block.hash(),
2590            invalid_number=block.number(),
2591            %validation_err,
2592            "Invalid block error on new payload",
2593        );
2594        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2595
2596        // keep track of the invalid header
2597        self.state.invalid_headers.insert(block.block_with_parent());
2598        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2599            Box::new(block),
2600        )));
2601
2602        Ok(PayloadStatus::new(
2603            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2604            latest_valid_hash,
2605        ))
2606    }
2607
2608    /// Handles a [`NewPayloadError`] by converting it to a [`PayloadStatus`].
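    ///
    /// # `latestValidHash` rule (illustrative)
    ///
    /// Per the Engine API spec, the error kind decides the `latestValidHash` field of the
    /// `INVALID` response; a hedged sketch of the two outcomes built below:
    ///
    /// ```ignore
    /// // blockHash mismatch or mismatching versioned hashes -> latestValidHash must be null
    /// PayloadStatus::new(PayloadStatusEnum::from(error), None);
    /// // any other validation failure -> point at the last ancestor known to be valid, if any
    /// PayloadStatus::new(
    ///     PayloadStatusEnum::from(error),
    ///     self.latest_valid_hash_for_invalid_payload(parent_hash)?,
    /// );
    /// ```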
2609    fn on_new_payload_error(
2610        &mut self,
2611        error: NewPayloadError,
2612        payload_num_hash: NumHash,
2613        parent_hash: B256,
2614    ) -> ProviderResult<PayloadStatus> {
2615        error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2616        // we need to convert the error to a payload status (response to the CL)
2617
2618        let latest_valid_hash =
2619            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2620                // Engine-API rules:
2621                // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
2622                // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
2623                None
2624            } else {
2625                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2626            };
2627
2628        let status = PayloadStatusEnum::from(error);
2629        Ok(PayloadStatus::new(status, latest_valid_hash))
2630    }
2631
2632    /// Attempts to find the header for the given block hash if it is canonical.
2633    pub fn find_canonical_header(
2634        &self,
2635        hash: B256,
2636    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2637        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2638
2639        if canonical.is_none() {
2640            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2641        }
2642
2643        Ok(canonical)
2644    }
2645
2646    /// Updates the tracked finalized block if we have it.
2647    fn update_finalized_block(
2648        &self,
2649        finalized_block_hash: B256,
2650    ) -> Result<(), OnForkChoiceUpdated> {
2651        if finalized_block_hash.is_zero() {
2652            return Ok(())
2653        }
2654
2655        match self.find_canonical_header(finalized_block_hash) {
2656            Ok(None) => {
2657                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2658                // if the finalized block is not known, we can't update the finalized block
2659                return Err(OnForkChoiceUpdated::invalid_state())
2660            }
2661            Ok(Some(finalized)) => {
2662                if Some(finalized.num_hash()) !=
2663                    self.canonical_in_memory_state.get_finalized_num_hash()
2664                {
2665                    // we're also persisting the finalized block on disk so we can reload it on
2666                    // restart. This is required by optimism, which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2667                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2668                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2669                    // Update finalized block height metric
2670                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2671                }
2672            }
2673            Err(err) => {
2674                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2675            }
2676        }
2677
2678        Ok(())
2679    }
2680
2681    /// Updates the tracked safe block if we have it
2682    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2683        if safe_block_hash.is_zero() {
2684            return Ok(())
2685        }
2686
2687        match self.find_canonical_header(safe_block_hash) {
2688            Ok(None) => {
2689                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2690                // if the safe block is not known, we can't update the safe block
2691                return Err(OnForkChoiceUpdated::invalid_state())
2692            }
2693            Ok(Some(safe)) => {
2694                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2695                    // we're also persisting the safe block on disk so we can reload it on
2696                    // restart. This is required by optimism, which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2697                    let _ = self.persistence.save_safe_block_number(safe.number());
2698                    self.canonical_in_memory_state.set_safe(safe.clone());
2699                    // Update safe block height metric
2700                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
2701                }
2702            }
2703            Err(err) => {
2704                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2705            }
2706        }
2707
2708        Ok(())
2709    }
2710
2711    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2712    /// made canonical.
2713    ///
2714    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2715    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2716    ///
2717    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2718    /// are consistent with the head block.
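    ///
    /// # Example (illustrative)
    ///
    /// A minimal sketch, assuming `head` was just made canonical; zero hashes are treated as
    /// "not set" and skipped by the update helpers, and the variable names are made up:
    ///
    /// ```ignore
    /// use alloy_rpc_types_engine::ForkchoiceState;
    ///
    /// let state = ForkchoiceState {
    ///     head_block_hash: head,
    ///     safe_block_hash: safe,           // must be canonical (or zero), otherwise INVALID
    ///     finalized_block_hash: finalized, // must be canonical (or zero), otherwise INVALID
    /// };
    /// self.ensure_consistent_forkchoice_state(state)?;
    /// ```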
2719    fn ensure_consistent_forkchoice_state(
2720        &self,
2721        state: ForkchoiceState,
2722    ) -> Result<(), OnForkChoiceUpdated> {
2723        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2724        // after the head block is canonicalized.
2725        //
2726        // This ensures that the finalized block is consistent with the head block, i.e. the
2727        // finalized block is an ancestor of the head block.
2728        self.update_finalized_block(state.finalized_block_hash)?;
2729
2730        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2731        // after the head block is canonicalized.
2732        //
2733        // This ensures that the safe block is consistent with the head block, i.e. the safe
2734        // block is an ancestor of the head block.
2735        self.update_safe_block(state.safe_block_hash)
2736    }
2737
2738    /// Validates the payload attributes with respect to the header and fork choice state.
2739    ///
2740    /// Note: At this point, the fork choice update is considered to be VALID; however, we can still
2741    /// return an error if the payload attributes are invalid.
2742    fn process_payload_attributes(
2743        &self,
2744        attrs: T::PayloadAttributes,
2745        head: &N::BlockHeader,
2746        state: ForkchoiceState,
2747        version: EngineApiMessageVersion,
2748    ) -> OnForkChoiceUpdated {
2749        if let Err(err) =
2750            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2751        {
2752            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2753            return OnForkChoiceUpdated::invalid_payload_attributes()
2754        }
2755
2756        // 8. Client software MUST begin a payload build process building on top of
2757        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2758        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2759        //    The build process is specified in the Payload building section.
2760        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2761            state.head_block_hash,
2762            attrs,
2763            version as u8,
2764        ) {
2765            Ok(attributes) => {
2766                // send the payload to the builder and return the receiver for the pending payload
2767                // id, initiating payload job is handled asynchronously
2768                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2769
2770                // Client software MUST respond to this method call in the following way:
2771                // {
2772                //      payloadStatus: {
2773                //          status: VALID,
2774                //          latestValidHash: forkchoiceState.headBlockHash,
2775                //          validationError: null
2776                //      },
2777                //      payloadId: buildProcessId
2778                // }
2779                //
2780                // if the payload is deemed VALID and the build process has begun.
2781                OnForkChoiceUpdated::updated_with_pending_payload_id(
2782                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2783                    pending_payload_id,
2784                )
2785            }
2786            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2787        }
2788    }
2789
2790    /// Remove all blocks up to __and including__ the given block number.
2791    ///
2792    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2793    /// those which have a fork point at or below the finalized hash.
2794    ///
2795    /// Canonical blocks below the upper bound will still be removed.
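    ///
    /// # Example (illustrative)
    ///
    /// A hedged sketch of pruning the in-memory tree after the persistence task reports that it
    /// has written up to a given block; names and numbers are made up:
    ///
    /// ```ignore
    /// // the persistence task just finished writing block 950 to disk
    /// let persisted = BlockNumHash { number: 950, hash: persisted_hash };
    /// // drop everything at or below 950, keeping non-canonical forks that branch off above the
    /// // finalized block
    /// self.remove_before(persisted, Some(finalized_hash))?;
    /// ```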
2796    pub(crate) fn remove_before(
2797        &mut self,
2798        upper_bound: BlockNumHash,
2799        finalized_hash: Option<B256>,
2800    ) -> ProviderResult<()> {
2801        // first fetch the finalized block number and then call the remove_before method on
2802        // tree_state
2803        let num = if let Some(hash) = finalized_hash {
2804            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2805        } else {
2806            None
2807        };
2808
2809        self.state.tree_state.remove_until(
2810            upper_bound,
2811            self.persistence_state.last_persisted_block.hash,
2812            num,
2813        );
2814        Ok(())
2815    }
2816
2817    /// Returns a builder for creating state providers for the given hash.
2818    ///
2819    /// This is an optimization for parallel execution contexts where we want to avoid
2820    /// creating state providers in the critical path.
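    ///
    /// # Example (illustrative)
    ///
    /// A minimal sketch, assuming the hash resolves to an in-memory or persisted block; the
    /// builder itself is cheap, and the provider is only materialized where it is needed:
    ///
    /// ```ignore
    /// if let Some(builder) = self.state_provider_builder(parent_hash)? {
    ///     // hand the builder to a worker and build the provider there, off the critical path
    ///     let provider = builder.build()?;
    /// }
    /// ```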
2821    pub fn state_provider_builder(
2822        &self,
2823        hash: B256,
2824    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2825    where
2826        P: BlockReader + StateProviderFactory + StateReader + Clone,
2827    {
2828        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2829            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2830            // the block leads back to the canonical chain
2831            return Ok(Some(StateProviderBuilder::new(
2832                self.provider.clone(),
2833                historical,
2834                Some(blocks),
2835            )))
2836        }
2837
2838        // Check if the block is persisted
2839        if let Some(header) = self.provider.header(hash)? {
2840            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2841            // For persisted blocks, we create a builder that will fetch state directly from the
2842            // database
2843            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2844        }
2845
2846        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2847        Ok(None)
2848    }
2849}
2850
2851/// Block inclusion can be valid or disconnected. Invalid blocks are returned as an error
2852/// variant.
2853///
2854/// If we don't know the block's parent, we return `Disconnected`, as we can't determine whether
2855/// the block is valid or not.
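///
/// # Example (illustrative)
///
/// A minimal sketch of consuming the status; the follow-up actions are only hinted at:
///
/// ```ignore
/// match status {
///     BlockStatus::Valid => {
///         // the block extends a known chain and has been executed
///     }
///     BlockStatus::Disconnected { head, missing_ancestor } => {
///         // the gap up to `missing_ancestor` still has to be downloaded before this block
///         // can connect to `head`
///     }
/// }
/// ```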
2856#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2857pub enum BlockStatus {
2858    /// The block is valid and extends the canonical chain.
2859    Valid,
2860    /// The block may be valid and has an unknown missing ancestor.
2861    Disconnected {
2862        /// Current canonical head.
2863        head: BlockNumHash,
2864        /// The lowest ancestor block that is not connected to the canonical chain.
2865        missing_ancestor: BlockNumHash,
2866    },
2867}
2868
2869/// How a payload was inserted if it was valid.
2870///
2871/// If the payload was valid, but has already been seen, [`InsertPayloadOk::AlreadySeen`] is
2872/// returned, otherwise [`InsertPayloadOk::Inserted`] is returned.
2873#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2874pub enum InsertPayloadOk {
2875    /// The payload was valid, but we have already seen it.
2876    AlreadySeen(BlockStatus),
2877    /// The payload was valid and inserted into the tree.
2878    Inserted(BlockStatus),
2879}
2880
2881/// Target for block persistence.
2882#[derive(Debug, Clone, Copy)]
2883enum PersistTarget {
2884    /// Persist up to `canonical_head - memory_block_buffer_target`.
2885    Threshold,
2886    /// Persist all blocks up to and including the canonical head.
2887    Head,
2888}