1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    chain::FromOrchestrator,
4    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5    persistence::PersistenceHandle,
6    tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use reth_chain_state::{
17    CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21    BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22    ForkchoiceStateTracker, OnForkChoiceUpdated,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::{ConfigureEvm, OnStateHook};
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{
28    BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
29};
30use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
31use reth_provider::{
32    BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
33    StateProviderFactory, StateReader, TransactionVariant, TrieReader,
34};
35use reth_revm::database::StateProviderDatabase;
36use reth_stages_api::ControlFlow;
37use revm::state::EvmState;
38use state::TreeState;
39use std::{
40    fmt::Debug,
41    sync::{
42        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
43        Arc,
44    },
45    time::Instant,
46};
47use tokio::sync::{
48    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
49    oneshot::{self, error::TryRecvError},
50};
51use tracing::*;
52
53mod block_buffer;
54mod cached_state;
55pub mod error;
56mod instrumented_state;
57mod invalid_headers;
58mod metrics;
59mod payload_processor;
60pub mod payload_validator;
61mod persistence_state;
62pub mod precompile_cache;
63#[cfg(test)]
64mod tests;
65#[expect(unused)]
67mod trie_updates;
68
69use crate::tree::error::AdvancePersistenceError;
70pub use block_buffer::BlockBuffer;
71pub use invalid_headers::InvalidHeaderCache;
72pub use payload_processor::*;
73pub use payload_validator::{BasicEngineValidator, EngineValidator};
74pub use persistence_state::PersistenceState;
75pub use reth_engine_primitives::TreeConfig;
76
77pub mod state;
78
/// Block-count threshold, equal to [`EPOCH_SLOTS`].
///
/// NOTE(review): judging by the name this is the minimum block gap that justifies a pipeline
/// run; its consumers are not visible in this part of the file — confirm against the callers.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
89
90#[derive(Clone, Debug)]
92pub struct StateProviderBuilder<N: NodePrimitives, P> {
93    provider_factory: P,
95    historical: B256,
97    overlay: Option<Vec<ExecutedBlock<N>>>,
99}
100
101impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
102    pub const fn new(
105        provider_factory: P,
106        historical: B256,
107        overlay: Option<Vec<ExecutedBlock<N>>>,
108    ) -> Self {
109        Self { provider_factory, historical, overlay }
110    }
111}
112
113impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
114where
115    P: BlockReader + StateProviderFactory + StateReader + Clone,
116{
117    pub fn build(&self) -> ProviderResult<StateProviderBox> {
119        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
120        if let Some(overlay) = self.overlay.clone() {
121            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
122        }
123        Ok(provider)
124    }
125}
126
127#[derive(Debug)]
131pub struct EngineApiTreeState<N: NodePrimitives> {
132    tree_state: TreeState<N>,
134    forkchoice_state_tracker: ForkchoiceStateTracker,
136    buffer: BlockBuffer<N::Block>,
138    invalid_headers: InvalidHeaderCache,
141}
142
143impl<N: NodePrimitives> EngineApiTreeState<N> {
144    fn new(
145        block_buffer_limit: u32,
146        max_invalid_header_cache_length: u32,
147        canonical_block: BlockNumHash,
148        engine_kind: EngineApiKind,
149    ) -> Self {
150        Self {
151            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
152            buffer: BlockBuffer::new(block_buffer_limit),
153            tree_state: TreeState::new(canonical_block, engine_kind),
154            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
155        }
156    }
157}
158
159#[derive(Debug)]
161pub struct TreeOutcome<T> {
162    pub outcome: T,
164    pub event: Option<TreeEvent>,
166}
167
168impl<T> TreeOutcome<T> {
169    pub const fn new(outcome: T) -> Self {
171        Self { outcome, event: None }
172    }
173
174    pub fn with_event(mut self, event: TreeEvent) -> Self {
176        self.event = Some(event);
177        self
178    }
179}
180
181#[derive(Debug)]
183pub enum TreeEvent {
184    TreeAction(TreeAction),
186    BackfillAction(BackfillAction),
188    Download(DownloadRequest),
190}
191
192impl TreeEvent {
193    const fn is_backfill_action(&self) -> bool {
195        matches!(self, Self::BackfillAction(_))
196    }
197}
198
199#[derive(Debug)]
201pub enum TreeAction {
202    MakeCanonical {
204        sync_target_head: B256,
206    },
207}
208
209struct MeteredStateHook {
211    metrics: reth_evm::metrics::ExecutorMetrics,
212    inner_hook: Box<dyn OnStateHook>,
213}
214
215impl OnStateHook for MeteredStateHook {
216    fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
217        let accounts = state.keys().len();
219        let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
220        let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
221
222        self.metrics.accounts_loaded_histogram.record(accounts as f64);
223        self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
224        self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
225
226        self.inner_hook.on_state(source, state);
228    }
229}
230
231pub struct EngineApiTreeHandler<N, P, T, V, C>
236where
237    N: NodePrimitives,
238    T: PayloadTypes,
239    C: ConfigureEvm<Primitives = N> + 'static,
240{
241    provider: P,
242    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
243    payload_validator: V,
244    state: EngineApiTreeState<N>,
246    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
255    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
257    outgoing: UnboundedSender<EngineApiEvent<N>>,
259    persistence: PersistenceHandle<N>,
261    persistence_state: PersistenceState,
263    backfill_sync_state: BackfillSyncState,
265    canonical_in_memory_state: CanonicalInMemoryState<N>,
268    payload_builder: PayloadBuilderHandle<T>,
271    config: TreeConfig,
273    metrics: EngineApiMetrics,
275    engine_kind: EngineApiKind,
277    evm_config: C,
279}
280
281impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
282    for EngineApiTreeHandler<N, P, T, V, C>
283where
284    N: NodePrimitives,
285    C: Debug + ConfigureEvm<Primitives = N>,
286{
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("EngineApiTreeHandler")
289            .field("provider", &self.provider)
290            .field("consensus", &self.consensus)
291            .field("payload_validator", &self.payload_validator)
292            .field("state", &self.state)
293            .field("incoming_tx", &self.incoming_tx)
294            .field("persistence", &self.persistence)
295            .field("persistence_state", &self.persistence_state)
296            .field("backfill_sync_state", &self.backfill_sync_state)
297            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
298            .field("payload_builder", &self.payload_builder)
299            .field("config", &self.config)
300            .field("metrics", &self.metrics)
301            .field("engine_kind", &self.engine_kind)
302            .field("evm_config", &self.evm_config)
303            .finish()
304    }
305}
306
307impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
308where
309    N: NodePrimitives,
310    P: DatabaseProviderFactory
311        + BlockReader<Block = N::Block, Header = N::BlockHeader>
312        + StateProviderFactory
313        + StateReader<Receipt = N::Receipt>
314        + HashedPostStateProvider
315        + TrieReader
316        + Clone
317        + 'static,
318    <P as DatabaseProviderFactory>::Provider:
319        BlockReader<Block = N::Block, Header = N::BlockHeader>,
320    C: ConfigureEvm<Primitives = N> + 'static,
321    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
322    V: EngineValidator<T>,
323{
324    #[expect(clippy::too_many_arguments)]
326    pub fn new(
327        provider: P,
328        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
329        payload_validator: V,
330        outgoing: UnboundedSender<EngineApiEvent<N>>,
331        state: EngineApiTreeState<N>,
332        canonical_in_memory_state: CanonicalInMemoryState<N>,
333        persistence: PersistenceHandle<N>,
334        persistence_state: PersistenceState,
335        payload_builder: PayloadBuilderHandle<T>,
336        config: TreeConfig,
337        engine_kind: EngineApiKind,
338        evm_config: C,
339    ) -> Self {
340        let (incoming_tx, incoming) = std::sync::mpsc::channel();
341
342        Self {
343            provider,
344            consensus,
345            payload_validator,
346            incoming,
347            outgoing,
348            persistence,
349            persistence_state,
350            backfill_sync_state: BackfillSyncState::Idle,
351            state,
352            canonical_in_memory_state,
353            payload_builder,
354            config,
355            metrics: Default::default(),
356            incoming_tx,
357            engine_kind,
358            evm_config,
359        }
360    }
361
362    #[expect(clippy::complexity)]
368    pub fn spawn_new(
369        provider: P,
370        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
371        payload_validator: V,
372        persistence: PersistenceHandle<N>,
373        payload_builder: PayloadBuilderHandle<T>,
374        canonical_in_memory_state: CanonicalInMemoryState<N>,
375        config: TreeConfig,
376        kind: EngineApiKind,
377        evm_config: C,
378    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
379    {
380        let best_block_number = provider.best_block_number().unwrap_or(0);
381        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
382
383        let persistence_state = PersistenceState {
384            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
385            rx: None,
386        };
387
388        let (tx, outgoing) = unbounded_channel();
389        let state = EngineApiTreeState::new(
390            config.block_buffer_limit(),
391            config.max_invalid_header_cache_length(),
392            header.num_hash(),
393            kind,
394        );
395
396        let task = Self::new(
397            provider,
398            consensus,
399            payload_validator,
400            tx,
401            state,
402            canonical_in_memory_state,
403            persistence,
404            persistence_state,
405            payload_builder,
406            config,
407            kind,
408            evm_config,
409        );
410        let incoming = task.incoming_tx.clone();
411        std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
412        (incoming, outgoing)
413    }
414
415    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
417        self.incoming_tx.clone()
418    }
419
420    pub fn run(mut self) {
424        loop {
425            match self.try_recv_engine_message() {
426                Ok(Some(msg)) => {
427                    debug!(target: "engine::tree", %msg, "received new engine message");
428                    if let Err(fatal) = self.on_engine_message(msg) {
429                        error!(target: "engine::tree", %fatal, "insert block fatal error");
430                        return
431                    }
432                }
433                Ok(None) => {
434                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
435                }
436                Err(_err) => {
437                    error!(target: "engine::tree", "Engine channel disconnected");
438                    return
439                }
440            }
441
442            if let Err(err) = self.advance_persistence() {
443                error!(target: "engine::tree", %err, "Advancing persistence failed");
444                return
445            }
446        }
447    }
448
449    fn on_downloaded(
455        &mut self,
456        mut blocks: Vec<RecoveredBlock<N::Block>>,
457    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
458        if blocks.is_empty() {
459            return Ok(None)
461        }
462
463        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
464        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
465        for block in blocks.drain(..batch) {
466            if let Some(event) = self.on_downloaded_block(block)? {
467                let needs_backfill = event.is_backfill_action();
468                self.on_tree_event(event)?;
469                if needs_backfill {
470                    return Ok(None)
472                }
473            }
474        }
475
476        if !blocks.is_empty() {
478            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
479        }
480
481        Ok(None)
482    }
483
484    #[instrument(
499        level = "debug",
500        target = "engine::tree",
501        skip_all,
502        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
503    )]
504    fn on_new_payload(
505        &mut self,
506        payload: T::ExecutionData,
507    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
508        trace!(target: "engine::tree", "invoked new payload");
509        self.metrics.engine.new_payload_messages.increment(1);
510
511        let start = Instant::now();
513
514        let num_hash = payload.num_hash();
541        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
542        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
543
544        let block_hash = num_hash.hash;
545
546        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
548            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
549            return Ok(TreeOutcome::new(status));
550        }
551
552        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
554
555        let status = if self.backfill_sync_state.is_idle() {
556            self.try_insert_payload(payload)?
557        } else {
558            self.try_buffer_payload(payload)?
559        };
560
561        let mut outcome = TreeOutcome::new(status);
562        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
564            if self.state.tree_state.canonical_block_hash() != block_hash {
566                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
567                    sync_target_head: block_hash,
568                }));
569            }
570        }
571
572        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
574
575        Ok(outcome)
576    }
577
578    #[instrument(level = "debug", target = "engine::tree", skip_all)]
585    fn try_insert_payload(
586        &mut self,
587        payload: T::ExecutionData,
588    ) -> Result<PayloadStatus, InsertBlockFatalError> {
589        let block_hash = payload.block_hash();
590        let num_hash = payload.num_hash();
591        let parent_hash = payload.parent_hash();
592        let mut latest_valid_hash = None;
593
594        match self.insert_payload(payload) {
595            Ok(status) => {
596                let status = match status {
597                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
598                        latest_valid_hash = Some(block_hash);
599                        self.try_connect_buffered_blocks(num_hash)?;
600                        PayloadStatusEnum::Valid
601                    }
602                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
603                        latest_valid_hash = Some(block_hash);
604                        PayloadStatusEnum::Valid
605                    }
606                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
607                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
608                        PayloadStatusEnum::Syncing
610                    }
611                };
612
613                Ok(PayloadStatus::new(status, latest_valid_hash))
614            }
615            Err(error) => match error {
616                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
617                InsertPayloadError::Payload(error) => {
618                    Ok(self.on_new_payload_error(error, parent_hash)?)
619                }
620            },
621        }
622    }
623
624    fn try_buffer_payload(
633        &mut self,
634        payload: T::ExecutionData,
635    ) -> Result<PayloadStatus, InsertBlockFatalError> {
636        let parent_hash = payload.parent_hash();
637
638        match self.payload_validator.ensure_well_formed_payload(payload) {
639            Ok(block) => {
641                if let Err(error) = self.buffer_block(block) {
642                    Ok(self.on_insert_block_error(error)?)
643                } else {
644                    Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
645                }
646            }
647            Err(error) => Ok(self.on_new_payload_error(error, parent_hash)?),
648        }
649    }
650
651    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
658        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
660            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
661            self.metrics.engine.executed_new_block_cache_miss.increment(1);
662            return Ok(None)
663        };
664
665        let new_head_number = new_head_block.recovered_block().number();
666        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
667
668        let mut new_chain = vec![new_head_block.clone()];
669        let mut current_hash = new_head_block.recovered_block().parent_hash();
670        let mut current_number = new_head_number - 1;
671
672        while current_number > current_canonical_number {
677            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
678            {
679                current_hash = block.recovered_block().parent_hash();
680                current_number -= 1;
681                new_chain.push(block);
682            } else {
683                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
684                return Ok(None)
687            }
688        }
689
690        if current_hash == self.state.tree_state.current_canonical_head.hash {
693            new_chain.reverse();
694
695            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
697        }
698
699        let mut old_chain = Vec::new();
701        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
702
703        while current_canonical_number > current_number {
706            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
707                old_chain.push(block.clone());
708                old_hash = block.recovered_block().parent_hash();
709                current_canonical_number -= 1;
710            } else {
711                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
713                return Ok(None)
714            }
715        }
716
717        debug_assert_eq!(current_number, current_canonical_number);
719
720        while old_hash != current_hash {
723            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
724                old_hash = block.recovered_block().parent_hash();
725                old_chain.push(block);
726            } else {
727                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
729                return Ok(None)
730            }
731
732            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
733            {
734                current_hash = block.recovered_block().parent_hash();
735                new_chain.push(block);
736            } else {
737                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
739                return Ok(None)
740            }
741        }
742        new_chain.reverse();
743        old_chain.reverse();
744
745        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
746    }
747
748    fn update_latest_block_to_canonical_ancestor(
760        &mut self,
761        canonical_header: &SealedHeader<N::BlockHeader>,
762    ) -> ProviderResult<()> {
763        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
764        let current_head_number = self.state.tree_state.canonical_block_number();
765        let new_head_number = canonical_header.number();
766        let new_head_hash = canonical_header.hash();
767
768        self.state.tree_state.set_canonical_head(canonical_header.num_hash());
770
771        if new_head_number < current_head_number {
773            debug!(
774                target: "engine::tree",
775                current_head = current_head_number,
776                new_head = new_head_number,
777                new_head_hash = ?new_head_hash,
778                "FCU unwind detected: reverting to canonical ancestor"
779            );
780
781            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
782        } else {
783            debug!(
784                target: "engine::tree",
785                previous_head = current_head_number,
786                new_head = new_head_number,
787                new_head_hash = ?new_head_hash,
788                "Advancing latest block to canonical ancestor"
789            );
790            self.handle_chain_advance_or_same_height(canonical_header)
791        }
792    }
793
794    fn handle_canonical_chain_unwind(
797        &self,
798        current_head_number: u64,
799        canonical_header: &SealedHeader<N::BlockHeader>,
800    ) -> ProviderResult<()> {
801        let new_head_number = canonical_header.number();
802        debug!(
803            target: "engine::tree",
804            from = current_head_number,
805            to = new_head_number,
806            "Handling unwind: collecting blocks to remove from in-memory state"
807        );
808
809        let old_blocks =
811            self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
812
813        self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
815    }
816
817    fn collect_blocks_for_canonical_unwind(
819        &self,
820        new_head_number: u64,
821        current_head_number: u64,
822    ) -> Vec<ExecutedBlock<N>> {
823        let mut old_blocks = Vec::new();
824
825        for block_num in (new_head_number + 1)..=current_head_number {
826            if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
827                let executed_block = block_state.block_ref().clone();
828                old_blocks.push(executed_block);
829                debug!(
830                    target: "engine::tree",
831                    block_number = block_num,
832                    "Collected block for removal from in-memory state"
833                );
834            }
835        }
836
837        if old_blocks.is_empty() {
838            debug!(
839                target: "engine::tree",
840                "No blocks found in memory to remove, will clear and reset state"
841            );
842        }
843
844        old_blocks
845    }
846
847    fn apply_canonical_ancestor_via_reorg(
849        &self,
850        canonical_header: &SealedHeader<N::BlockHeader>,
851        old_blocks: Vec<ExecutedBlock<N>>,
852    ) -> ProviderResult<()> {
853        let new_head_hash = canonical_header.hash();
854        let new_head_number = canonical_header.number();
855
856        match self.canonical_block_by_hash(new_head_hash)? {
858            Some(executed_block) => {
859                self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
861                    new: vec![executed_block],
862                    old: old_blocks,
863                });
864
865                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
868
869                debug!(
870                    target: "engine::tree",
871                    block_number = new_head_number,
872                    block_hash = ?new_head_hash,
873                    "Successfully loaded canonical ancestor into memory via reorg"
874                );
875            }
876            None => {
877                warn!(
879                    target: "engine::tree",
880                    block_hash = ?new_head_hash,
881                    "Could not find canonical ancestor block, updating header only"
882                );
883                self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
884            }
885        }
886
887        Ok(())
888    }
889
890    fn handle_chain_advance_or_same_height(
892        &self,
893        canonical_header: &SealedHeader<N::BlockHeader>,
894    ) -> ProviderResult<()> {
895        let new_head_number = canonical_header.number();
896        let new_head_hash = canonical_header.hash();
897
898        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
900
901        self.ensure_block_in_memory(new_head_number, new_head_hash)
903    }
904
905    fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
907        if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
909            return Ok(());
910        }
911
912        if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
914            self.canonical_in_memory_state
915                .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
916
917            debug!(
918                target: "engine::tree",
919                block_number,
920                block_hash = ?block_hash,
921                "Added canonical block to in-memory state"
922            );
923        }
924
925        Ok(())
926    }
927
928    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
938        let target_hash = target.block.hash;
939        let canonical_head = self.state.tree_state.canonical_head();
941        let mut current_hash;
942        let mut current_block = target;
943        loop {
944            if current_block.block.hash == canonical_head.hash {
945                return Ok(false)
946            }
947            if current_block.block.number <= canonical_head.number {
949                break
950            }
951            current_hash = current_block.parent;
952
953            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
954            current_block = next_block.block_with_parent();
955        }
956
957        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
959            return Ok(false)
960        }
961
962        if self.provider.block_number(target_hash)?.is_some() {
964            return Ok(false)
965        }
966
967        Ok(true)
968    }
969
970    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
979    fn on_forkchoice_updated(
980        &mut self,
981        state: ForkchoiceState,
982        attrs: Option<T::PayloadAttributes>,
983        version: EngineApiMessageVersion,
984    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
985        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
986
987        self.record_forkchoice_metrics(&attrs);
989
990        if let Some(early_result) = self.validate_forkchoice_state(state)? {
992            return Ok(TreeOutcome::new(early_result));
993        }
994
995        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
997            return Ok(result);
998        }
999
1000        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1003            return Ok(result);
1004        }
1005
1006        self.handle_missing_block(state)
1008    }
1009
1010    fn record_forkchoice_metrics(&self, attrs: &Option<T::PayloadAttributes>) {
1012        self.metrics.engine.forkchoice_updated_messages.increment(1);
1013        if attrs.is_some() {
1014            self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
1015        }
1016        self.canonical_in_memory_state.on_forkchoice_update_received();
1017    }
1018
1019    fn validate_forkchoice_state(
1024        &mut self,
1025        state: ForkchoiceState,
1026    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1027        if state.head_block_hash.is_zero() {
1028            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1029        }
1030
1031        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1034        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1035            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1036        }
1037
1038        if !self.backfill_sync_state.is_idle() {
1039            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1042            return Ok(Some(OnForkChoiceUpdated::syncing()));
1043        }
1044
1045        Ok(None)
1046    }
1047
1048    fn handle_canonical_head(
1054        &self,
1055        state: ForkchoiceState,
1056        attrs: &Option<T::PayloadAttributes>, version: EngineApiMessageVersion,
1058    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1059        if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1074            return Ok(None);
1075        }
1076
1077        trace!(target: "engine::tree", "fcu head hash is already canonical");
1078
1079        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1081            return Ok(Some(TreeOutcome::new(outcome)));
1083        }
1084
1085        if let Some(attr) = attrs {
1087            let tip = self
1088                .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1089                .ok_or_else(|| {
1090                    ProviderError::HeaderNotFound(state.head_block_hash.into())
1093                })?;
1094            let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1096            return Ok(Some(TreeOutcome::new(updated)));
1097        }
1098
1099        let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1101            PayloadStatusEnum::Valid,
1102            Some(state.head_block_hash),
1103        )));
1104        Ok(Some(outcome))
1105    }
1106
1107    fn apply_chain_update(
1119        &mut self,
1120        state: ForkchoiceState,
1121        attrs: &Option<T::PayloadAttributes>,
1122        version: EngineApiMessageVersion,
1123    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1124        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1126            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1127
1128            if self.engine_kind.is_opstack() ||
1131                self.config.always_process_payload_attributes_on_canonical_head()
1132            {
1133                if let Some(attr) = attrs {
1134                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1135                    let updated = self.process_payload_attributes(
1137                        attr.clone(),
1138                        &canonical_header,
1139                        state,
1140                        version,
1141                    );
1142                    return Ok(Some(TreeOutcome::new(updated)));
1143                }
1144
1145                if self.config.unwind_canonical_header() {
1153                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1154                }
1155            }
1156
1157            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1169                PayloadStatusEnum::Valid,
1170                Some(state.head_block_hash),
1171            )));
1172            return Ok(Some(outcome));
1173        }
1174
1175        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1177            let tip = chain_update.tip().clone_sealed_header();
1178            self.on_canonical_chain_update(chain_update);
1179
1180            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1182                return Ok(Some(TreeOutcome::new(outcome)));
1184            }
1185
1186            if let Some(attr) = attrs {
1187                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1189                return Ok(Some(TreeOutcome::new(updated)));
1190            }
1191
1192            let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1193                PayloadStatusEnum::Valid,
1194                Some(state.head_block_hash),
1195            )));
1196            return Ok(Some(outcome));
1197        }
1198
1199        Ok(None)
1200    }
1201
1202    fn handle_missing_block(
1207        &self,
1208        state: ForkchoiceState,
1209    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1210        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1217        !state.safe_block_hash.is_zero() &&
1219        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1220        {
1221            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1222            state.safe_block_hash
1223        } else {
1224            state.head_block_hash
1225        };
1226
1227        let target = self.lowest_buffered_ancestor_or(target);
1228        trace!(target: "engine::tree", %target, "downloading missing block");
1229
1230        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1231            PayloadStatusEnum::Syncing,
1232        )))
1233        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1234    }
1235
1236    #[expect(clippy::type_complexity)]
1245    fn try_recv_engine_message(
1246        &self,
1247    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1248        if self.persistence_state.in_progress() {
1249            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1251                Ok(msg) => Ok(Some(msg)),
1252                Err(err) => match err {
1253                    RecvTimeoutError::Timeout => Ok(None),
1254                    RecvTimeoutError::Disconnected => Err(RecvError),
1255                },
1256            }
1257        } else {
1258            self.incoming.recv().map(Some)
1259        }
1260    }
1261
1262    fn remove_blocks(&mut self, new_tip_num: u64) {
1265        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1266        if new_tip_num < self.persistence_state.last_persisted_block.number {
1267            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1268            let (tx, rx) = oneshot::channel();
1269            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1270            self.persistence_state.start_remove(new_tip_num, rx);
1271        }
1272    }
1273
1274    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1277        if blocks_to_persist.is_empty() {
1278            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1279            return
1280        }
1281
1282        let highest_num_hash = blocks_to_persist
1284            .iter()
1285            .max_by_key(|block| block.recovered_block().number())
1286            .map(|b| b.recovered_block().num_hash())
1287            .expect("Checked non-empty persisting blocks");
1288
1289        debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1290        let (tx, rx) = oneshot::channel();
1291        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1292
1293        self.persistence_state.start_save(highest_num_hash, rx);
1294    }
1295
1296    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1301        if self.persistence_state.in_progress() {
1302            let (mut rx, start_time, current_action) = self
1303                .persistence_state
1304                .rx
1305                .take()
1306                .expect("if a persistence task is in progress Receiver must be Some");
1307            match rx.try_recv() {
1309                Ok(last_persisted_hash_num) => {
1310                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1311                    let Some(BlockNumHash {
1312                        hash: last_persisted_block_hash,
1313                        number: last_persisted_block_number,
1314                    }) = last_persisted_hash_num
1315                    else {
1316                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1319                        return Ok(())
1320                    };
1321
1322                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1323                    self.persistence_state
1324                        .finish(last_persisted_block_hash, last_persisted_block_number);
1325                    self.on_new_persisted_block()?;
1326                }
1327                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1328                Err(TryRecvError::Empty) => {
1329                    self.persistence_state.rx = Some((rx, start_time, current_action))
1330                }
1331            }
1332        }
1333
1334        if !self.persistence_state.in_progress() {
1335            if let Some(new_tip_num) = self.find_disk_reorg()? {
1336                self.remove_blocks(new_tip_num)
1337            } else if self.should_persist() {
1338                let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1339                self.persist_blocks(blocks_to_persist);
1340            }
1341        }
1342
1343        Ok(())
1344    }
1345
1346    fn on_engine_message(
1348        &mut self,
1349        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1350    ) -> Result<(), InsertBlockFatalError> {
1351        match msg {
1352            FromEngine::Event(event) => match event {
1353                FromOrchestrator::BackfillSyncStarted => {
1354                    debug!(target: "engine::tree", "received backfill sync started event");
1355                    self.backfill_sync_state = BackfillSyncState::Active;
1356                }
1357                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1358                    self.on_backfill_sync_finished(ctrl)?;
1359                }
1360            },
1361            FromEngine::Request(request) => {
1362                match request {
1363                    EngineApiRequest::InsertExecutedBlock(block) => {
1364                        let block_num_hash = block.recovered_block().num_hash();
1365                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1366                            return Ok(())
1368                        }
1369
1370                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1371                        let now = Instant::now();
1372
1373                        if self.state.tree_state.canonical_block_hash() ==
1376                            block.recovered_block().parent_hash()
1377                        {
1378                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1379                            self.canonical_in_memory_state.set_pending_block(block.clone());
1380                        }
1381
1382                        self.state.tree_state.insert_executed(block.clone());
1383                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1384                        self.emit_event(EngineApiEvent::BeaconConsensus(
1385                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1386                        ));
1387                    }
1388                    EngineApiRequest::Beacon(request) => {
1389                        match request {
1390                            BeaconEngineMessage::ForkchoiceUpdated {
1391                                state,
1392                                payload_attrs,
1393                                tx,
1394                                version,
1395                            } => {
1396                                let mut output =
1397                                    self.on_forkchoice_updated(state, payload_attrs, version);
1398
1399                                if let Ok(res) = &mut output {
1400                                    self.state
1402                                        .forkchoice_state_tracker
1403                                        .set_latest(state, res.outcome.forkchoice_status());
1404
1405                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1407                                        state,
1408                                        res.outcome.forkchoice_status(),
1409                                    ));
1410
1411                                    self.on_maybe_tree_event(res.event.take())?;
1413                                }
1414
1415                                if let Err(err) =
1416                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1417                                {
1418                                    self.metrics
1419                                        .engine
1420                                        .failed_forkchoice_updated_response_deliveries
1421                                        .increment(1);
1422                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1423                                }
1424                            }
1425                            BeaconEngineMessage::NewPayload { payload, tx } => {
1426                                let mut output = self.on_new_payload(payload);
1427
1428                                let maybe_event =
1429                                    output.as_mut().ok().and_then(|out| out.event.take());
1430
1431                                if let Err(err) =
1433                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1434                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1435                                    }))
1436                                {
1437                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1438                                    self.metrics
1439                                        .engine
1440                                        .failed_new_payload_response_deliveries
1441                                        .increment(1);
1442                                }
1443
1444                                self.on_maybe_tree_event(maybe_event)?;
1446                            }
1447                        }
1448                    }
1449                }
1450            }
1451            FromEngine::DownloadedBlocks(blocks) => {
1452                if let Some(event) = self.on_downloaded(blocks)? {
1453                    self.on_tree_event(event)?;
1454                }
1455            }
1456        }
1457        Ok(())
1458    }
1459
1460    fn on_backfill_sync_finished(
1474        &mut self,
1475        ctrl: ControlFlow,
1476    ) -> Result<(), InsertBlockFatalError> {
1477        debug!(target: "engine::tree", "received backfill sync finished event");
1478        self.backfill_sync_state = BackfillSyncState::Idle;
1479
1480        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1482            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1483            self.state.invalid_headers.insert(**bad_block);
1485
1486            Some(*target)
1488        } else {
1489            ctrl.block_number()
1491        };
1492
1493        let Some(backfill_height) = backfill_height else { return Ok(()) };
1495
1496        let Some(backfill_num_hash) = self
1502            .provider
1503            .block_hash(backfill_height)?
1504            .map(|hash| BlockNumHash { hash, number: backfill_height })
1505        else {
1506            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1507            return Ok(())
1508        };
1509
1510        if ctrl.is_unwind() {
1511            self.state.tree_state.reset(backfill_num_hash)
1514        } else {
1515            self.state.tree_state.remove_until(
1516                backfill_num_hash,
1517                self.persistence_state.last_persisted_block.hash,
1518                Some(backfill_num_hash),
1519            );
1520        }
1521
1522        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1523        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1524
1525        self.state.buffer.remove_old_blocks(backfill_height);
1527        self.canonical_in_memory_state.clear_state();
1530
1531        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1532            self.state.tree_state.set_canonical_head(new_head.num_hash());
1535            self.persistence_state.finish(new_head.hash(), new_head.number());
1536
1537            self.canonical_in_memory_state.set_canonical_head(new_head);
1539        }
1540
1541        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1544        else {
1545            return Ok(())
1546        };
1547        if sync_target_state.finalized_block_hash.is_zero() {
1548            return Ok(())
1550        }
1551        let newest_finalized = self
1553            .state
1554            .buffer
1555            .block(&sync_target_state.finalized_block_hash)
1556            .map(|block| block.number());
1557
1558        if let Some(backfill_target) =
1564            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1565                self.backfill_sync_target(progress, finalized_number, None)
1568            })
1569        {
1570            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1572                backfill_target.into(),
1573            )));
1574            return Ok(())
1575        };
1576
1577        if let Some(lowest_buffered) =
1579            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1580        {
1581            let current_head_num = self.state.tree_state.current_canonical_head.number;
1582            let target_head_num = lowest_buffered.number();
1583
1584            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1585            {
1586                debug!(
1588                    target: "engine::tree",
1589                    %current_head_num,
1590                    %target_head_num,
1591                    %distance,
1592                    "Backfill complete, downloading remaining blocks to reach FCU target"
1593                );
1594
1595                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1596                    lowest_buffered.parent_hash(),
1597                    distance,
1598                )));
1599                return Ok(());
1600            }
1601        }
1602
1603        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1605    }
1606
1607    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1611        if let Some(chain_update) = self.on_new_head(target)? {
1612            self.on_canonical_chain_update(chain_update);
1613        }
1614
1615        Ok(())
1616    }
1617
1618    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1620        if let Some(event) = event {
1621            self.on_tree_event(event)?;
1622        }
1623
1624        Ok(())
1625    }
1626
1627    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1631        match event {
1632            TreeEvent::TreeAction(action) => match action {
1633                TreeAction::MakeCanonical { sync_target_head } => {
1634                    self.make_canonical(sync_target_head)?;
1635                }
1636            },
1637            TreeEvent::BackfillAction(action) => {
1638                self.emit_event(EngineApiEvent::BackfillAction(action));
1639            }
1640            TreeEvent::Download(action) => {
1641                self.emit_event(EngineApiEvent::Download(action));
1642            }
1643        }
1644
1645        Ok(())
1646    }
1647
1648    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1650        let event = event.into();
1651
1652        if event.is_backfill_action() {
1653            debug_assert_eq!(
1654                self.backfill_sync_state,
1655                BackfillSyncState::Idle,
1656                "backfill action should only be emitted when backfill is idle"
1657            );
1658
1659            if self.persistence_state.in_progress() {
1660                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1663                return
1664            }
1665
1666            self.backfill_sync_state = BackfillSyncState::Pending;
1667            self.metrics.engine.pipeline_runs.increment(1);
1668            debug!(target: "engine::tree", "emitting backfill action event");
1669        }
1670
1671        let _ = self.outgoing.send(event).inspect_err(
1672            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1673        );
1674    }
1675
1676    pub const fn should_persist(&self) -> bool {
1680        if !self.backfill_sync_state.is_idle() {
1681            return false
1683        }
1684
1685        let min_block = self.persistence_state.last_persisted_block.number;
1686        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1687            self.config.persistence_threshold()
1688    }
1689
1690    fn get_canonical_blocks_to_persist(
1694        &self,
1695    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1696        debug_assert!(!self.persistence_state.in_progress());
1699
1700        let mut blocks_to_persist = Vec::new();
1701        let mut current_hash = self.state.tree_state.canonical_block_hash();
1702        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1703        let canonical_head_number = self.state.tree_state.canonical_block_number();
1704
1705        let target_number =
1707            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1708
1709        debug!(
1710            target: "engine::tree",
1711            ?current_hash,
1712            ?last_persisted_number,
1713            ?canonical_head_number,
1714            ?target_number,
1715            "Returning canonical blocks to persist"
1716        );
1717        while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1718            if block.recovered_block().number() <= last_persisted_number {
1719                break;
1720            }
1721
1722            if block.recovered_block().number() <= target_number {
1723                blocks_to_persist.push(block.clone());
1724            }
1725
1726            current_hash = block.recovered_block().parent_hash();
1727        }
1728
1729        blocks_to_persist.reverse();
1731
1732        Ok(blocks_to_persist)
1733    }
1734
1735    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1743        if let Some(remove_above) = self.find_disk_reorg()? {
1746            self.remove_blocks(remove_above);
1747            return Ok(())
1748        }
1749
1750        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1751        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1752        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1753            number: self.persistence_state.last_persisted_block.number,
1754            hash: self.persistence_state.last_persisted_block.hash,
1755        });
1756        Ok(())
1757    }
1758
1759    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1767        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1768        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1770            return Ok(Some(block.clone()))
1771        }
1772
1773        let (block, senders) = self
1774            .provider
1775            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1776            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1777            .split_sealed();
1778        let execution_output = self
1779            .provider
1780            .get_state(block.header().number())?
1781            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1782        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1783        let trie_updates = self.provider.get_block_trie_updates(block.number())?;
1784
1785        Ok(Some(ExecutedBlock {
1786            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1787            execution_output: Arc::new(execution_output),
1788            hashed_state: Arc::new(hashed_state),
1789            trie_updates: Arc::new(trie_updates.into()),
1790        }))
1791    }
1792
1793    fn sealed_header_by_hash(
1795        &self,
1796        hash: B256,
1797    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1798        let header = self.state.tree_state.sealed_header_by_hash(&hash);
1800
1801        if header.is_some() {
1802            Ok(header)
1803        } else {
1804            self.provider.sealed_header_by_hash(hash)
1805        }
1806    }
1807
1808    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1815        self.state
1816            .buffer
1817            .lowest_ancestor(&hash)
1818            .map(|block| block.parent_hash())
1819            .unwrap_or_else(|| hash)
1820    }
1821
1822    fn latest_valid_hash_for_invalid_payload(
1833        &mut self,
1834        parent_hash: B256,
1835    ) -> ProviderResult<Option<B256>> {
1836        if self.sealed_header_by_hash(parent_hash)?.is_some() {
1838            return Ok(Some(parent_hash))
1839        }
1840
1841        let mut current_hash = parent_hash;
1844        let mut current_block = self.state.invalid_headers.get(¤t_hash);
1845        while let Some(block_with_parent) = current_block {
1846            current_hash = block_with_parent.parent;
1847            current_block = self.state.invalid_headers.get(¤t_hash);
1848
1849            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1852                return Ok(Some(current_hash))
1853            }
1854        }
1855        Ok(None)
1856    }
1857
1858    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1862        if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1865            !parent.difficulty().is_zero()
1866        {
1867            parent_hash = B256::ZERO;
1868        }
1869
1870        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1871        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1872            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1873        })
1874        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1875    }
1876
1877    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1881        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1882            return target.head_block_hash == block_hash
1883        }
1884        false
1885    }
1886
1887    fn check_invalid_ancestor_with_head(
1893        &mut self,
1894        check: B256,
1895        head: &SealedBlock<N::Block>,
1896    ) -> ProviderResult<Option<PayloadStatus>> {
1897        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1899
1900        Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1901    }
1902
1903    fn on_invalid_new_payload(
1905        &mut self,
1906        head: SealedBlock<N::Block>,
1907        invalid: BlockWithParent,
1908    ) -> ProviderResult<PayloadStatus> {
1909        let status = self.prepare_invalid_response(invalid.parent)?;
1911
1912        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1914        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1915
1916        Ok(status)
1917    }
1918
1919    fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1931        let parent_hash = payload.parent_hash();
1932        let block_hash = payload.block_hash();
1933        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1934        if lowest_buffered_ancestor == block_hash {
1935            lowest_buffered_ancestor = parent_hash;
1936        }
1937
1938        self.state.invalid_headers.get(&lowest_buffered_ancestor)
1940    }
1941
1942    fn handle_invalid_ancestor_payload(
1951        &mut self,
1952        payload: T::ExecutionData,
1953        invalid: BlockWithParent,
1954    ) -> Result<PayloadStatus, InsertBlockFatalError> {
1955        let parent_hash = payload.parent_hash();
1956
1957        let block = match self.payload_validator.ensure_well_formed_payload(payload) {
1963            Ok(block) => block,
1964            Err(error) => return Ok(self.on_new_payload_error(error, parent_hash)?),
1965        };
1966
1967        Ok(self.on_invalid_new_payload(block.into_sealed_block(), invalid)?)
1968    }
1969
1970    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1973        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1975
1976        match self.prepare_invalid_response(header.parent) {
1978            Ok(status) => Ok(Some(status)),
1979            Err(err) => {
1980                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
1981                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1983                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1984                })))
1985            }
1986        }
1987    }
1988
1989    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1992        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1993            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1994            return Err(e)
1995        }
1996
1997        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1998            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1999            return Err(e)
2000        }
2001
2002        Ok(())
2003    }
2004
2005    #[instrument(level = "debug", target = "engine::tree", skip(self))]
2007    fn try_connect_buffered_blocks(
2008        &mut self,
2009        parent: BlockNumHash,
2010    ) -> Result<(), InsertBlockFatalError> {
2011        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2012
2013        if blocks.is_empty() {
2014            return Ok(())
2016        }
2017
2018        let now = Instant::now();
2019        let block_count = blocks.len();
2020        for child in blocks {
2021            let child_num_hash = child.num_hash();
2022            match self.insert_block(child) {
2023                Ok(res) => {
2024                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2025                    if self.is_sync_target_head(child_num_hash.hash) &&
2026                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2027                    {
2028                        self.make_canonical(child_num_hash.hash)?;
2029                    }
2030                }
2031                Err(err) => {
2032                    if let InsertPayloadError::Block(err) = err {
2033                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2034                        if let Err(fatal) = self.on_insert_block_error(err) {
2035                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2036                            return Err(fatal)
2037                        }
2038                    }
2039                }
2040            }
2041        }
2042
2043        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2044        Ok(())
2045    }
2046
2047    fn buffer_block(
2049        &mut self,
2050        block: RecoveredBlock<N::Block>,
2051    ) -> Result<(), InsertBlockError<N::Block>> {
2052        if let Err(err) = self.validate_block(&block) {
2053            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
2054        }
2055        self.state.buffer.insert_block(block);
2056        Ok(())
2057    }
2058
2059    #[inline]
2064    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2065        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2066    }
2067
2068    #[inline]
2071    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2072        if block > local_tip {
2073            Some(block - local_tip)
2074        } else {
2075            None
2076        }
2077    }
2078
2079    fn backfill_sync_target(
2086        &self,
2087        canonical_tip_num: u64,
2088        target_block_number: u64,
2089        downloaded_block: Option<BlockNumHash>,
2090    ) -> Option<B256> {
2091        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2092
2093        let exceeds_backfill_threshold =
2095            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2096                (Some(downloaded_block), Some(state))
2098                    if downloaded_block.hash == state.finalized_block_hash =>
2099                {
2100                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2101                }
2102                _ => match sync_target_state
2103                    .as_ref()
2104                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2105                {
2106                    Some(buffered_finalized) => {
2107                        self.exceeds_backfill_run_threshold(
2110                            canonical_tip_num,
2111                            buffered_finalized.number(),
2112                        )
2113                    }
2114                    None => {
2115                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2117                    }
2118                },
2119            };
2120
2121        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2123            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2125                Err(err) => {
2126                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2127                }
2128                Ok(None) => {
2129                    if !state.finalized_block_hash.is_zero() {
2131                        return Some(state.finalized_block_hash)
2134                    }
2135
2136                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2149                    return Some(state.head_block_hash)
2150                }
2151                Ok(Some(_)) => {
2152                    }
2154            }
2155        }
2156
2157        None
2158    }
2159
2160    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2163        let mut canonical = self.state.tree_state.current_canonical_head;
2164        let mut persisted = self.persistence_state.last_persisted_block;
2165
2166        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2167            Ok(self
2168                .sealed_header_by_hash(num_hash.hash)?
2169                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2170                .parent_num_hash())
2171        };
2172
2173        while canonical.number > persisted.number {
2176            canonical = parent_num_hash(canonical)?;
2177        }
2178
2179        if canonical == persisted {
2181            return Ok(None);
2182        }
2183
2184        while persisted.number > canonical.number {
2190            persisted = parent_num_hash(persisted)?;
2191        }
2192
2193        debug_assert_eq!(persisted.number, canonical.number);
2194
2195        while persisted.hash != canonical.hash {
2197            canonical = parent_num_hash(canonical)?;
2198            persisted = parent_num_hash(persisted)?;
2199        }
2200
2201        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2202
2203        Ok(Some(persisted.number))
2204    }
2205
2206    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2210        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2211        let start = Instant::now();
2212
2213        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2215
2216        let tip = chain_update.tip().clone_sealed_header();
2217        let notification = chain_update.to_chain_notification();
2218
2219        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2221            let new_first = new.first().map(|first| first.recovered_block().num_hash());
2222            let old_first = old.first().map(|first| first.recovered_block().num_hash());
2223            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2224
2225            self.update_reorg_metrics(old.len());
2226            self.reinsert_reorged_blocks(new.clone());
2227            self.reinsert_reorged_blocks(old.clone());
2228        }
2229
2230        self.canonical_in_memory_state.update_chain(chain_update);
2232        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2233
2234        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2236
2237        self.canonical_in_memory_state.notify_canon_state(notification);
2239
2240        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2242            Box::new(tip),
2243            start.elapsed(),
2244        ));
2245    }
2246
2247    fn update_reorg_metrics(&self, old_chain_length: usize) {
2249        self.metrics.tree.reorgs.increment(1);
2250        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2251    }
2252
2253    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2255        for block in new_chain {
2256            if self
2257                .state
2258                .tree_state
2259                .executed_block_by_hash(block.recovered_block().hash())
2260                .is_none()
2261            {
2262                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2263                self.state.tree_state.insert_executed(block);
2264            }
2265        }
2266    }
2267
2268    fn on_disconnected_downloaded_block(
2273        &self,
2274        downloaded_block: BlockNumHash,
2275        missing_parent: BlockNumHash,
2276        head: BlockNumHash,
2277    ) -> Option<TreeEvent> {
2278        if let Some(target) =
2280            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2281        {
2282            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2283            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2284        }
2285
2286        let request = if let Some(distance) =
2296            self.distance_from_local_tip(head.number, missing_parent.number)
2297        {
2298            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2299            DownloadRequest::BlockRange(missing_parent.hash, distance)
2300        } else {
2301            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2302            DownloadRequest::single_block(missing_parent.hash)
2305        };
2306
2307        Some(TreeEvent::Download(request))
2308    }
2309
2310    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2316    fn on_downloaded_block(
2317        &mut self,
2318        block: RecoveredBlock<N::Block>,
2319    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2320        let block_num_hash = block.num_hash();
2321        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2322        if self
2323            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2324            .is_some()
2325        {
2326            return Ok(None)
2327        }
2328
2329        if !self.backfill_sync_state.is_idle() {
2330            return Ok(None)
2331        }
2332
2333        match self.insert_block(block) {
2335            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2336                if self.is_sync_target_head(block_num_hash.hash) {
2337                    trace!(target: "engine::tree", "appended downloaded sync target block");
2338
2339                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2342                        sync_target_head: block_num_hash.hash,
2343                    })))
2344                }
2345                trace!(target: "engine::tree", "appended downloaded block");
2346                self.try_connect_buffered_blocks(block_num_hash)?;
2347            }
2348            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2349                return Ok(self.on_disconnected_downloaded_block(
2352                    block_num_hash,
2353                    missing_ancestor,
2354                    head,
2355                ))
2356            }
2357            Ok(InsertPayloadOk::AlreadySeen(_)) => {
2358                trace!(target: "engine::tree", "downloaded block already executed");
2359            }
2360            Err(err) => {
2361                if let InsertPayloadError::Block(err) = err {
2362                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2363                    if let Err(fatal) = self.on_insert_block_error(err) {
2364                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2365                        return Err(fatal)
2366                    }
2367                }
2368            }
2369        }
2370        Ok(None)
2371    }
2372
2373    fn insert_payload(
2382        &mut self,
2383        payload: T::ExecutionData,
2384    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2385        self.insert_block_or_payload(
2386            payload.block_with_parent(),
2387            payload,
2388            |validator, payload, ctx| validator.validate_payload(payload, ctx),
2389            |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2390        )
2391    }
2392
2393    fn insert_block(
2394        &mut self,
2395        block: RecoveredBlock<N::Block>,
2396    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2397        self.insert_block_or_payload(
2398            block.block_with_parent(),
2399            block,
2400            |validator, block, ctx| validator.validate_block(block, ctx),
2401            |_, block| Ok(block),
2402        )
2403    }
2404
2405    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2422    fn insert_block_or_payload<Input, Err>(
2423        &mut self,
2424        block_id: BlockWithParent,
2425        input: Input,
2426        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2427        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
2428    ) -> Result<InsertPayloadOk, Err>
2429    where
2430        Err: From<InsertBlockError<N::Block>>,
2431    {
2432        let block_insert_start = Instant::now();
2433        let block_num_hash = block_id.block;
2434        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2435
2436        match self.sealed_header_by_hash(block_num_hash.hash) {
2437            Err(err) => {
2438                let block = convert_to_block(self, input)?;
2439                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2440            }
2441            Ok(Some(_)) => {
2442                convert_to_block(self, input)?;
2445                return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2446            }
2447            _ => {}
2448        };
2449
2450        match self.state_provider_builder(block_id.parent) {
2452            Err(err) => {
2453                let block = convert_to_block(self, input)?;
2454                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2455            }
2456            Ok(None) => {
2457                let block = convert_to_block(self, input)?;
2458
2459                let missing_ancestor = self
2462                    .state
2463                    .buffer
2464                    .lowest_ancestor(&block.parent_hash())
2465                    .map(|block| block.parent_num_hash())
2466                    .unwrap_or_else(|| block.parent_num_hash());
2467
2468                self.state.buffer.insert_block(block);
2469
2470                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2471                    head: self.state.tree_state.current_canonical_head,
2472                    missing_ancestor,
2473                }))
2474            }
2475            Ok(Some(_)) => {}
2476        }
2477
2478        let is_fork = match self.is_fork(block_id) {
2480            Err(err) => {
2481                let block = convert_to_block(self, input)?;
2482                return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2483            }
2484            Ok(is_fork) => is_fork,
2485        };
2486
2487        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2488
2489        let start = Instant::now();
2490
2491        let executed = execute(&mut self.payload_validator, input, ctx)?;
2492
2493        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2495        {
2496            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2497            self.canonical_in_memory_state.set_pending_block(executed.clone());
2498        }
2499
2500        self.state.tree_state.insert_executed(executed.clone());
2501        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2502
2503        let elapsed = start.elapsed();
2505        let engine_event = if is_fork {
2506            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2507        } else {
2508            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2509        };
2510        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2511
2512        self.metrics
2513            .engine
2514            .block_insert_total_duration
2515            .record(block_insert_start.elapsed().as_secs_f64());
2516        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2517        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2518    }
2519
2520    fn on_insert_block_error(
2526        &mut self,
2527        error: InsertBlockError<N::Block>,
2528    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2529        let (block, error) = error.split();
2530
2531        let validation_err = error.ensure_validation_error()?;
2534
2535        warn!(
2539            target: "engine::tree",
2540            invalid_hash=%block.hash(),
2541            invalid_number=block.number(),
2542            %validation_err,
2543            "Invalid block error on new payload",
2544        );
2545        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2546
2547        self.state.invalid_headers.insert(block.block_with_parent());
2549        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2550            Box::new(block),
2551        )));
2552
2553        Ok(PayloadStatus::new(
2554            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2555            latest_valid_hash,
2556        ))
2557    }
2558
2559    fn on_new_payload_error(
2561        &mut self,
2562        error: NewPayloadError,
2563        parent_hash: B256,
2564    ) -> ProviderResult<PayloadStatus> {
2565        error!(target: "engine::tree", %error, "Invalid payload");
2566        let latest_valid_hash =
2569            if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2570                None
2574            } else {
2575                self.latest_valid_hash_for_invalid_payload(parent_hash)?
2576            };
2577
2578        let status = PayloadStatusEnum::from(error);
2579        Ok(PayloadStatus::new(status, latest_valid_hash))
2580    }
2581
2582    pub fn find_canonical_header(
2584        &self,
2585        hash: B256,
2586    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2587        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2588
2589        if canonical.is_none() {
2590            canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2591        }
2592
2593        Ok(canonical)
2594    }
2595
2596    fn update_finalized_block(
2598        &self,
2599        finalized_block_hash: B256,
2600    ) -> Result<(), OnForkChoiceUpdated> {
2601        if finalized_block_hash.is_zero() {
2602            return Ok(())
2603        }
2604
2605        match self.find_canonical_header(finalized_block_hash) {
2606            Ok(None) => {
2607                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2608                return Err(OnForkChoiceUpdated::invalid_state())
2610            }
2611            Ok(Some(finalized)) => {
2612                if Some(finalized.num_hash()) !=
2613                    self.canonical_in_memory_state.get_finalized_num_hash()
2614                {
2615                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2618                    self.canonical_in_memory_state.set_finalized(finalized.clone());
2619                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2621                }
2622            }
2623            Err(err) => {
2624                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2625            }
2626        }
2627
2628        Ok(())
2629    }
2630
2631    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2633        if safe_block_hash.is_zero() {
2634            return Ok(())
2635        }
2636
2637        match self.find_canonical_header(safe_block_hash) {
2638            Ok(None) => {
2639                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2640                return Err(OnForkChoiceUpdated::invalid_state())
2642            }
2643            Ok(Some(safe)) => {
2644                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2645                    let _ = self.persistence.save_safe_block_number(safe.number());
2648                    self.canonical_in_memory_state.set_safe(safe.clone());
2649                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
2651                }
2652            }
2653            Err(err) => {
2654                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2655            }
2656        }
2657
2658        Ok(())
2659    }
2660
2661    fn ensure_consistent_forkchoice_state(
2670        &self,
2671        state: ForkchoiceState,
2672    ) -> Result<(), OnForkChoiceUpdated> {
2673        self.update_finalized_block(state.finalized_block_hash)?;
2679
2680        self.update_safe_block(state.safe_block_hash)
2686    }
2687
2688    fn process_payload_attributes(
2693        &self,
2694        attrs: T::PayloadAttributes,
2695        head: &N::BlockHeader,
2696        state: ForkchoiceState,
2697        version: EngineApiMessageVersion,
2698    ) -> OnForkChoiceUpdated {
2699        if let Err(err) =
2700            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2701        {
2702            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2703            return OnForkChoiceUpdated::invalid_payload_attributes()
2704        }
2705
2706        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2711            state.head_block_hash,
2712            attrs,
2713            version as u8,
2714        ) {
2715            Ok(attributes) => {
2716                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2719
2720                OnForkChoiceUpdated::updated_with_pending_payload_id(
2732                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2733                    pending_payload_id,
2734                )
2735            }
2736            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2737        }
2738    }
2739
2740    pub(crate) fn remove_before(
2747        &mut self,
2748        upper_bound: BlockNumHash,
2749        finalized_hash: Option<B256>,
2750    ) -> ProviderResult<()> {
2751        let num = if let Some(hash) = finalized_hash {
2754            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2755        } else {
2756            None
2757        };
2758
2759        self.state.tree_state.remove_until(
2760            upper_bound,
2761            self.persistence_state.last_persisted_block.hash,
2762            num,
2763        );
2764        Ok(())
2765    }
2766
2767    pub fn state_provider_builder(
2772        &self,
2773        hash: B256,
2774    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2775    where
2776        P: BlockReader + StateProviderFactory + StateReader + Clone,
2777    {
2778        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2779            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2780            return Ok(Some(StateProviderBuilder::new(
2782                self.provider.clone(),
2783                historical,
2784                Some(blocks),
2785            )))
2786        }
2787
2788        if let Some(header) = self.provider.header(hash)? {
2790            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2791            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2794        }
2795
2796        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2797        Ok(None)
2798    }
2799}
2800
2801#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2807pub enum BlockStatus {
2808    Valid,
2810    Disconnected {
2812        head: BlockNumHash,
2814        missing_ancestor: BlockNumHash,
2816    },
2817}
2818
2819#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2824pub enum InsertPayloadOk {
2825    AlreadySeen(BlockStatus),
2827    Inserted(BlockStatus),
2829}