1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use persistence_state::CurrentPersistenceAction;
17use reth_chain_state::{
18 CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
19 MemoryOverlayStateProvider, NewCanonicalChain,
20};
21use reth_consensus::{Consensus, FullConsensus};
22use reth_engine_primitives::{
23 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
24 ForkchoiceStateTracker, OnForkChoiceUpdated,
25};
26use reth_errors::{ConsensusError, ProviderResult};
27use reth_evm::{ConfigureEvm, OnStateHook};
28use reth_payload_builder::PayloadBuilderHandle;
29use reth_payload_primitives::{
30 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
31};
32use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
33use reth_provider::{
34 providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
35 HashedPostStateProvider, ProviderError, StateProviderBox, StateProviderFactory, StateReader,
36 StateRootProvider, TransactionVariant,
37};
38use reth_revm::database::StateProviderDatabase;
39use reth_stages_api::ControlFlow;
40use reth_trie::{HashedPostState, TrieInput};
41use reth_trie_db::DatabaseHashedPostState;
42use revm::state::EvmState;
43use state::TreeState;
44use std::{
45 fmt::Debug,
46 sync::{
47 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
48 Arc,
49 },
50 time::Instant,
51};
52use tokio::sync::{
53 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
54 oneshot::{self, error::TryRecvError},
55};
56use tracing::*;
57
58mod block_buffer;
59mod cached_state;
60pub mod error;
61mod instrumented_state;
62mod invalid_headers;
63mod metrics;
64mod payload_processor;
65pub mod payload_validator;
66mod persistence_state;
67pub mod precompile_cache;
68#[cfg(test)]
69mod tests;
70#[expect(unused)]
72mod trie_updates;
73
74use crate::tree::error::AdvancePersistenceError;
75pub use block_buffer::BlockBuffer;
76pub use invalid_headers::InvalidHeaderCache;
77pub use payload_processor::*;
78pub use payload_validator::{BasicEngineValidator, EngineValidator};
79pub use persistence_state::PersistenceState;
80pub use reth_engine_primitives::TreeConfig;
81use reth_trie::KeccakKeyHasher;
82
83pub mod state;
84
85pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
95
96#[derive(Clone, Debug)]
98pub struct StateProviderBuilder<N: NodePrimitives, P> {
99 provider_factory: P,
101 historical: B256,
103 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
105}
106
107impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
108 pub const fn new(
111 provider_factory: P,
112 historical: B256,
113 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
114 ) -> Self {
115 Self { provider_factory, historical, overlay }
116 }
117}
118
119impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
120where
121 P: BlockReader + StateProviderFactory + StateReader + Clone,
122{
123 pub fn build(&self) -> ProviderResult<StateProviderBox> {
125 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
126 if let Some(overlay) = self.overlay.clone() {
127 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
128 }
129 Ok(provider)
130 }
131}
132
133#[derive(Debug)]
137pub struct EngineApiTreeState<N: NodePrimitives> {
138 tree_state: TreeState<N>,
140 forkchoice_state_tracker: ForkchoiceStateTracker,
142 buffer: BlockBuffer<N::Block>,
144 invalid_headers: InvalidHeaderCache,
147}
148
149impl<N: NodePrimitives> EngineApiTreeState<N> {
150 fn new(
151 block_buffer_limit: u32,
152 max_invalid_header_cache_length: u32,
153 canonical_block: BlockNumHash,
154 engine_kind: EngineApiKind,
155 ) -> Self {
156 Self {
157 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
158 buffer: BlockBuffer::new(block_buffer_limit),
159 tree_state: TreeState::new(canonical_block, engine_kind),
160 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
161 }
162 }
163}
164
165#[derive(Debug)]
167pub struct TreeOutcome<T> {
168 pub outcome: T,
170 pub event: Option<TreeEvent>,
172}
173
174impl<T> TreeOutcome<T> {
175 pub const fn new(outcome: T) -> Self {
177 Self { outcome, event: None }
178 }
179
180 pub fn with_event(mut self, event: TreeEvent) -> Self {
182 self.event = Some(event);
183 self
184 }
185}
186
187#[derive(Debug)]
189pub enum TreeEvent {
190 TreeAction(TreeAction),
192 BackfillAction(BackfillAction),
194 Download(DownloadRequest),
196}
197
198impl TreeEvent {
199 const fn is_backfill_action(&self) -> bool {
201 matches!(self, Self::BackfillAction(_))
202 }
203}
204
205#[derive(Debug)]
207pub enum TreeAction {
208 MakeCanonical {
210 sync_target_head: B256,
212 },
213}
214
215struct MeteredStateHook {
217 metrics: reth_evm::metrics::ExecutorMetrics,
218 inner_hook: Box<dyn OnStateHook>,
219}
220
221impl OnStateHook for MeteredStateHook {
222 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
223 let accounts = state.keys().len();
225 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
226 let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
227
228 self.metrics.accounts_loaded_histogram.record(accounts as f64);
229 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
230 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
231
232 self.inner_hook.on_state(source, state);
234 }
235}
236
237pub struct EngineApiTreeHandler<N, P, T, V, C>
242where
243 N: NodePrimitives,
244 T: PayloadTypes,
245 C: ConfigureEvm<Primitives = N> + 'static,
246{
247 provider: P,
248 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
249 payload_validator: V,
250 state: EngineApiTreeState<N>,
252 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
261 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
263 outgoing: UnboundedSender<EngineApiEvent<N>>,
265 persistence: PersistenceHandle<N>,
267 persistence_state: PersistenceState,
269 backfill_sync_state: BackfillSyncState,
271 canonical_in_memory_state: CanonicalInMemoryState<N>,
274 payload_builder: PayloadBuilderHandle<T>,
277 config: TreeConfig,
279 metrics: EngineApiMetrics,
281 engine_kind: EngineApiKind,
283 evm_config: C,
285}
286
287impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
288 for EngineApiTreeHandler<N, P, T, V, C>
289where
290 N: NodePrimitives,
291 C: Debug + ConfigureEvm<Primitives = N>,
292{
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 f.debug_struct("EngineApiTreeHandler")
295 .field("provider", &self.provider)
296 .field("consensus", &self.consensus)
297 .field("payload_validator", &self.payload_validator)
298 .field("state", &self.state)
299 .field("incoming_tx", &self.incoming_tx)
300 .field("persistence", &self.persistence)
301 .field("persistence_state", &self.persistence_state)
302 .field("backfill_sync_state", &self.backfill_sync_state)
303 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
304 .field("payload_builder", &self.payload_builder)
305 .field("config", &self.config)
306 .field("metrics", &self.metrics)
307 .field("engine_kind", &self.engine_kind)
308 .field("evm_config", &self.evm_config)
309 .finish()
310 }
311}
312
313impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
314where
315 N: NodePrimitives,
316 P: DatabaseProviderFactory
317 + BlockReader<Block = N::Block, Header = N::BlockHeader>
318 + StateProviderFactory
319 + StateReader<Receipt = N::Receipt>
320 + HashedPostStateProvider
321 + Clone
322 + 'static,
323 <P as DatabaseProviderFactory>::Provider:
324 BlockReader<Block = N::Block, Header = N::BlockHeader>,
325 C: ConfigureEvm<Primitives = N> + 'static,
326 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
327 V: EngineValidator<T>,
328{
329 #[expect(clippy::too_many_arguments)]
331 pub fn new(
332 provider: P,
333 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
334 payload_validator: V,
335 outgoing: UnboundedSender<EngineApiEvent<N>>,
336 state: EngineApiTreeState<N>,
337 canonical_in_memory_state: CanonicalInMemoryState<N>,
338 persistence: PersistenceHandle<N>,
339 persistence_state: PersistenceState,
340 payload_builder: PayloadBuilderHandle<T>,
341 config: TreeConfig,
342 engine_kind: EngineApiKind,
343 evm_config: C,
344 ) -> Self {
345 let (incoming_tx, incoming) = std::sync::mpsc::channel();
346
347 Self {
348 provider,
349 consensus,
350 payload_validator,
351 incoming,
352 outgoing,
353 persistence,
354 persistence_state,
355 backfill_sync_state: BackfillSyncState::Idle,
356 state,
357 canonical_in_memory_state,
358 payload_builder,
359 config,
360 metrics: Default::default(),
361 incoming_tx,
362 engine_kind,
363 evm_config,
364 }
365 }
366
367 #[expect(clippy::complexity)]
373 pub fn spawn_new(
374 provider: P,
375 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
376 payload_validator: V,
377 persistence: PersistenceHandle<N>,
378 payload_builder: PayloadBuilderHandle<T>,
379 canonical_in_memory_state: CanonicalInMemoryState<N>,
380 config: TreeConfig,
381 kind: EngineApiKind,
382 evm_config: C,
383 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
384 {
385 let best_block_number = provider.best_block_number().unwrap_or(0);
386 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
387
388 let persistence_state = PersistenceState {
389 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
390 rx: None,
391 };
392
393 let (tx, outgoing) = unbounded_channel();
394 let state = EngineApiTreeState::new(
395 config.block_buffer_limit(),
396 config.max_invalid_header_cache_length(),
397 header.num_hash(),
398 kind,
399 );
400
401 let task = Self::new(
402 provider,
403 consensus,
404 payload_validator,
405 tx,
406 state,
407 canonical_in_memory_state,
408 persistence,
409 persistence_state,
410 payload_builder,
411 config,
412 kind,
413 evm_config,
414 );
415 let incoming = task.incoming_tx.clone();
416 std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
417 (incoming, outgoing)
418 }
419
420 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
422 self.incoming_tx.clone()
423 }
424
425 pub fn run(mut self) {
429 loop {
430 match self.try_recv_engine_message() {
431 Ok(Some(msg)) => {
432 debug!(target: "engine::tree", %msg, "received new engine message");
433 if let Err(fatal) = self.on_engine_message(msg) {
434 error!(target: "engine::tree", %fatal, "insert block fatal error");
435 return
436 }
437 }
438 Ok(None) => {
439 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
440 }
441 Err(_err) => {
442 error!(target: "engine::tree", "Engine channel disconnected");
443 return
444 }
445 }
446
447 if let Err(err) = self.advance_persistence() {
448 error!(target: "engine::tree", %err, "Advancing persistence failed");
449 return
450 }
451 }
452 }
453
454 fn on_downloaded(
460 &mut self,
461 mut blocks: Vec<RecoveredBlock<N::Block>>,
462 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
463 if blocks.is_empty() {
464 return Ok(None)
466 }
467
468 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
469 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
470 for block in blocks.drain(..batch) {
471 if let Some(event) = self.on_downloaded_block(block)? {
472 let needs_backfill = event.is_backfill_action();
473 self.on_tree_event(event)?;
474 if needs_backfill {
475 return Ok(None)
477 }
478 }
479 }
480
481 if !blocks.is_empty() {
483 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
484 }
485
486 Ok(None)
487 }
488
489 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
504 fn on_new_payload(
505 &mut self,
506 payload: T::ExecutionData,
507 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
508 trace!(target: "engine::tree", "invoked new payload");
509 self.metrics.engine.new_payload_messages.increment(1);
510
511 let start = Instant::now();
513
514 let parent_hash = payload.parent_hash();
540
541 let num_hash = payload.num_hash();
542 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
543 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
544
545 let block_hash = num_hash.hash;
546 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
547 if lowest_buffered_ancestor == block_hash {
548 lowest_buffered_ancestor = parent_hash;
549 }
550
551 if let Some(invalid) = self.state.invalid_headers.get(&lowest_buffered_ancestor) {
553 let block = match self.payload_validator.ensure_well_formed_payload(payload) {
559 Ok(block) => block,
560 Err(error) => {
561 let status = self.on_new_payload_error(error, parent_hash)?;
562 return Ok(TreeOutcome::new(status))
563 }
564 };
565
566 let status = self.on_invalid_new_payload(block.into_sealed_block(), invalid)?;
567 return Ok(TreeOutcome::new(status))
568 }
569 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
571
572 let status = if self.backfill_sync_state.is_idle() {
573 let mut latest_valid_hash = None;
574 match self.insert_payload(payload) {
575 Ok(status) => {
576 let status = match status {
577 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
578 latest_valid_hash = Some(block_hash);
579 self.try_connect_buffered_blocks(num_hash)?;
580 PayloadStatusEnum::Valid
581 }
582 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
583 latest_valid_hash = Some(block_hash);
584 PayloadStatusEnum::Valid
585 }
586 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
587 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
588 PayloadStatusEnum::Syncing
590 }
591 };
592
593 PayloadStatus::new(status, latest_valid_hash)
594 }
595 Err(error) => match error {
596 InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
597 InsertPayloadError::Payload(error) => {
598 self.on_new_payload_error(error, parent_hash)?
599 }
600 },
601 }
602 } else {
603 match self.payload_validator.ensure_well_formed_payload(payload) {
604 Ok(block) => {
606 if let Err(error) = self.buffer_block(block) {
607 self.on_insert_block_error(error)?
608 } else {
609 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
610 }
611 }
612 Err(error) => self.on_new_payload_error(error, parent_hash)?,
613 }
614 };
615
616 let mut outcome = TreeOutcome::new(status);
617 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
619 if self.state.tree_state.canonical_block_hash() != block_hash {
621 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
622 sync_target_head: block_hash,
623 }));
624 }
625 }
626
627 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
629
630 Ok(outcome)
631 }
632
633 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
640 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
642 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
643 self.metrics.engine.executed_new_block_cache_miss.increment(1);
644 return Ok(None)
645 };
646
647 let new_head_number = new_head_block.recovered_block().number();
648 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
649
650 let mut new_chain = vec![new_head_block.clone()];
651 let mut current_hash = new_head_block.recovered_block().parent_hash();
652 let mut current_number = new_head_number - 1;
653
654 while current_number > current_canonical_number {
659 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
660 {
661 current_hash = block.recovered_block().parent_hash();
662 current_number -= 1;
663 new_chain.push(block);
664 } else {
665 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
666 return Ok(None)
669 }
670 }
671
672 if current_hash == self.state.tree_state.current_canonical_head.hash {
675 new_chain.reverse();
676
677 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
679 }
680
681 let mut old_chain = Vec::new();
683 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
684
685 while current_canonical_number > current_number {
688 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
689 old_chain.push(block.clone());
690 old_hash = block.recovered_block().parent_hash();
691 current_canonical_number -= 1;
692 } else {
693 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
695 return Ok(None)
696 }
697 }
698
699 debug_assert_eq!(current_number, current_canonical_number);
701
702 while old_hash != current_hash {
705 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
706 old_hash = block.recovered_block().parent_hash();
707 old_chain.push(block);
708 } else {
709 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
711 return Ok(None)
712 }
713
714 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
715 {
716 current_hash = block.recovered_block().parent_hash();
717 new_chain.push(block);
718 } else {
719 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
721 return Ok(None)
722 }
723 }
724 new_chain.reverse();
725 old_chain.reverse();
726
727 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
728 }
729
730 fn update_latest_block_to_canonical_ancestor(
742 &mut self,
743 canonical_header: &SealedHeader<N::BlockHeader>,
744 ) -> ProviderResult<()> {
745 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
746 let current_head_number = self.state.tree_state.canonical_block_number();
747 let new_head_number = canonical_header.number();
748 let new_head_hash = canonical_header.hash();
749
750 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
752
753 if new_head_number < current_head_number {
755 debug!(
756 target: "engine::tree",
757 current_head = current_head_number,
758 new_head = new_head_number,
759 new_head_hash = ?new_head_hash,
760 "FCU unwind detected: reverting to canonical ancestor"
761 );
762
763 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
764 } else {
765 debug!(
766 target: "engine::tree",
767 previous_head = current_head_number,
768 new_head = new_head_number,
769 new_head_hash = ?new_head_hash,
770 "Advancing latest block to canonical ancestor"
771 );
772 self.handle_chain_advance_or_same_height(canonical_header)
773 }
774 }
775
776 fn handle_canonical_chain_unwind(
779 &self,
780 current_head_number: u64,
781 canonical_header: &SealedHeader<N::BlockHeader>,
782 ) -> ProviderResult<()> {
783 let new_head_number = canonical_header.number();
784 debug!(
785 target: "engine::tree",
786 from = current_head_number,
787 to = new_head_number,
788 "Handling unwind: collecting blocks to remove from in-memory state"
789 );
790
791 let old_blocks =
793 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
794
795 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
797 }
798
799 fn collect_blocks_for_canonical_unwind(
801 &self,
802 new_head_number: u64,
803 current_head_number: u64,
804 ) -> Vec<ExecutedBlock<N>> {
805 let mut old_blocks = Vec::new();
806
807 for block_num in (new_head_number + 1)..=current_head_number {
808 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
809 let executed_block = block_state.block_ref().block.clone();
810 old_blocks.push(executed_block);
811 debug!(
812 target: "engine::tree",
813 block_number = block_num,
814 "Collected block for removal from in-memory state"
815 );
816 }
817 }
818
819 if old_blocks.is_empty() {
820 debug!(
821 target: "engine::tree",
822 "No blocks found in memory to remove, will clear and reset state"
823 );
824 }
825
826 old_blocks
827 }
828
829 fn apply_canonical_ancestor_via_reorg(
831 &self,
832 canonical_header: &SealedHeader<N::BlockHeader>,
833 old_blocks: Vec<ExecutedBlock<N>>,
834 ) -> ProviderResult<()> {
835 let new_head_hash = canonical_header.hash();
836 let new_head_number = canonical_header.number();
837
838 match self.canonical_block_by_hash(new_head_hash)? {
840 Some(executed_block) => {
841 let block_with_trie = ExecutedBlockWithTrieUpdates {
842 block: executed_block,
843 trie: ExecutedTrieUpdates::Missing,
844 };
845
846 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
848 new: vec![block_with_trie],
849 old: old_blocks,
850 });
851
852 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
855
856 debug!(
857 target: "engine::tree",
858 block_number = new_head_number,
859 block_hash = ?new_head_hash,
860 "Successfully loaded canonical ancestor into memory via reorg"
861 );
862 }
863 None => {
864 warn!(
866 target: "engine::tree",
867 block_hash = ?new_head_hash,
868 "Could not find canonical ancestor block, updating header only"
869 );
870 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
871 }
872 }
873
874 Ok(())
875 }
876
877 fn handle_chain_advance_or_same_height(
879 &self,
880 canonical_header: &SealedHeader<N::BlockHeader>,
881 ) -> ProviderResult<()> {
882 let new_head_number = canonical_header.number();
883 let new_head_hash = canonical_header.hash();
884
885 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
887
888 self.ensure_block_in_memory(new_head_number, new_head_hash)
890 }
891
892 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
894 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
896 return Ok(());
897 }
898
899 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
901 let block_with_trie = ExecutedBlockWithTrieUpdates {
902 block: executed_block,
903 trie: ExecutedTrieUpdates::Missing,
904 };
905
906 self.canonical_in_memory_state
907 .update_chain(NewCanonicalChain::Commit { new: vec![block_with_trie] });
908
909 debug!(
910 target: "engine::tree",
911 block_number,
912 block_hash = ?block_hash,
913 "Added canonical block to in-memory state"
914 );
915 }
916
917 Ok(())
918 }
919
920 fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
930 let target_hash = target.block.hash;
931 let canonical_head = self.state.tree_state.canonical_head();
933 let mut current_hash;
934 let mut current_block = target;
935 loop {
936 if current_block.block.hash == canonical_head.hash {
937 return Ok(false)
938 }
939 if current_block.block.number <= canonical_head.number {
941 break
942 }
943 current_hash = current_block.parent;
944
945 let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
946 current_block = next_block.block_with_parent();
947 }
948
949 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
951 return Ok(false)
952 }
953
954 if self.provider.block_number(target_hash)?.is_some() {
956 return Ok(false)
957 }
958
959 Ok(true)
960 }
961
962 fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
964 let Some(action) = self.persistence_state.current_action() else {
966 return PersistingKind::NotPersisting
967 };
968 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
970 return PersistingKind::PersistingNotDescendant
971 };
972
973 if block.block.number > highest.number &&
976 self.state.tree_state.is_descendant(*highest, block)
977 {
978 return PersistingKind::PersistingDescendant
979 }
980
981 PersistingKind::PersistingNotDescendant
983 }
984
985 #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
994 fn on_forkchoice_updated(
995 &mut self,
996 state: ForkchoiceState,
997 attrs: Option<T::PayloadAttributes>,
998 version: EngineApiMessageVersion,
999 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1000 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1001 self.metrics.engine.forkchoice_updated_messages.increment(1);
1002 if attrs.is_some() {
1003 self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
1004 }
1005 self.canonical_in_memory_state.on_forkchoice_update_received();
1006
1007 if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1008 return Ok(TreeOutcome::new(on_updated))
1009 }
1010
1011 let valid_outcome = |head| {
1012 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1013 PayloadStatusEnum::Valid,
1014 Some(head),
1015 )))
1016 };
1017
1018 if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1034 trace!(target: "engine::tree", "fcu head hash is already canonical");
1035
1036 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1038 return Ok(TreeOutcome::new(outcome))
1040 }
1041
1042 if let Some(attr) = attrs {
1044 let tip = self
1045 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1046 .ok_or_else(|| {
1047 ProviderError::HeaderNotFound(state.head_block_hash.into())
1050 })?;
1051 let updated = self.process_payload_attributes(attr, &tip, state, version);
1052 return Ok(TreeOutcome::new(updated))
1053 }
1054
1055 return Ok(valid_outcome(state.head_block_hash))
1057 }
1058
1059 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1061 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1062
1063 if self.engine_kind.is_opstack() ||
1066 self.config.always_process_payload_attributes_on_canonical_head()
1067 {
1068 if let Some(attr) = attrs {
1069 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1070 let updated =
1071 self.process_payload_attributes(attr, &canonical_header, state, version);
1072 return Ok(TreeOutcome::new(updated))
1073 }
1074
1075 if self.config.unwind_canonical_header() {
1082 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1083 }
1084 }
1085
1086 return Ok(valid_outcome(state.head_block_hash))
1096 }
1097
1098 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1100 let tip = chain_update.tip().clone_sealed_header();
1101 self.on_canonical_chain_update(chain_update);
1102
1103 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1105 return Ok(TreeOutcome::new(outcome))
1107 }
1108
1109 if let Some(attr) = attrs {
1110 let updated = self.process_payload_attributes(attr, &tip, state, version);
1111 return Ok(TreeOutcome::new(updated))
1112 }
1113
1114 return Ok(valid_outcome(state.head_block_hash))
1115 }
1116
1117 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1124 !state.safe_block_hash.is_zero() &&
1126 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1127 {
1128 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1129 state.safe_block_hash
1130 } else {
1131 state.head_block_hash
1132 };
1133
1134 let target = self.lowest_buffered_ancestor_or(target);
1135 trace!(target: "engine::tree", %target, "downloading missing block");
1136
1137 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1138 PayloadStatusEnum::Syncing,
1139 )))
1140 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1141 }
1142
1143 #[expect(clippy::type_complexity)]
1152 fn try_recv_engine_message(
1153 &self,
1154 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1155 if self.persistence_state.in_progress() {
1156 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1158 Ok(msg) => Ok(Some(msg)),
1159 Err(err) => match err {
1160 RecvTimeoutError::Timeout => Ok(None),
1161 RecvTimeoutError::Disconnected => Err(RecvError),
1162 },
1163 }
1164 } else {
1165 self.incoming.recv().map(Some)
1166 }
1167 }
1168
1169 fn remove_blocks(&mut self, new_tip_num: u64) {
1172 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1173 if new_tip_num < self.persistence_state.last_persisted_block.number {
1174 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1175 let (tx, rx) = oneshot::channel();
1176 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1177 self.persistence_state.start_remove(new_tip_num, rx);
1178 }
1179 }
1180
1181 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1184 if blocks_to_persist.is_empty() {
1185 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1186 return
1187 }
1188
1189 let highest_num_hash = blocks_to_persist
1191 .iter()
1192 .max_by_key(|block| block.recovered_block().number())
1193 .map(|b| b.recovered_block().num_hash())
1194 .expect("Checked non-empty persisting blocks");
1195
1196 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1197 let (tx, rx) = oneshot::channel();
1198 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1199
1200 self.persistence_state.start_save(highest_num_hash, rx);
1201 }
1202
1203 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1208 if self.persistence_state.in_progress() {
1209 let (mut rx, start_time, current_action) = self
1210 .persistence_state
1211 .rx
1212 .take()
1213 .expect("if a persistence task is in progress Receiver must be Some");
1214 match rx.try_recv() {
1216 Ok(last_persisted_hash_num) => {
1217 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1218 let Some(BlockNumHash {
1219 hash: last_persisted_block_hash,
1220 number: last_persisted_block_number,
1221 }) = last_persisted_hash_num
1222 else {
1223 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1226 return Ok(())
1227 };
1228
1229 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1230 self.persistence_state
1231 .finish(last_persisted_block_hash, last_persisted_block_number);
1232 self.on_new_persisted_block()?;
1233 }
1234 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1235 Err(TryRecvError::Empty) => {
1236 self.persistence_state.rx = Some((rx, start_time, current_action))
1237 }
1238 }
1239 }
1240
1241 if !self.persistence_state.in_progress() {
1242 if let Some(new_tip_num) = self.find_disk_reorg()? {
1243 self.remove_blocks(new_tip_num)
1244 } else if self.should_persist() {
1245 let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1246 self.persist_blocks(blocks_to_persist);
1247 }
1248 }
1249
1250 Ok(())
1251 }
1252
1253 fn on_engine_message(
1255 &mut self,
1256 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1257 ) -> Result<(), InsertBlockFatalError> {
1258 match msg {
1259 FromEngine::Event(event) => match event {
1260 FromOrchestrator::BackfillSyncStarted => {
1261 debug!(target: "engine::tree", "received backfill sync started event");
1262 self.backfill_sync_state = BackfillSyncState::Active;
1263 }
1264 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1265 self.on_backfill_sync_finished(ctrl)?;
1266 }
1267 },
1268 FromEngine::Request(request) => {
1269 match request {
1270 EngineApiRequest::InsertExecutedBlock(block) => {
1271 let block_num_hash = block.recovered_block().num_hash();
1272 if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1273 return Ok(())
1275 }
1276
1277 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1278 let now = Instant::now();
1279
1280 if self.state.tree_state.canonical_block_hash() ==
1283 block.recovered_block().parent_hash()
1284 {
1285 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1286 self.canonical_in_memory_state.set_pending_block(block.clone());
1287 }
1288
1289 self.state.tree_state.insert_executed(block.clone());
1290 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1291 self.emit_event(EngineApiEvent::BeaconConsensus(
1292 ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1293 ));
1294 }
1295 EngineApiRequest::Beacon(request) => {
1296 match request {
1297 BeaconEngineMessage::ForkchoiceUpdated {
1298 state,
1299 payload_attrs,
1300 tx,
1301 version,
1302 } => {
1303 let mut output =
1304 self.on_forkchoice_updated(state, payload_attrs, version);
1305
1306 if let Ok(res) = &mut output {
1307 self.state
1309 .forkchoice_state_tracker
1310 .set_latest(state, res.outcome.forkchoice_status());
1311
1312 self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1314 state,
1315 res.outcome.forkchoice_status(),
1316 ));
1317
1318 self.on_maybe_tree_event(res.event.take())?;
1320 }
1321
1322 if let Err(err) =
1323 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1324 {
1325 self.metrics
1326 .engine
1327 .failed_forkchoice_updated_response_deliveries
1328 .increment(1);
1329 error!(target: "engine::tree", "Failed to send event: {err:?}");
1330 }
1331 }
1332 BeaconEngineMessage::NewPayload { payload, tx } => {
1333 let mut output = self.on_new_payload(payload);
1334
1335 let maybe_event =
1336 output.as_mut().ok().and_then(|out| out.event.take());
1337
1338 if let Err(err) =
1340 tx.send(output.map(|o| o.outcome).map_err(|e| {
1341 BeaconOnNewPayloadError::Internal(Box::new(e))
1342 }))
1343 {
1344 error!(target: "engine::tree", "Failed to send event: {err:?}");
1345 self.metrics
1346 .engine
1347 .failed_new_payload_response_deliveries
1348 .increment(1);
1349 }
1350
1351 self.on_maybe_tree_event(maybe_event)?;
1353 }
1354 }
1355 }
1356 }
1357 }
1358 FromEngine::DownloadedBlocks(blocks) => {
1359 if let Some(event) = self.on_downloaded(blocks)? {
1360 self.on_tree_event(event)?;
1361 }
1362 }
1363 }
1364 Ok(())
1365 }
1366
1367 fn on_backfill_sync_finished(
1381 &mut self,
1382 ctrl: ControlFlow,
1383 ) -> Result<(), InsertBlockFatalError> {
1384 debug!(target: "engine::tree", "received backfill sync finished event");
1385 self.backfill_sync_state = BackfillSyncState::Idle;
1386
1387 let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1389 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1390 self.state.invalid_headers.insert(**bad_block);
1392
1393 Some(*target)
1395 } else {
1396 ctrl.block_number()
1398 };
1399
1400 let Some(backfill_height) = backfill_height else { return Ok(()) };
1402
1403 let Some(backfill_num_hash) = self
1409 .provider
1410 .block_hash(backfill_height)?
1411 .map(|hash| BlockNumHash { hash, number: backfill_height })
1412 else {
1413 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1414 return Ok(())
1415 };
1416
1417 if ctrl.is_unwind() {
1418 self.state.tree_state.reset(backfill_num_hash)
1421 } else {
1422 self.state.tree_state.remove_until(
1423 backfill_num_hash,
1424 self.persistence_state.last_persisted_block.hash,
1425 Some(backfill_num_hash),
1426 );
1427 }
1428
1429 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1430 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1431
1432 self.state.buffer.remove_old_blocks(backfill_height);
1434 self.canonical_in_memory_state.clear_state();
1437
1438 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1439 self.state.tree_state.set_canonical_head(new_head.num_hash());
1442 self.persistence_state.finish(new_head.hash(), new_head.number());
1443
1444 self.canonical_in_memory_state.set_canonical_head(new_head);
1446 }
1447
1448 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1451 else {
1452 return Ok(())
1453 };
1454 if sync_target_state.finalized_block_hash.is_zero() {
1455 return Ok(())
1457 }
1458 let newest_finalized = self
1460 .state
1461 .buffer
1462 .block(&sync_target_state.finalized_block_hash)
1463 .map(|block| block.number());
1464
1465 if let Some(backfill_target) =
1471 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1472 self.backfill_sync_target(progress, finalized_number, None)
1475 })
1476 {
1477 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1479 backfill_target.into(),
1480 )));
1481 return Ok(())
1482 };
1483
1484 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1486 }
1487
1488 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1492 if let Some(chain_update) = self.on_new_head(target)? {
1493 self.on_canonical_chain_update(chain_update);
1494 }
1495
1496 Ok(())
1497 }
1498
1499 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1501 if let Some(event) = event {
1502 self.on_tree_event(event)?;
1503 }
1504
1505 Ok(())
1506 }
1507
1508 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1512 match event {
1513 TreeEvent::TreeAction(action) => match action {
1514 TreeAction::MakeCanonical { sync_target_head } => {
1515 self.make_canonical(sync_target_head)?;
1516 }
1517 },
1518 TreeEvent::BackfillAction(action) => {
1519 self.emit_event(EngineApiEvent::BackfillAction(action));
1520 }
1521 TreeEvent::Download(action) => {
1522 self.emit_event(EngineApiEvent::Download(action));
1523 }
1524 }
1525
1526 Ok(())
1527 }
1528
1529 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1531 let event = event.into();
1532
1533 if event.is_backfill_action() {
1534 debug_assert_eq!(
1535 self.backfill_sync_state,
1536 BackfillSyncState::Idle,
1537 "backfill action should only be emitted when backfill is idle"
1538 );
1539
1540 if self.persistence_state.in_progress() {
1541 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1544 return
1545 }
1546
1547 self.backfill_sync_state = BackfillSyncState::Pending;
1548 self.metrics.engine.pipeline_runs.increment(1);
1549 debug!(target: "engine::tree", "emitting backfill action event");
1550 }
1551
1552 let _ = self.outgoing.send(event).inspect_err(
1553 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1554 );
1555 }
1556
1557 pub const fn should_persist(&self) -> bool {
1561 if !self.backfill_sync_state.is_idle() {
1562 return false
1564 }
1565
1566 let min_block = self.persistence_state.last_persisted_block.number;
1567 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1568 self.config.persistence_threshold()
1569 }
1570
1571 fn get_canonical_blocks_to_persist(
1583 &mut self,
1584 ) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
1585 debug_assert!(!self.persistence_state.in_progress());
1588
1589 let mut blocks_to_persist = Vec::new();
1590 let mut current_hash = self.state.tree_state.canonical_block_hash();
1591 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1592 let canonical_head_number = self.state.tree_state.canonical_block_number();
1593 let all_blocks_have_trie_updates = self
1594 .state
1595 .tree_state
1596 .blocks_by_hash
1597 .values()
1598 .all(|block| block.trie_updates().is_some());
1599
1600 let target_number = if all_blocks_have_trie_updates {
1601 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1603 } else {
1604 canonical_head_number
1606 };
1607
1608 debug!(
1609 target: "engine::tree",
1610 ?current_hash,
1611 ?last_persisted_number,
1612 ?canonical_head_number,
1613 ?all_blocks_have_trie_updates,
1614 ?target_number,
1615 "Returning canonical blocks to persist"
1616 );
1617 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1618 if block.recovered_block().number() <= last_persisted_number {
1619 break;
1620 }
1621
1622 if block.recovered_block().number() <= target_number {
1623 blocks_to_persist.push(block.clone());
1624 }
1625
1626 current_hash = block.recovered_block().parent_hash();
1627 }
1628
1629 blocks_to_persist.reverse();
1631
1632 for block in &mut blocks_to_persist {
1634 if block.trie.is_present() {
1635 continue
1636 }
1637
1638 debug!(
1639 target: "engine::tree",
1640 block = ?block.recovered_block().num_hash(),
1641 "Calculating trie updates before persisting"
1642 );
1643
1644 let provider = self
1645 .state_provider_builder(block.recovered_block().parent_hash())?
1646 .ok_or(AdvancePersistenceError::MissingAncestor(
1647 block.recovered_block().parent_hash(),
1648 ))?
1649 .build()?;
1650
1651 let mut trie_input = self.compute_trie_input(
1652 self.persisting_kind_for(block.recovered_block.block_with_parent()),
1653 self.provider.database_provider_ro()?,
1654 block.recovered_block().parent_hash(),
1655 None,
1656 )?;
1657 trie_input.append_ref(block.hashed_state());
1659 let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
1660 debug_assert_eq!(_root, block.recovered_block().state_root());
1661
1662 let trie_updates = Arc::new(updates);
1664 let tree_state_block = self
1665 .state
1666 .tree_state
1667 .blocks_by_hash
1668 .get_mut(&block.recovered_block().hash())
1669 .expect("blocks to persist are constructed from tree state blocks");
1670 tree_state_block.trie.set_present(trie_updates.clone());
1671 block.trie.set_present(trie_updates);
1672 }
1673
1674 Ok(blocks_to_persist)
1675 }
1676
1677 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1685 if let Some(remove_above) = self.find_disk_reorg()? {
1688 self.remove_blocks(remove_above);
1689 return Ok(())
1690 }
1691
1692 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1693 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1694 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1695 number: self.persistence_state.last_persisted_block.number,
1696 hash: self.persistence_state.last_persisted_block.hash,
1697 });
1698 Ok(())
1699 }
1700
1701 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1709 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1710 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1712 return Ok(Some(block.block.clone()))
1713 }
1714
1715 let (block, senders) = self
1716 .provider
1717 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1718 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1719 .split_sealed();
1720 let execution_output = self
1721 .provider
1722 .get_state(block.header().number())?
1723 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1724 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1725
1726 Ok(Some(ExecutedBlock {
1727 recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1728 execution_output: Arc::new(execution_output),
1729 hashed_state: Arc::new(hashed_state),
1730 }))
1731 }
1732
1733 fn sealed_header_by_hash(
1735 &self,
1736 hash: B256,
1737 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1738 let header = self.state.tree_state.sealed_header_by_hash(&hash);
1740
1741 if header.is_some() {
1742 Ok(header)
1743 } else {
1744 self.provider.sealed_header_by_hash(hash)
1745 }
1746 }
1747
1748 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1755 self.state
1756 .buffer
1757 .lowest_ancestor(&hash)
1758 .map(|block| block.parent_hash())
1759 .unwrap_or_else(|| hash)
1760 }
1761
1762 fn latest_valid_hash_for_invalid_payload(
1773 &mut self,
1774 parent_hash: B256,
1775 ) -> ProviderResult<Option<B256>> {
1776 if self.sealed_header_by_hash(parent_hash)?.is_some() {
1778 return Ok(Some(parent_hash))
1779 }
1780
1781 let mut current_hash = parent_hash;
1784 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1785 while let Some(block_with_parent) = current_block {
1786 current_hash = block_with_parent.parent;
1787 current_block = self.state.invalid_headers.get(¤t_hash);
1788
1789 if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1792 return Ok(Some(current_hash))
1793 }
1794 }
1795 Ok(None)
1796 }
1797
1798 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1802 if let Some(parent) = self.sealed_header_by_hash(parent_hash)? {
1805 if !parent.difficulty().is_zero() {
1806 parent_hash = B256::ZERO;
1807 }
1808 }
1809
1810 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1811 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1812 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1813 })
1814 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1815 }
1816
1817 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1821 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1822 return target.head_block_hash == block_hash
1823 }
1824 false
1825 }
1826
1827 fn check_invalid_ancestor_with_head(
1833 &mut self,
1834 check: B256,
1835 head: &SealedBlock<N::Block>,
1836 ) -> ProviderResult<Option<PayloadStatus>> {
1837 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1839
1840 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1841 }
1842
1843 fn on_invalid_new_payload(
1845 &mut self,
1846 head: SealedBlock<N::Block>,
1847 invalid: BlockWithParent,
1848 ) -> ProviderResult<PayloadStatus> {
1849 let status = self.prepare_invalid_response(invalid.parent)?;
1851
1852 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1854 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1855
1856 Ok(status)
1857 }
1858
1859 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1862 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1864 Ok(Some(self.prepare_invalid_response(header.parent)?))
1866 }
1867
1868 fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1871 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1872 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1873 return Err(e)
1874 }
1875
1876 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1877 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1878 return Err(e)
1879 }
1880
1881 Ok(())
1882 }
1883
1884 #[instrument(level = "trace", skip(self), target = "engine::tree")]
1886 fn try_connect_buffered_blocks(
1887 &mut self,
1888 parent: BlockNumHash,
1889 ) -> Result<(), InsertBlockFatalError> {
1890 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1891
1892 if blocks.is_empty() {
1893 return Ok(())
1895 }
1896
1897 let now = Instant::now();
1898 let block_count = blocks.len();
1899 for child in blocks {
1900 let child_num_hash = child.num_hash();
1901 match self.insert_block(child) {
1902 Ok(res) => {
1903 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1904 if self.is_sync_target_head(child_num_hash.hash) &&
1905 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1906 {
1907 self.make_canonical(child_num_hash.hash)?;
1908 }
1909 }
1910 Err(err) => {
1911 if let InsertPayloadError::Block(err) = err {
1912 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1913 if let Err(fatal) = self.on_insert_block_error(err) {
1914 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1915 return Err(fatal)
1916 }
1917 }
1918 }
1919 }
1920 }
1921
1922 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1923 Ok(())
1924 }
1925
1926 fn buffer_block(
1928 &mut self,
1929 block: RecoveredBlock<N::Block>,
1930 ) -> Result<(), InsertBlockError<N::Block>> {
1931 if let Err(err) = self.validate_block(&block) {
1932 return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1933 }
1934 self.state.buffer.insert_block(block);
1935 Ok(())
1936 }
1937
1938 #[inline]
1943 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1944 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1945 }
1946
1947 #[inline]
1950 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1951 if block > local_tip {
1952 Some(block - local_tip)
1953 } else {
1954 None
1955 }
1956 }
1957
1958 fn backfill_sync_target(
1965 &self,
1966 canonical_tip_num: u64,
1967 target_block_number: u64,
1968 downloaded_block: Option<BlockNumHash>,
1969 ) -> Option<B256> {
1970 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1971
1972 let mut exceeds_backfill_threshold = if let Some(buffered_finalized) = sync_target_state
1974 .as_ref()
1975 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1976 {
1977 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number())
1980 } else {
1981 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
1983 };
1984
1985 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1988 if downloaded_block.hash == state.finalized_block_hash {
1989 exceeds_backfill_threshold =
1991 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1992 }
1993 }
1994
1995 if exceeds_backfill_threshold {
1997 if let Some(state) = sync_target_state {
1998 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2000 Err(err) => {
2001 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2002 }
2003 Ok(None) => {
2004 if !state.finalized_block_hash.is_zero() {
2006 return Some(state.finalized_block_hash)
2009 }
2010
2011 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2024 return Some(state.head_block_hash)
2025 }
2026 Ok(Some(_)) => {
2027 }
2029 }
2030 }
2031 }
2032
2033 None
2034 }
2035
2036 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2039 let mut canonical = self.state.tree_state.current_canonical_head;
2040 let mut persisted = self.persistence_state.last_persisted_block;
2041
2042 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2043 Ok(self
2044 .sealed_header_by_hash(num_hash.hash)?
2045 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2046 .parent_num_hash())
2047 };
2048
2049 while canonical.number > persisted.number {
2052 canonical = parent_num_hash(canonical)?;
2053 }
2054
2055 if canonical == persisted {
2057 return Ok(None);
2058 }
2059
2060 while persisted.number > canonical.number {
2066 persisted = parent_num_hash(persisted)?;
2067 }
2068
2069 debug_assert_eq!(persisted.number, canonical.number);
2070
2071 while persisted.hash != canonical.hash {
2073 canonical = parent_num_hash(canonical)?;
2074 persisted = parent_num_hash(persisted)?;
2075 }
2076
2077 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2078
2079 Ok(Some(persisted.number))
2080 }
2081
2082 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2086 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2087 let start = Instant::now();
2088
2089 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2091
2092 let tip = chain_update.tip().clone_sealed_header();
2093 let notification = chain_update.to_chain_notification();
2094
2095 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2097 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2098 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2099 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2100
2101 self.update_reorg_metrics(old.len());
2102 self.reinsert_reorged_blocks(new.clone());
2103 let old = old
2106 .iter()
2107 .filter_map(|block| {
2108 let trie = self
2109 .state
2110 .tree_state
2111 .persisted_trie_updates
2112 .get(&block.recovered_block.hash())?
2113 .1
2114 .clone();
2115 Some(ExecutedBlockWithTrieUpdates {
2116 block: block.clone(),
2117 trie: ExecutedTrieUpdates::Present(trie),
2118 })
2119 })
2120 .collect::<Vec<_>>();
2121 self.reinsert_reorged_blocks(old);
2122 }
2123
2124 self.canonical_in_memory_state.update_chain(chain_update);
2126 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2127
2128 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2130
2131 self.canonical_in_memory_state.notify_canon_state(notification);
2133
2134 self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2136 Box::new(tip),
2137 start.elapsed(),
2138 ));
2139 }
2140
2141 fn update_reorg_metrics(&self, old_chain_length: usize) {
2143 self.metrics.tree.reorgs.increment(1);
2144 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2145 }
2146
2147 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2149 for block in new_chain {
2150 if self
2151 .state
2152 .tree_state
2153 .executed_block_by_hash(block.recovered_block().hash())
2154 .is_none()
2155 {
2156 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2157 self.state.tree_state.insert_executed(block);
2158 }
2159 }
2160 }
2161
2162 fn on_disconnected_downloaded_block(
2167 &self,
2168 downloaded_block: BlockNumHash,
2169 missing_parent: BlockNumHash,
2170 head: BlockNumHash,
2171 ) -> Option<TreeEvent> {
2172 if let Some(target) =
2174 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2175 {
2176 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2177 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2178 }
2179
2180 let request = if let Some(distance) =
2190 self.distance_from_local_tip(head.number, missing_parent.number)
2191 {
2192 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2193 DownloadRequest::BlockRange(missing_parent.hash, distance)
2194 } else {
2195 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2196 DownloadRequest::single_block(missing_parent.hash)
2199 };
2200
2201 Some(TreeEvent::Download(request))
2202 }
2203
2204 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2210 fn on_downloaded_block(
2211 &mut self,
2212 block: RecoveredBlock<N::Block>,
2213 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2214 let block_num_hash = block.num_hash();
2215 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2216 if self
2217 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2218 .is_some()
2219 {
2220 return Ok(None)
2221 }
2222
2223 if !self.backfill_sync_state.is_idle() {
2224 return Ok(None)
2225 }
2226
2227 match self.insert_block(block) {
2229 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2230 if self.is_sync_target_head(block_num_hash.hash) {
2231 trace!(target: "engine::tree", "appended downloaded sync target block");
2232
2233 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2236 sync_target_head: block_num_hash.hash,
2237 })))
2238 }
2239 trace!(target: "engine::tree", "appended downloaded block");
2240 self.try_connect_buffered_blocks(block_num_hash)?;
2241 }
2242 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2243 return Ok(self.on_disconnected_downloaded_block(
2246 block_num_hash,
2247 missing_ancestor,
2248 head,
2249 ))
2250 }
2251 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2252 trace!(target: "engine::tree", "downloaded block already executed");
2253 }
2254 Err(err) => {
2255 if let InsertPayloadError::Block(err) = err {
2256 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2257 if let Err(fatal) = self.on_insert_block_error(err) {
2258 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2259 return Err(fatal)
2260 }
2261 }
2262 }
2263 }
2264 Ok(None)
2265 }
2266
2267 fn insert_payload(
2268 &mut self,
2269 payload: T::ExecutionData,
2270 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2271 self.insert_block_or_payload(
2272 payload.block_with_parent(),
2273 payload,
2274 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2275 |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2276 )
2277 }
2278
2279 fn insert_block(
2280 &mut self,
2281 block: RecoveredBlock<N::Block>,
2282 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2283 self.insert_block_or_payload(
2284 block.block_with_parent(),
2285 block,
2286 |validator, block, ctx| validator.validate_block(block, ctx),
2287 |_, block| Ok(block),
2288 )
2289 }
2290
2291 fn insert_block_or_payload<Input, Err>(
2292 &mut self,
2293 block_id: BlockWithParent,
2294 input: Input,
2295 execute: impl FnOnce(
2296 &mut V,
2297 Input,
2298 TreeCtx<'_, N>,
2299 ) -> Result<ExecutedBlockWithTrieUpdates<N>, Err>,
2300 convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
2301 ) -> Result<InsertPayloadOk, Err>
2302 where
2303 Err: From<InsertBlockError<N::Block>>,
2304 {
2305 let block_insert_start = Instant::now();
2306 let block_num_hash = block_id.block;
2307 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2308
2309 match self.sealed_header_by_hash(block_num_hash.hash) {
2310 Err(err) => {
2311 let block = convert_to_block(self, input)?;
2312 return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2313 }
2314 Ok(Some(_)) => {
2315 convert_to_block(self, input)?;
2318 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2319 }
2320 _ => {}
2321 };
2322
2323 match self.state_provider_builder(block_id.parent) {
2325 Err(err) => {
2326 let block = convert_to_block(self, input)?;
2327 return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2328 }
2329 Ok(None) => {
2330 let block = convert_to_block(self, input)?;
2331
2332 let missing_ancestor = self
2335 .state
2336 .buffer
2337 .lowest_ancestor(&block.parent_hash())
2338 .map(|block| block.parent_num_hash())
2339 .unwrap_or_else(|| block.parent_num_hash());
2340
2341 self.state.buffer.insert_block(block);
2342
2343 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2344 head: self.state.tree_state.current_canonical_head,
2345 missing_ancestor,
2346 }))
2347 }
2348 Ok(Some(_)) => {}
2349 }
2350
2351 let is_fork = match self.is_fork(block_id) {
2353 Err(err) => {
2354 let block = convert_to_block(self, input)?;
2355 return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
2356 }
2357 Ok(is_fork) => is_fork,
2358 };
2359
2360 let ctx =
2361 TreeCtx::new(&mut self.state, &self.persistence_state, &self.canonical_in_memory_state);
2362
2363 let start = Instant::now();
2364
2365 let executed = execute(&mut self.payload_validator, input, ctx)?;
2366
2367 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2369 {
2370 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2371 self.canonical_in_memory_state.set_pending_block(executed.clone());
2372 }
2373
2374 self.state.tree_state.insert_executed(executed.clone());
2375 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2376
2377 let elapsed = start.elapsed();
2379 let engine_event = if is_fork {
2380 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2381 } else {
2382 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2383 };
2384 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2385
2386 self.metrics
2387 .engine
2388 .block_insert_total_duration
2389 .record(block_insert_start.elapsed().as_secs_f64());
2390 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2391 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2392 }
2393
2394 fn compute_trie_input<TP: DBProvider + BlockNumReader>(
2410 &self,
2411 persisting_kind: PersistingKind,
2412 provider: TP,
2413 parent_hash: B256,
2414 allocated_trie_input: Option<TrieInput>,
2415 ) -> ProviderResult<TrieInput> {
2416 let mut input = allocated_trie_input.unwrap_or_default();
2418
2419 let best_block_number = provider.best_block_number()?;
2420
2421 let (mut historical, mut blocks) = self
2422 .state
2423 .tree_state
2424 .blocks_by_hash(parent_hash)
2425 .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2426
2427 if persisting_kind.is_descendant() {
2430 while let Some(block) = blocks.last() {
2432 let recovered_block = block.recovered_block();
2433 if recovered_block.number() <= best_block_number {
2434 blocks.pop();
2437 } else {
2438 break
2441 }
2442 }
2443
2444 historical = if let Some(block) = blocks.last() {
2445 (block.recovered_block().number() - 1).into()
2448 } else {
2449 parent_hash.into()
2451 };
2452 }
2453
2454 if blocks.is_empty() {
2455 debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2456 } else {
2457 debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2458 }
2459
2460 let block_number = provider
2462 .convert_hash_or_number(historical)?
2463 .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2464
2465 let revert_state = if block_number == best_block_number {
2467 debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2470 HashedPostState::default()
2471 } else {
2472 let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
2473 provider.tx_ref(),
2474 block_number + 1,
2475 )
2476 .map_err(ProviderError::from)?;
2477 debug!(
2478 target: "engine::tree",
2479 block_number,
2480 best_block_number,
2481 accounts = revert_state.accounts.len(),
2482 storages = revert_state.storages.len(),
2483 "Non-empty revert state"
2484 );
2485 revert_state
2486 };
2487 input.append(revert_state);
2488
2489 input.extend_with_blocks(
2491 blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
2492 );
2493
2494 Ok(input)
2495 }
2496
2497 fn on_insert_block_error(
2503 &mut self,
2504 error: InsertBlockError<N::Block>,
2505 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2506 let (block, error) = error.split();
2507
2508 let validation_err = error.ensure_validation_error()?;
2511
2512 warn!(
2516 target: "engine::tree",
2517 invalid_hash=%block.hash(),
2518 invalid_number=block.number(),
2519 %validation_err,
2520 "Invalid block error on new payload",
2521 );
2522 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2523
2524 self.state.invalid_headers.insert(block.block_with_parent());
2526 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2527 Box::new(block),
2528 )));
2529
2530 Ok(PayloadStatus::new(
2531 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2532 latest_valid_hash,
2533 ))
2534 }
2535
2536 fn on_new_payload_error(
2538 &mut self,
2539 error: NewPayloadError,
2540 parent_hash: B256,
2541 ) -> ProviderResult<PayloadStatus> {
2542 error!(target: "engine::tree", %error, "Invalid payload");
2543 let latest_valid_hash =
2546 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2547 None
2551 } else {
2552 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2553 };
2554
2555 let status = PayloadStatusEnum::from(error);
2556 Ok(PayloadStatus::new(status, latest_valid_hash))
2557 }
2558
2559 pub fn find_canonical_header(
2561 &self,
2562 hash: B256,
2563 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2564 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2565
2566 if canonical.is_none() {
2567 canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2568 }
2569
2570 Ok(canonical)
2571 }
2572
2573 fn update_finalized_block(
2575 &self,
2576 finalized_block_hash: B256,
2577 ) -> Result<(), OnForkChoiceUpdated> {
2578 if finalized_block_hash.is_zero() {
2579 return Ok(())
2580 }
2581
2582 match self.find_canonical_header(finalized_block_hash) {
2583 Ok(None) => {
2584 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2585 return Err(OnForkChoiceUpdated::invalid_state())
2587 }
2588 Ok(Some(finalized)) => {
2589 if Some(finalized.num_hash()) !=
2590 self.canonical_in_memory_state.get_finalized_num_hash()
2591 {
2592 let _ = self.persistence.save_finalized_block_number(finalized.number());
2595 self.canonical_in_memory_state.set_finalized(finalized);
2596 }
2597 }
2598 Err(err) => {
2599 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2600 }
2601 }
2602
2603 Ok(())
2604 }
2605
2606 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2608 if safe_block_hash.is_zero() {
2609 return Ok(())
2610 }
2611
2612 match self.find_canonical_header(safe_block_hash) {
2613 Ok(None) => {
2614 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2615 return Err(OnForkChoiceUpdated::invalid_state())
2617 }
2618 Ok(Some(safe)) => {
2619 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2620 let _ = self.persistence.save_safe_block_number(safe.number());
2623 self.canonical_in_memory_state.set_safe(safe);
2624 }
2625 }
2626 Err(err) => {
2627 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2628 }
2629 }
2630
2631 Ok(())
2632 }
2633
2634 fn ensure_consistent_forkchoice_state(
2643 &self,
2644 state: ForkchoiceState,
2645 ) -> Result<(), OnForkChoiceUpdated> {
2646 self.update_finalized_block(state.finalized_block_hash)?;
2652
2653 self.update_safe_block(state.safe_block_hash)
2659 }
2660
2661 fn pre_validate_forkchoice_update(
2666 &mut self,
2667 state: ForkchoiceState,
2668 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2669 if state.head_block_hash.is_zero() {
2670 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2671 }
2672
2673 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2676 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2677 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2678 }
2679
2680 if !self.backfill_sync_state.is_idle() {
2681 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2684 return Ok(Some(OnForkChoiceUpdated::syncing()))
2685 }
2686
2687 Ok(None)
2688 }
2689
2690 fn process_payload_attributes(
2695 &self,
2696 attrs: T::PayloadAttributes,
2697 head: &N::BlockHeader,
2698 state: ForkchoiceState,
2699 version: EngineApiMessageVersion,
2700 ) -> OnForkChoiceUpdated {
2701 if let Err(err) =
2702 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2703 {
2704 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2705 return OnForkChoiceUpdated::invalid_payload_attributes()
2706 }
2707
2708 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2713 state.head_block_hash,
2714 attrs,
2715 version as u8,
2716 ) {
2717 Ok(attributes) => {
2718 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2721
2722 OnForkChoiceUpdated::updated_with_pending_payload_id(
2734 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2735 pending_payload_id,
2736 )
2737 }
2738 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2739 }
2740 }
2741
2742 pub(crate) fn remove_before(
2749 &mut self,
2750 upper_bound: BlockNumHash,
2751 finalized_hash: Option<B256>,
2752 ) -> ProviderResult<()> {
2753 let num = if let Some(hash) = finalized_hash {
2756 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2757 } else {
2758 None
2759 };
2760
2761 self.state.tree_state.remove_until(
2762 upper_bound,
2763 self.persistence_state.last_persisted_block.hash,
2764 num,
2765 );
2766 Ok(())
2767 }
2768
2769 pub fn state_provider_builder(
2774 &self,
2775 hash: B256,
2776 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2777 where
2778 P: BlockReader + StateProviderFactory + StateReader + Clone,
2779 {
2780 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2781 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2782 return Ok(Some(StateProviderBuilder::new(
2784 self.provider.clone(),
2785 historical,
2786 Some(blocks),
2787 )))
2788 }
2789
2790 if let Some(header) = self.provider.header(&hash)? {
2792 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2793 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2796 }
2797
2798 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2799 Ok(None)
2800 }
2801}
2802
2803#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2809pub enum BlockStatus {
2810 Valid,
2812 Disconnected {
2814 head: BlockNumHash,
2816 missing_ancestor: BlockNumHash,
2818 },
2819}
2820
2821#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2826pub enum InsertPayloadOk {
2827 AlreadySeen(BlockStatus),
2829 Inserted(BlockStatus),
2831}
2832
2833#[derive(Debug, Clone, Copy)]
2835pub enum PersistingKind {
2836 NotPersisting,
2838 PersistingNotDescendant,
2840 PersistingDescendant,
2842}
2843
2844impl PersistingKind {
2845 pub const fn can_run_parallel_state_root(&self) -> bool {
2850 matches!(self, Self::NotPersisting | Self::PersistingDescendant)
2851 }
2852
2853 pub const fn is_descendant(&self) -> bool {
2856 matches!(self, Self::PersistingDescendant)
2857 }
2858}