1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use persistence_state::CurrentPersistenceAction;
17use reth_chain_state::{
18 CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
19 MemoryOverlayStateProvider, NewCanonicalChain,
20};
21use reth_consensus::{Consensus, FullConsensus};
22use reth_engine_primitives::{
23 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
24 ForkchoiceStateTracker, OnForkChoiceUpdated,
25};
26use reth_errors::{ConsensusError, ProviderResult};
27use reth_evm::{ConfigureEvm, OnStateHook};
28use reth_payload_builder::PayloadBuilderHandle;
29use reth_payload_primitives::{
30 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
31};
32use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
33use reth_provider::{
34 providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
35 HashedPostStateProvider, ProviderError, StateProviderBox, StateProviderFactory, StateReader,
36 StateRootProvider, TransactionVariant,
37};
38use reth_revm::database::StateProviderDatabase;
39use reth_stages_api::ControlFlow;
40use reth_trie::{HashedPostState, TrieInput};
41use reth_trie_db::DatabaseHashedPostState;
42use revm::state::EvmState;
43use state::TreeState;
44use std::{
45 fmt::Debug,
46 sync::{
47 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
48 Arc,
49 },
50 time::Instant,
51};
52use tokio::sync::{
53 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
54 oneshot::{self, error::TryRecvError},
55};
56use tracing::*;
57
58mod block_buffer;
59mod cached_state;
60pub mod error;
61mod instrumented_state;
62mod invalid_headers;
63mod metrics;
64mod payload_processor;
65pub mod payload_validator;
66mod persistence_state;
67pub mod precompile_cache;
68#[cfg(test)]
69mod tests;
70#[expect(unused)]
72mod trie_updates;
73
74use crate::tree::error::AdvancePersistenceError;
75pub use block_buffer::BlockBuffer;
76pub use invalid_headers::InvalidHeaderCache;
77pub use payload_processor::*;
78pub use payload_validator::{BasicEngineValidator, EngineValidator};
79pub use persistence_state::PersistenceState;
80pub use reth_engine_primitives::TreeConfig;
81use reth_trie::KeccakKeyHasher;
82
83pub mod state;
84
/// Block-count threshold associated with starting a pipeline (backfill) run.
///
/// Defined as one epoch worth of slots ([`EPOCH_SLOTS`]).
///
/// NOTE(review): the comparison that consumes this constant is not in this part of the file;
/// confirm the exact semantics (minimum gap vs. inclusive bound) at the use site.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
95
96#[derive(Clone, Debug)]
98pub struct StateProviderBuilder<N: NodePrimitives, P> {
99 provider_factory: P,
101 historical: B256,
103 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
105}
106
107impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
108 pub const fn new(
111 provider_factory: P,
112 historical: B256,
113 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
114 ) -> Self {
115 Self { provider_factory, historical, overlay }
116 }
117}
118
119impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
120where
121 P: BlockReader + StateProviderFactory + StateReader + Clone,
122{
123 pub fn build(&self) -> ProviderResult<StateProviderBox> {
125 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
126 if let Some(overlay) = self.overlay.clone() {
127 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
128 }
129 Ok(provider)
130 }
131}
132
133#[derive(Debug)]
137pub struct EngineApiTreeState<N: NodePrimitives> {
138 tree_state: TreeState<N>,
140 forkchoice_state_tracker: ForkchoiceStateTracker,
142 buffer: BlockBuffer<N::Block>,
144 invalid_headers: InvalidHeaderCache,
147}
148
149impl<N: NodePrimitives> EngineApiTreeState<N> {
150 fn new(
151 block_buffer_limit: u32,
152 max_invalid_header_cache_length: u32,
153 canonical_block: BlockNumHash,
154 engine_kind: EngineApiKind,
155 ) -> Self {
156 Self {
157 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
158 buffer: BlockBuffer::new(block_buffer_limit),
159 tree_state: TreeState::new(canonical_block, engine_kind),
160 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
161 }
162 }
163}
164
165#[derive(Debug)]
167pub struct TreeOutcome<T> {
168 pub outcome: T,
170 pub event: Option<TreeEvent>,
172}
173
174impl<T> TreeOutcome<T> {
175 pub const fn new(outcome: T) -> Self {
177 Self { outcome, event: None }
178 }
179
180 pub fn with_event(mut self, event: TreeEvent) -> Self {
182 self.event = Some(event);
183 self
184 }
185}
186
187#[derive(Debug)]
189pub enum TreeEvent {
190 TreeAction(TreeAction),
192 BackfillAction(BackfillAction),
194 Download(DownloadRequest),
196}
197
198impl TreeEvent {
199 const fn is_backfill_action(&self) -> bool {
201 matches!(self, Self::BackfillAction(_))
202 }
203}
204
/// Actions that can be requested from the tree through [`TreeEvent::TreeAction`].
#[derive(Debug)]
pub enum TreeAction {
    /// Request to make the given block the canonical head.
    MakeCanonical {
        /// Hash of the block that should become canonical.
        sync_target_head: B256,
    },
}
214
215struct MeteredStateHook {
217 metrics: reth_evm::metrics::ExecutorMetrics,
218 inner_hook: Box<dyn OnStateHook>,
219}
220
221impl OnStateHook for MeteredStateHook {
222 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
223 let accounts = state.keys().len();
225 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
226 let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
227
228 self.metrics.accounts_loaded_histogram.record(accounts as f64);
229 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
230 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
231
232 self.inner_hook.on_state(source, state);
234 }
235}
236
237pub struct EngineApiTreeHandler<N, P, T, V, C>
242where
243 N: NodePrimitives,
244 T: PayloadTypes,
245 C: ConfigureEvm<Primitives = N> + 'static,
246{
247 provider: P,
248 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
249 payload_validator: V,
250 state: EngineApiTreeState<N>,
252 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
261 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
263 outgoing: UnboundedSender<EngineApiEvent<N>>,
265 persistence: PersistenceHandle<N>,
267 persistence_state: PersistenceState,
269 backfill_sync_state: BackfillSyncState,
271 canonical_in_memory_state: CanonicalInMemoryState<N>,
274 payload_builder: PayloadBuilderHandle<T>,
277 config: TreeConfig,
279 metrics: EngineApiMetrics,
281 engine_kind: EngineApiKind,
283 evm_config: C,
285}
286
287impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
288 for EngineApiTreeHandler<N, P, T, V, C>
289where
290 N: NodePrimitives,
291 C: Debug + ConfigureEvm<Primitives = N>,
292{
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 f.debug_struct("EngineApiTreeHandler")
295 .field("provider", &self.provider)
296 .field("consensus", &self.consensus)
297 .field("payload_validator", &self.payload_validator)
298 .field("state", &self.state)
299 .field("incoming_tx", &self.incoming_tx)
300 .field("persistence", &self.persistence)
301 .field("persistence_state", &self.persistence_state)
302 .field("backfill_sync_state", &self.backfill_sync_state)
303 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
304 .field("payload_builder", &self.payload_builder)
305 .field("config", &self.config)
306 .field("metrics", &self.metrics)
307 .field("engine_kind", &self.engine_kind)
308 .field("evm_config", &self.evm_config)
309 .finish()
310 }
311}
312
313impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
314where
315 N: NodePrimitives,
316 P: DatabaseProviderFactory
317 + BlockReader<Block = N::Block, Header = N::BlockHeader>
318 + StateProviderFactory
319 + StateReader<Receipt = N::Receipt>
320 + HashedPostStateProvider
321 + Clone
322 + 'static,
323 <P as DatabaseProviderFactory>::Provider:
324 BlockReader<Block = N::Block, Header = N::BlockHeader>,
325 C: ConfigureEvm<Primitives = N> + 'static,
326 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
327 V: EngineValidator<T>,
328{
329 #[expect(clippy::too_many_arguments)]
331 pub fn new(
332 provider: P,
333 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
334 payload_validator: V,
335 outgoing: UnboundedSender<EngineApiEvent<N>>,
336 state: EngineApiTreeState<N>,
337 canonical_in_memory_state: CanonicalInMemoryState<N>,
338 persistence: PersistenceHandle<N>,
339 persistence_state: PersistenceState,
340 payload_builder: PayloadBuilderHandle<T>,
341 config: TreeConfig,
342 engine_kind: EngineApiKind,
343 evm_config: C,
344 ) -> Self {
345 let (incoming_tx, incoming) = std::sync::mpsc::channel();
346
347 Self {
348 provider,
349 consensus,
350 payload_validator,
351 incoming,
352 outgoing,
353 persistence,
354 persistence_state,
355 backfill_sync_state: BackfillSyncState::Idle,
356 state,
357 canonical_in_memory_state,
358 payload_builder,
359 config,
360 metrics: Default::default(),
361 incoming_tx,
362 engine_kind,
363 evm_config,
364 }
365 }
366
367 #[expect(clippy::complexity)]
373 pub fn spawn_new(
374 provider: P,
375 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
376 payload_validator: V,
377 persistence: PersistenceHandle<N>,
378 payload_builder: PayloadBuilderHandle<T>,
379 canonical_in_memory_state: CanonicalInMemoryState<N>,
380 config: TreeConfig,
381 kind: EngineApiKind,
382 evm_config: C,
383 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
384 {
385 let best_block_number = provider.best_block_number().unwrap_or(0);
386 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
387
388 let persistence_state = PersistenceState {
389 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
390 rx: None,
391 };
392
393 let (tx, outgoing) = unbounded_channel();
394 let state = EngineApiTreeState::new(
395 config.block_buffer_limit(),
396 config.max_invalid_header_cache_length(),
397 header.num_hash(),
398 kind,
399 );
400
401 let task = Self::new(
402 provider,
403 consensus,
404 payload_validator,
405 tx,
406 state,
407 canonical_in_memory_state,
408 persistence,
409 persistence_state,
410 payload_builder,
411 config,
412 kind,
413 evm_config,
414 );
415 let incoming = task.incoming_tx.clone();
416 std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
417 (incoming, outgoing)
418 }
419
420 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
422 self.incoming_tx.clone()
423 }
424
425 pub fn run(mut self) {
429 loop {
430 match self.try_recv_engine_message() {
431 Ok(Some(msg)) => {
432 debug!(target: "engine::tree", %msg, "received new engine message");
433 if let Err(fatal) = self.on_engine_message(msg) {
434 error!(target: "engine::tree", %fatal, "insert block fatal error");
435 return
436 }
437 }
438 Ok(None) => {
439 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
440 }
441 Err(_err) => {
442 error!(target: "engine::tree", "Engine channel disconnected");
443 return
444 }
445 }
446
447 if let Err(err) = self.advance_persistence() {
448 error!(target: "engine::tree", %err, "Advancing persistence failed");
449 return
450 }
451 }
452 }
453
454 fn on_downloaded(
460 &mut self,
461 mut blocks: Vec<RecoveredBlock<N::Block>>,
462 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
463 if blocks.is_empty() {
464 return Ok(None)
466 }
467
468 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
469 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
470 for block in blocks.drain(..batch) {
471 if let Some(event) = self.on_downloaded_block(block)? {
472 let needs_backfill = event.is_backfill_action();
473 self.on_tree_event(event)?;
474 if needs_backfill {
475 return Ok(None)
477 }
478 }
479 }
480
481 if !blocks.is_empty() {
483 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
484 }
485
486 Ok(None)
487 }
488
489 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
504 fn on_new_payload(
505 &mut self,
506 payload: T::ExecutionData,
507 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
508 trace!(target: "engine::tree", "invoked new payload");
509 self.metrics.engine.new_payload_messages.increment(1);
510
511 let validation_start = Instant::now();
512
513 let parent_hash = payload.parent_hash();
539
540 self.metrics
541 .block_validation
542 .record_payload_validation(validation_start.elapsed().as_secs_f64());
543
544 let num_hash = payload.num_hash();
545 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
546 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
547
548 let block_hash = num_hash.hash;
549 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
550 if lowest_buffered_ancestor == block_hash {
551 lowest_buffered_ancestor = parent_hash;
552 }
553
554 if let Some(invalid) = self.state.invalid_headers.get(&lowest_buffered_ancestor) {
556 let block = match self.payload_validator.ensure_well_formed_payload(payload) {
562 Ok(block) => block,
563 Err(error) => {
564 let status = self.on_new_payload_error(error, parent_hash)?;
565 return Ok(TreeOutcome::new(status))
566 }
567 };
568
569 let status = self.on_invalid_new_payload(block.into_sealed_block(), invalid)?;
570 return Ok(TreeOutcome::new(status))
571 }
572
573 let status = if self.backfill_sync_state.is_idle() {
574 let mut latest_valid_hash = None;
575 match self.insert_payload(payload) {
576 Ok(status) => {
577 let status = match status {
578 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
579 latest_valid_hash = Some(block_hash);
580 self.try_connect_buffered_blocks(num_hash)?;
581 PayloadStatusEnum::Valid
582 }
583 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
584 latest_valid_hash = Some(block_hash);
585 PayloadStatusEnum::Valid
586 }
587 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
588 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
589 PayloadStatusEnum::Syncing
591 }
592 };
593
594 PayloadStatus::new(status, latest_valid_hash)
595 }
596 Err(error) => match error {
597 InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
598 InsertPayloadError::Payload(error) => {
599 self.on_new_payload_error(error, parent_hash)?
600 }
601 },
602 }
603 } else {
604 match self.payload_validator.ensure_well_formed_payload(payload) {
605 Ok(block) => {
607 if let Err(error) = self.buffer_block(block) {
608 self.on_insert_block_error(error)?
609 } else {
610 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
611 }
612 }
613 Err(error) => self.on_new_payload_error(error, parent_hash)?,
614 }
615 };
616
617 let mut outcome = TreeOutcome::new(status);
618 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
620 if self.state.tree_state.canonical_block_hash() != block_hash {
622 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
623 sync_target_head: block_hash,
624 }));
625 }
626 }
627
628 Ok(outcome)
629 }
630
631 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
638 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
640 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
641 self.metrics.engine.executed_new_block_cache_miss.increment(1);
642 return Ok(None)
643 };
644
645 let new_head_number = new_head_block.recovered_block().number();
646 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
647
648 let mut new_chain = vec![new_head_block.clone()];
649 let mut current_hash = new_head_block.recovered_block().parent_hash();
650 let mut current_number = new_head_number - 1;
651
652 while current_number > current_canonical_number {
657 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
658 {
659 current_hash = block.recovered_block().parent_hash();
660 current_number -= 1;
661 new_chain.push(block);
662 } else {
663 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
664 return Ok(None);
667 }
668 }
669
670 if current_hash == self.state.tree_state.current_canonical_head.hash {
673 new_chain.reverse();
674
675 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
677 }
678
679 let mut old_chain = Vec::new();
681 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
682
683 while current_canonical_number > current_number {
686 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
687 old_chain.push(block.clone());
688 old_hash = block.recovered_block().parent_hash();
689 current_canonical_number -= 1;
690 } else {
691 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
693 return Ok(None);
694 }
695 }
696
697 debug_assert_eq!(current_number, current_canonical_number);
699
700 while old_hash != current_hash {
703 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
704 old_hash = block.recovered_block().parent_hash();
705 old_chain.push(block);
706 } else {
707 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
709 return Ok(None);
710 }
711
712 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
713 {
714 current_hash = block.recovered_block().parent_hash();
715 new_chain.push(block);
716 } else {
717 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
719 return Ok(None);
720 }
721 }
722 new_chain.reverse();
723 old_chain.reverse();
724
725 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
726 }
727
728 fn update_latest_block_to_canonical_ancestor(
740 &mut self,
741 canonical_header: &SealedHeader<N::BlockHeader>,
742 ) -> ProviderResult<()> {
743 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
744 let current_head_number = self.state.tree_state.canonical_block_number();
745 let new_head_number = canonical_header.number();
746 let new_head_hash = canonical_header.hash();
747
748 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
750
751 if new_head_number < current_head_number {
753 debug!(
754 target: "engine::tree",
755 current_head = current_head_number,
756 new_head = new_head_number,
757 new_head_hash = ?new_head_hash,
758 "FCU unwind detected: reverting to canonical ancestor"
759 );
760
761 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
762 } else {
763 debug!(
764 target: "engine::tree",
765 previous_head = current_head_number,
766 new_head = new_head_number,
767 new_head_hash = ?new_head_hash,
768 "Advancing latest block to canonical ancestor"
769 );
770 self.handle_chain_advance_or_same_height(canonical_header)
771 }
772 }
773
774 fn handle_canonical_chain_unwind(
777 &self,
778 current_head_number: u64,
779 canonical_header: &SealedHeader<N::BlockHeader>,
780 ) -> ProviderResult<()> {
781 let new_head_number = canonical_header.number();
782 debug!(
783 target: "engine::tree",
784 from = current_head_number,
785 to = new_head_number,
786 "Handling unwind: collecting blocks to remove from in-memory state"
787 );
788
789 let old_blocks =
791 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
792
793 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
795 }
796
797 fn collect_blocks_for_canonical_unwind(
799 &self,
800 new_head_number: u64,
801 current_head_number: u64,
802 ) -> Vec<ExecutedBlock<N>> {
803 let mut old_blocks = Vec::new();
804
805 for block_num in (new_head_number + 1)..=current_head_number {
806 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
807 let executed_block = block_state.block_ref().block.clone();
808 old_blocks.push(executed_block);
809 debug!(
810 target: "engine::tree",
811 block_number = block_num,
812 "Collected block for removal from in-memory state"
813 );
814 }
815 }
816
817 if old_blocks.is_empty() {
818 debug!(
819 target: "engine::tree",
820 "No blocks found in memory to remove, will clear and reset state"
821 );
822 }
823
824 old_blocks
825 }
826
827 fn apply_canonical_ancestor_via_reorg(
829 &self,
830 canonical_header: &SealedHeader<N::BlockHeader>,
831 old_blocks: Vec<ExecutedBlock<N>>,
832 ) -> ProviderResult<()> {
833 let new_head_hash = canonical_header.hash();
834 let new_head_number = canonical_header.number();
835
836 match self.canonical_block_by_hash(new_head_hash)? {
838 Some(executed_block) => {
839 let block_with_trie = ExecutedBlockWithTrieUpdates {
840 block: executed_block,
841 trie: ExecutedTrieUpdates::Missing,
842 };
843
844 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
846 new: vec![block_with_trie],
847 old: old_blocks,
848 });
849
850 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
853
854 debug!(
855 target: "engine::tree",
856 block_number = new_head_number,
857 block_hash = ?new_head_hash,
858 "Successfully loaded canonical ancestor into memory via reorg"
859 );
860 }
861 None => {
862 warn!(
864 target: "engine::tree",
865 block_hash = ?new_head_hash,
866 "Could not find canonical ancestor block, updating header only"
867 );
868 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
869 }
870 }
871
872 Ok(())
873 }
874
875 fn handle_chain_advance_or_same_height(
877 &self,
878 canonical_header: &SealedHeader<N::BlockHeader>,
879 ) -> ProviderResult<()> {
880 let new_head_number = canonical_header.number();
881 let new_head_hash = canonical_header.hash();
882
883 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
885
886 self.ensure_block_in_memory(new_head_number, new_head_hash)
888 }
889
890 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
892 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
894 return Ok(());
895 }
896
897 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
899 let block_with_trie = ExecutedBlockWithTrieUpdates {
900 block: executed_block,
901 trie: ExecutedTrieUpdates::Missing,
902 };
903
904 self.canonical_in_memory_state
905 .update_chain(NewCanonicalChain::Commit { new: vec![block_with_trie] });
906
907 debug!(
908 target: "engine::tree",
909 block_number,
910 block_hash = ?block_hash,
911 "Added canonical block to in-memory state"
912 );
913 }
914
915 Ok(())
916 }
917
918 fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
928 let target_hash = target.block.hash;
929 let canonical_head = self.state.tree_state.canonical_head();
931 let mut current_hash;
932 let mut current_block = target;
933 loop {
934 if current_block.block.hash == canonical_head.hash {
935 return Ok(false)
936 }
937 if current_block.block.number <= canonical_head.number {
939 break
940 }
941 current_hash = current_block.parent;
942
943 let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
944 current_block = next_block.block_with_parent();
945 }
946
947 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
949 return Ok(false)
950 }
951
952 if self.provider.block_number(target_hash)?.is_some() {
954 return Ok(false)
955 }
956
957 Ok(true)
958 }
959
960 fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
962 let Some(action) = self.persistence_state.current_action() else {
964 return PersistingKind::NotPersisting
965 };
966 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
968 return PersistingKind::PersistingNotDescendant
969 };
970
971 if block.block.number > highest.number &&
974 self.state.tree_state.is_descendant(*highest, block)
975 {
976 return PersistingKind::PersistingDescendant
977 }
978
979 PersistingKind::PersistingNotDescendant
981 }
982
983 #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
992 fn on_forkchoice_updated(
993 &mut self,
994 state: ForkchoiceState,
995 attrs: Option<T::PayloadAttributes>,
996 version: EngineApiMessageVersion,
997 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
998 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
999 self.metrics.engine.forkchoice_updated_messages.increment(1);
1000 if attrs.is_some() {
1001 self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
1002 }
1003 self.canonical_in_memory_state.on_forkchoice_update_received();
1004
1005 if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1006 return Ok(TreeOutcome::new(on_updated))
1007 }
1008
1009 let valid_outcome = |head| {
1010 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1011 PayloadStatusEnum::Valid,
1012 Some(head),
1013 )))
1014 };
1015
1016 if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1032 trace!(target: "engine::tree", "fcu head hash is already canonical");
1033
1034 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1036 return Ok(TreeOutcome::new(outcome))
1038 }
1039
1040 if let Some(attr) = attrs {
1042 let tip = self
1043 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1044 .ok_or_else(|| {
1045 ProviderError::HeaderNotFound(state.head_block_hash.into())
1048 })?;
1049 let updated = self.process_payload_attributes(attr, &tip, state, version);
1050 return Ok(TreeOutcome::new(updated))
1051 }
1052
1053 return Ok(valid_outcome(state.head_block_hash))
1055 }
1056
1057 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1059 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1060
1061 if self.engine_kind.is_opstack() ||
1064 self.config.always_process_payload_attributes_on_canonical_head()
1065 {
1066 if let Some(attr) = attrs {
1067 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1068 let updated =
1069 self.process_payload_attributes(attr, &canonical_header, state, version);
1070 return Ok(TreeOutcome::new(updated))
1071 }
1072
1073 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1080 }
1081
1082 return Ok(valid_outcome(state.head_block_hash))
1092 }
1093
1094 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1096 let tip = chain_update.tip().clone_sealed_header();
1097 self.on_canonical_chain_update(chain_update);
1098
1099 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1101 return Ok(TreeOutcome::new(outcome))
1103 }
1104
1105 if let Some(attr) = attrs {
1106 let updated = self.process_payload_attributes(attr, &tip, state, version);
1107 return Ok(TreeOutcome::new(updated))
1108 }
1109
1110 return Ok(valid_outcome(state.head_block_hash))
1111 }
1112
1113 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1120 !state.safe_block_hash.is_zero() &&
1122 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1123 {
1124 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1125 state.safe_block_hash
1126 } else {
1127 state.head_block_hash
1128 };
1129
1130 let target = self.lowest_buffered_ancestor_or(target);
1131 trace!(target: "engine::tree", %target, "downloading missing block");
1132
1133 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1134 PayloadStatusEnum::Syncing,
1135 )))
1136 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1137 }
1138
1139 #[expect(clippy::type_complexity)]
1148 fn try_recv_engine_message(
1149 &self,
1150 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1151 if self.persistence_state.in_progress() {
1152 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1154 Ok(msg) => Ok(Some(msg)),
1155 Err(err) => match err {
1156 RecvTimeoutError::Timeout => Ok(None),
1157 RecvTimeoutError::Disconnected => Err(RecvError),
1158 },
1159 }
1160 } else {
1161 self.incoming.recv().map(Some)
1162 }
1163 }
1164
1165 fn remove_blocks(&mut self, new_tip_num: u64) {
1168 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1169 if new_tip_num < self.persistence_state.last_persisted_block.number {
1170 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1171 let (tx, rx) = oneshot::channel();
1172 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1173 self.persistence_state.start_remove(new_tip_num, rx);
1174 }
1175 }
1176
1177 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1180 if blocks_to_persist.is_empty() {
1181 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1182 return
1183 }
1184
1185 let highest_num_hash = blocks_to_persist
1187 .iter()
1188 .max_by_key(|block| block.recovered_block().number())
1189 .map(|b| b.recovered_block().num_hash())
1190 .expect("Checked non-empty persisting blocks");
1191
1192 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1193 let (tx, rx) = oneshot::channel();
1194 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1195
1196 self.persistence_state.start_save(highest_num_hash, rx);
1197 }
1198
1199 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1204 if self.persistence_state.in_progress() {
1205 let (mut rx, start_time, current_action) = self
1206 .persistence_state
1207 .rx
1208 .take()
1209 .expect("if a persistence task is in progress Receiver must be Some");
1210 match rx.try_recv() {
1212 Ok(last_persisted_hash_num) => {
1213 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1214 let Some(BlockNumHash {
1215 hash: last_persisted_block_hash,
1216 number: last_persisted_block_number,
1217 }) = last_persisted_hash_num
1218 else {
1219 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1222 return Ok(())
1223 };
1224
1225 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1226 self.persistence_state
1227 .finish(last_persisted_block_hash, last_persisted_block_number);
1228 self.on_new_persisted_block()?;
1229 }
1230 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1231 Err(TryRecvError::Empty) => {
1232 self.persistence_state.rx = Some((rx, start_time, current_action))
1233 }
1234 }
1235 }
1236
1237 if !self.persistence_state.in_progress() {
1238 if let Some(new_tip_num) = self.find_disk_reorg()? {
1239 self.remove_blocks(new_tip_num)
1240 } else if self.should_persist() {
1241 let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1242 self.persist_blocks(blocks_to_persist);
1243 }
1244 }
1245
1246 Ok(())
1247 }
1248
1249 fn on_engine_message(
1251 &mut self,
1252 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1253 ) -> Result<(), InsertBlockFatalError> {
1254 match msg {
1255 FromEngine::Event(event) => match event {
1256 FromOrchestrator::BackfillSyncStarted => {
1257 debug!(target: "engine::tree", "received backfill sync started event");
1258 self.backfill_sync_state = BackfillSyncState::Active;
1259 }
1260 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1261 self.on_backfill_sync_finished(ctrl)?;
1262 }
1263 },
1264 FromEngine::Request(request) => {
1265 match request {
1266 EngineApiRequest::InsertExecutedBlock(block) => {
1267 let block_num_hash = block.recovered_block().num_hash();
1268 if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1269 return Ok(())
1271 }
1272
1273 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1274 let now = Instant::now();
1275
1276 if self.state.tree_state.canonical_block_hash() ==
1279 block.recovered_block().parent_hash()
1280 {
1281 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1282 self.canonical_in_memory_state.set_pending_block(block.clone());
1283 }
1284
1285 self.state.tree_state.insert_executed(block.clone());
1286 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1287 self.emit_event(EngineApiEvent::BeaconConsensus(
1288 ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1289 ));
1290 }
1291 EngineApiRequest::Beacon(request) => {
1292 match request {
1293 BeaconEngineMessage::ForkchoiceUpdated {
1294 state,
1295 payload_attrs,
1296 tx,
1297 version,
1298 } => {
1299 let mut output =
1300 self.on_forkchoice_updated(state, payload_attrs, version);
1301
1302 if let Ok(res) = &mut output {
1303 self.state
1305 .forkchoice_state_tracker
1306 .set_latest(state, res.outcome.forkchoice_status());
1307
1308 self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1310 state,
1311 res.outcome.forkchoice_status(),
1312 ));
1313
1314 self.on_maybe_tree_event(res.event.take())?;
1316 }
1317
1318 if let Err(err) =
1319 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1320 {
1321 self.metrics
1322 .engine
1323 .failed_forkchoice_updated_response_deliveries
1324 .increment(1);
1325 error!(target: "engine::tree", "Failed to send event: {err:?}");
1326 }
1327 }
1328 BeaconEngineMessage::NewPayload { payload, tx } => {
1329 let mut output = self.on_new_payload(payload);
1330
1331 let maybe_event =
1332 output.as_mut().ok().and_then(|out| out.event.take());
1333
1334 if let Err(err) =
1336 tx.send(output.map(|o| o.outcome).map_err(|e| {
1337 BeaconOnNewPayloadError::Internal(Box::new(e))
1338 }))
1339 {
1340 error!(target: "engine::tree", "Failed to send event: {err:?}");
1341 self.metrics
1342 .engine
1343 .failed_new_payload_response_deliveries
1344 .increment(1);
1345 }
1346
1347 self.on_maybe_tree_event(maybe_event)?;
1349 }
1350 }
1351 }
1352 }
1353 }
1354 FromEngine::DownloadedBlocks(blocks) => {
1355 if let Some(event) = self.on_downloaded(blocks)? {
1356 self.on_tree_event(event)?;
1357 }
1358 }
1359 }
1360 Ok(())
1361 }
1362
/// Handles the end of a backfill sync run described by `ctrl`.
///
/// The backfill state is set back to idle, the in-memory tree state, block buffer and
/// canonical in-memory state are cleaned up relative to the height the backfill ended at, and
/// then either another backfill run is requested or buffered blocks are connected to the new
/// canonical head.
///
/// Returns early with `Ok(())` when no backfill height is available, when the hash for that
/// height cannot be found in the provider, or when no usable sync target is tracked.
fn on_backfill_sync_finished(
    &mut self,
    ctrl: ControlFlow,
) -> Result<(), InsertBlockFatalError> {
    debug!(target: "engine::tree", "received backfill sync finished event");
    self.backfill_sync_state = BackfillSyncState::Idle;

    let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
        warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
        // Remember the bad block in the invalid headers cache.
        self.state.invalid_headers.insert(**bad_block);

        // For an unwind, the unwind target is the new height.
        Some(*target)
    } else {
        // Otherwise use the block number reported by the control flow, if any.
        ctrl.block_number()
    };

    // Without a height there is nothing to clean up.
    let Some(backfill_height) = backfill_height else { return Ok(()) };

    // Resolve the hash of the block at the backfill height from the provider.
    let Some(backfill_num_hash) = self
        .provider
        .block_hash(backfill_height)?
        .map(|hash| BlockNumHash { hash, number: backfill_height })
    else {
        debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
        return Ok(())
    };

    if ctrl.is_unwind() {
        // After an unwind the tree state is reset to the backfill block.
        self.state.tree_state.reset(backfill_num_hash)
    } else {
        // Otherwise tree state is pruned up to the backfill block, which is also passed as the
        // finalized block argument.
        self.state.tree_state.remove_until(
            backfill_num_hash,
            self.persistence_state.last_persisted_block.hash,
            Some(backfill_num_hash),
        );
    }

    self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
    self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

    // Drop old buffered blocks relative to the backfill height and clear the canonical
    // in-memory state.
    self.state.buffer.remove_old_blocks(backfill_height);
    self.canonical_in_memory_state.clear_state();

    // NOTE(review): a provider error or a missing header is silently ignored here, leaving the
    // tracked heads untouched - confirm this is intended.
    if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
        // The canonical head and the last persisted block are both set to the new head.
        self.state.tree_state.set_canonical_head(new_head.num_hash());
        self.persistence_state.finish(new_head.hash(), new_head.number());

        self.canonical_in_memory_state.set_canonical_head(new_head);
    }

    // Decide whether another backfill run is required, based on the tracked sync target.
    let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
    else {
        return Ok(())
    };
    if sync_target_state.finalized_block_hash.is_zero() {
        // No finalized block hash is known, so no distance can be computed.
        return Ok(())
    }
    // Number of the finalized block, if it is currently buffered.
    let newest_finalized = self
        .state
        .buffer
        .block(&sync_target_state.finalized_block_hash)
        .map(|block| block.number());

    // Only when both the backfill progress and the finalized number are known is the distance
    // check performed.
    if let Some(backfill_target) =
        ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
            self.backfill_sync_target(progress, finalized_number, None)
        })
    {
        // Request another backfill run towards the returned target.
        self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
            backfill_target.into(),
        )));
        return Ok(())
    };

    // Otherwise try to connect buffered children of the current canonical head.
    self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
}
1483
1484 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1488 if let Some(chain_update) = self.on_new_head(target)? {
1489 self.on_canonical_chain_update(chain_update);
1490 }
1491
1492 Ok(())
1493 }
1494
1495 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1497 if let Some(event) = event {
1498 self.on_tree_event(event)?;
1499 }
1500
1501 Ok(())
1502 }
1503
1504 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1508 match event {
1509 TreeEvent::TreeAction(action) => match action {
1510 TreeAction::MakeCanonical { sync_target_head } => {
1511 self.make_canonical(sync_target_head)?;
1512 }
1513 },
1514 TreeEvent::BackfillAction(action) => {
1515 self.emit_event(EngineApiEvent::BackfillAction(action));
1516 }
1517 TreeEvent::Download(action) => {
1518 self.emit_event(EngineApiEvent::Download(action));
1519 }
1520 }
1521
1522 Ok(())
1523 }
1524
1525 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1527 let event = event.into();
1528
1529 if event.is_backfill_action() {
1530 debug_assert_eq!(
1531 self.backfill_sync_state,
1532 BackfillSyncState::Idle,
1533 "backfill action should only be emitted when backfill is idle"
1534 );
1535
1536 if self.persistence_state.in_progress() {
1537 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1540 return
1541 }
1542
1543 self.backfill_sync_state = BackfillSyncState::Pending;
1544 self.metrics.engine.pipeline_runs.increment(1);
1545 debug!(target: "engine::tree", "emitting backfill action event");
1546 }
1547
1548 let _ = self.outgoing.send(event).inspect_err(
1549 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1550 );
1551 }
1552
1553 pub const fn should_persist(&self) -> bool {
1557 if !self.backfill_sync_state.is_idle() {
1558 return false
1560 }
1561
1562 let min_block = self.persistence_state.last_persisted_block.number;
1563 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1564 self.config.persistence_threshold()
1565 }
1566
1567 fn get_canonical_blocks_to_persist(
1576 &mut self,
1577 ) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
1578 debug_assert!(!self.persistence_state.in_progress());
1581
1582 let mut blocks_to_persist = Vec::new();
1583 let mut current_hash = self.state.tree_state.canonical_block_hash();
1584 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1585
1586 let canonical_head_number = self.state.tree_state.canonical_block_number();
1587
1588 let target_number =
1589 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1590
1591 debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1592 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1593 if block.recovered_block().number() <= last_persisted_number {
1594 break;
1595 }
1596
1597 if block.recovered_block().number() <= target_number {
1598 blocks_to_persist.push(block.clone());
1599 }
1600
1601 current_hash = block.recovered_block().parent_hash();
1602 }
1603
1604 blocks_to_persist.reverse();
1606
1607 for block in &mut blocks_to_persist {
1609 if block.trie.is_present() {
1610 continue
1611 }
1612
1613 debug!(
1614 target: "engine::tree",
1615 block = ?block.recovered_block().num_hash(),
1616 "Calculating trie updates before persisting"
1617 );
1618
1619 let provider = self
1620 .state_provider_builder(block.recovered_block().parent_hash())?
1621 .ok_or(AdvancePersistenceError::MissingAncestor(
1622 block.recovered_block().parent_hash(),
1623 ))?
1624 .build()?;
1625
1626 let mut trie_input = self.compute_trie_input(
1627 self.persisting_kind_for(block.recovered_block.block_with_parent()),
1628 self.provider.database_provider_ro()?,
1629 block.recovered_block().parent_hash(),
1630 None,
1631 )?;
1632 trie_input.append_ref(block.hashed_state());
1634 let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
1635 debug_assert_eq!(_root, block.recovered_block().state_root());
1636
1637 let trie_updates = Arc::new(updates);
1639 let tree_state_block = self
1640 .state
1641 .tree_state
1642 .blocks_by_hash
1643 .get_mut(&block.recovered_block().hash())
1644 .expect("blocks to persist are constructed from tree state blocks");
1645 tree_state_block.trie.set_present(trie_updates.clone());
1646 block.trie.set_present(trie_updates);
1647 }
1648
1649 Ok(blocks_to_persist)
1650 }
1651
1652 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1660 if let Some(remove_above) = self.find_disk_reorg()? {
1663 self.remove_blocks(remove_above);
1664 return Ok(())
1665 }
1666
1667 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1668 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1669 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1670 number: self.persistence_state.last_persisted_block.number,
1671 hash: self.persistence_state.last_persisted_block.hash,
1672 });
1673 Ok(())
1674 }
1675
1676 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1684 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1685 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1687 return Ok(Some(block.block.clone()))
1688 }
1689
1690 let (block, senders) = self
1691 .provider
1692 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1693 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1694 .split_sealed();
1695 let execution_output = self
1696 .provider
1697 .get_state(block.header().number())?
1698 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1699 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1700
1701 Ok(Some(ExecutedBlock {
1702 recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1703 execution_output: Arc::new(execution_output),
1704 hashed_state: Arc::new(hashed_state),
1705 }))
1706 }
1707
1708 fn sealed_header_by_hash(
1710 &self,
1711 hash: B256,
1712 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1713 let header = self.state.tree_state.sealed_header_by_hash(&hash);
1715
1716 if header.is_some() {
1717 Ok(header)
1718 } else {
1719 self.provider.sealed_header_by_hash(hash)
1720 }
1721 }
1722
1723 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1730 self.state
1731 .buffer
1732 .lowest_ancestor(&hash)
1733 .map(|block| block.parent_hash())
1734 .unwrap_or_else(|| hash)
1735 }
1736
1737 fn latest_valid_hash_for_invalid_payload(
1748 &mut self,
1749 parent_hash: B256,
1750 ) -> ProviderResult<Option<B256>> {
1751 if self.sealed_header_by_hash(parent_hash)?.is_some() {
1753 return Ok(Some(parent_hash))
1754 }
1755
1756 let mut current_hash = parent_hash;
1759 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1760 while let Some(block_with_parent) = current_block {
1761 current_hash = block_with_parent.parent;
1762 current_block = self.state.invalid_headers.get(¤t_hash);
1763
1764 if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1767 return Ok(Some(current_hash))
1768 }
1769 }
1770 Ok(None)
1771 }
1772
1773 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1777 if let Some(parent) = self.sealed_header_by_hash(parent_hash)? {
1780 if !parent.difficulty().is_zero() {
1781 parent_hash = B256::ZERO;
1782 }
1783 }
1784
1785 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1786 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1787 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1788 })
1789 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1790 }
1791
1792 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1796 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1797 return target.head_block_hash == block_hash
1798 }
1799 false
1800 }
1801
1802 fn check_invalid_ancestor_with_head(
1808 &mut self,
1809 check: B256,
1810 head: &SealedBlock<N::Block>,
1811 ) -> ProviderResult<Option<PayloadStatus>> {
1812 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1814
1815 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1816 }
1817
1818 fn on_invalid_new_payload(
1820 &mut self,
1821 head: SealedBlock<N::Block>,
1822 invalid: BlockWithParent,
1823 ) -> ProviderResult<PayloadStatus> {
1824 let status = self.prepare_invalid_response(invalid.parent)?;
1826
1827 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1829 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1830
1831 Ok(status)
1832 }
1833
1834 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1837 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1839 Ok(Some(self.prepare_invalid_response(header.parent)?))
1841 }
1842
1843 fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1846 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1847 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1848 return Err(e)
1849 }
1850
1851 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1852 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1853 return Err(e)
1854 }
1855
1856 Ok(())
1857 }
1858
1859 #[instrument(level = "trace", skip(self), target = "engine::tree")]
1861 fn try_connect_buffered_blocks(
1862 &mut self,
1863 parent: BlockNumHash,
1864 ) -> Result<(), InsertBlockFatalError> {
1865 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1866
1867 if blocks.is_empty() {
1868 return Ok(())
1870 }
1871
1872 let now = Instant::now();
1873 let block_count = blocks.len();
1874 for child in blocks {
1875 let child_num_hash = child.num_hash();
1876 match self.insert_block(child) {
1877 Ok(res) => {
1878 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1879 if self.is_sync_target_head(child_num_hash.hash) &&
1880 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1881 {
1882 self.make_canonical(child_num_hash.hash)?;
1883 }
1884 }
1885 Err(err) => {
1886 if let InsertPayloadError::Block(err) = err {
1887 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1888 if let Err(fatal) = self.on_insert_block_error(err) {
1889 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1890 return Err(fatal)
1891 }
1892 }
1893 }
1894 }
1895 }
1896
1897 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1898 Ok(())
1899 }
1900
1901 fn buffer_block(
1903 &mut self,
1904 block: RecoveredBlock<N::Block>,
1905 ) -> Result<(), InsertBlockError<N::Block>> {
1906 if let Err(err) = self.validate_block(&block) {
1907 return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1908 }
1909 self.state.buffer.insert_block(block);
1910 Ok(())
1911 }
1912
1913 #[inline]
1918 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1919 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1920 }
1921
1922 #[inline]
1925 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1926 if block > local_tip {
1927 Some(block - local_tip)
1928 } else {
1929 None
1930 }
1931 }
1932
1933 fn backfill_sync_target(
1940 &self,
1941 canonical_tip_num: u64,
1942 target_block_number: u64,
1943 downloaded_block: Option<BlockNumHash>,
1944 ) -> Option<B256> {
1945 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1946
1947 let mut exceeds_backfill_threshold = if let Some(buffered_finalized) = sync_target_state
1949 .as_ref()
1950 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1951 {
1952 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number())
1955 } else {
1956 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
1958 };
1959
1960 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1963 if downloaded_block.hash == state.finalized_block_hash {
1964 exceeds_backfill_threshold =
1966 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1967 }
1968 }
1969
1970 if exceeds_backfill_threshold {
1972 if let Some(state) = sync_target_state {
1973 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
1975 Err(err) => {
1976 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
1977 }
1978 Ok(None) => {
1979 if !state.finalized_block_hash.is_zero() {
1981 return Some(state.finalized_block_hash)
1984 }
1985
1986 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
1999 return Some(state.head_block_hash)
2000 }
2001 Ok(Some(_)) => {
2002 }
2004 }
2005 }
2006 }
2007
2008 None
2009 }
2010
2011 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2014 let mut canonical = self.state.tree_state.current_canonical_head;
2015 let mut persisted = self.persistence_state.last_persisted_block;
2016
2017 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2018 Ok(self
2019 .sealed_header_by_hash(num_hash.hash)?
2020 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2021 .parent_num_hash())
2022 };
2023
2024 while canonical.number > persisted.number {
2027 canonical = parent_num_hash(canonical)?;
2028 }
2029
2030 if canonical == persisted {
2032 return Ok(None);
2033 }
2034
2035 while persisted.number > canonical.number {
2041 persisted = parent_num_hash(persisted)?;
2042 }
2043
2044 debug_assert_eq!(persisted.number, canonical.number);
2045
2046 while persisted.hash != canonical.hash {
2048 canonical = parent_num_hash(canonical)?;
2049 persisted = parent_num_hash(persisted)?;
2050 }
2051
2052 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2053
2054 Ok(Some(persisted.number))
2055 }
2056
/// Applies a new canonical chain to the tracked state.
///
/// Updates the canonical head of the tree state, reinserts reorged blocks missing from the
/// tree state, updates the canonical in-memory state and the chain height metric, notifies
/// canonical state listeners and finally emits a `CanonicalChainCommitted` event.
fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
    trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
    let start = Instant::now();

    // The tip of the update becomes the canonical head of the tree state.
    self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

    // Capture the tip header and the notification before `chain_update` is moved below.
    let tip = chain_update.tip().clone_sealed_header();
    let notification = chain_update.to_chain_notification();

    if let NewCanonicalChain::Reorg { new, old } = &chain_update {
        let new_first = new.first().map(|first| first.recovered_block().num_hash());
        let old_first = old.first().map(|first| first.recovered_block().num_hash());
        trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

        self.update_reorg_metrics(old.len());
        self.reinsert_reorged_blocks(new.clone());
        // Old blocks can only be reinserted if their trie updates are available in
        // `persisted_trie_updates`; blocks without an entry are skipped.
        let old = old
            .iter()
            .filter_map(|block| {
                let trie = self
                    .state
                    .tree_state
                    .persisted_trie_updates
                    .get(&block.recovered_block.hash())?
                    .1
                    .clone();
                Some(ExecutedBlockWithTrieUpdates {
                    block: block.clone(),
                    trie: ExecutedTrieUpdates::Present(trie),
                })
            })
            .collect::<Vec<_>>();
        self.reinsert_reorged_blocks(old);
    }

    // Apply the new chain to the canonical in-memory state and set its head.
    self.canonical_in_memory_state.update_chain(chain_update);
    self.canonical_in_memory_state.set_canonical_head(tip.clone());

    self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

    // Pass the chain notification to the canonical in-memory state.
    self.canonical_in_memory_state.notify_canon_state(notification);

    self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
        Box::new(tip),
        start.elapsed(),
    ));
}
2115
2116 fn update_reorg_metrics(&self, old_chain_length: usize) {
2118 self.metrics.tree.reorgs.increment(1);
2119 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2120 }
2121
2122 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2124 for block in new_chain {
2125 if self
2126 .state
2127 .tree_state
2128 .executed_block_by_hash(block.recovered_block().hash())
2129 .is_none()
2130 {
2131 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2132 self.state.tree_state.insert_executed(block);
2133 }
2134 }
2135 }
2136
2137 fn on_disconnected_downloaded_block(
2142 &self,
2143 downloaded_block: BlockNumHash,
2144 missing_parent: BlockNumHash,
2145 head: BlockNumHash,
2146 ) -> Option<TreeEvent> {
2147 if let Some(target) =
2149 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2150 {
2151 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2152 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2153 }
2154
2155 let request = if let Some(distance) =
2165 self.distance_from_local_tip(head.number, missing_parent.number)
2166 {
2167 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2168 DownloadRequest::BlockRange(missing_parent.hash, distance)
2169 } else {
2170 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2171 DownloadRequest::single_block(missing_parent.hash)
2174 };
2175
2176 Some(TreeEvent::Download(request))
2177 }
2178
2179 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2185 fn on_downloaded_block(
2186 &mut self,
2187 block: RecoveredBlock<N::Block>,
2188 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2189 let block_num_hash = block.num_hash();
2190 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2191 if self
2192 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2193 .is_some()
2194 {
2195 return Ok(None)
2196 }
2197
2198 if !self.backfill_sync_state.is_idle() {
2199 return Ok(None)
2200 }
2201
2202 match self.insert_block(block) {
2204 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2205 if self.is_sync_target_head(block_num_hash.hash) {
2206 trace!(target: "engine::tree", "appended downloaded sync target block");
2207
2208 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2211 sync_target_head: block_num_hash.hash,
2212 })))
2213 }
2214 trace!(target: "engine::tree", "appended downloaded block");
2215 self.try_connect_buffered_blocks(block_num_hash)?;
2216 }
2217 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2218 return Ok(self.on_disconnected_downloaded_block(
2221 block_num_hash,
2222 missing_ancestor,
2223 head,
2224 ))
2225 }
2226 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2227 trace!(target: "engine::tree", "downloaded block already executed");
2228 }
2229 Err(err) => {
2230 if let InsertPayloadError::Block(err) = err {
2231 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2232 if let Err(fatal) = self.on_insert_block_error(err) {
2233 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2234 return Err(fatal)
2235 }
2236 }
2237 }
2238 }
2239 Ok(None)
2240 }
2241
2242 fn insert_payload(
2243 &mut self,
2244 payload: T::ExecutionData,
2245 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2246 self.insert_block_or_payload(
2247 payload.block_with_parent(),
2248 payload,
2249 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2250 |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2251 )
2252 }
2253
2254 fn insert_block(
2255 &mut self,
2256 block: RecoveredBlock<N::Block>,
2257 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2258 self.insert_block_or_payload(
2259 block.block_with_parent(),
2260 block,
2261 |validator, block, ctx| validator.validate_block(block, ctx),
2262 |_, block| Ok(block),
2263 )
2264 }
2265
/// Shared implementation for inserting either a block or a payload into the tree.
///
/// - `block_id`: number/hash of the block together with its parent hash.
/// - `input`: the block or payload; it is consumed exactly once, either by `execute` or by
///   `convert_to_block`.
/// - `execute`: validates and executes the input, producing the executed block.
/// - `convert_to_block`: converts the input into a block on the paths that do not execute it.
///
/// Returns `AlreadySeen` if the header is already known, `Inserted(Disconnected)` if no state
/// provider is available for the parent (the block is buffered), and `Inserted(Valid)` after
/// successful execution.
fn insert_block_or_payload<Input, Err>(
    &mut self,
    block_id: BlockWithParent,
    input: Input,
    execute: impl FnOnce(
        &mut V,
        Input,
        TreeCtx<'_, N>,
    ) -> Result<ExecutedBlockWithTrieUpdates<N>, Err>,
    convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
) -> Result<InsertPayloadOk, Err>
where
    Err: From<InsertBlockError<N::Block>>,
{
    // Started before any work so that the total insert duration metric covers everything.
    let block_insert_start = Instant::now();
    let block_num_hash = block_id.block;
    debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

    match self.sealed_header_by_hash(block_num_hash.hash) {
        Err(err) => {
            // The error type carries the block, so the input has to be converted first.
            let block = convert_to_block(self, input)?;
            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
        }
        Ok(Some(_)) => {
            // The header is already known; the conversion is still run and its error, if
            // any, is returned.
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
        }
        _ => {}
    };

    // A state provider for the parent is required to execute the block.
    match self.state_provider_builder(block_id.parent) {
        Err(err) => {
            let block = convert_to_block(self, input)?;
            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
        }
        Ok(None) => {
            let block = convert_to_block(self, input)?;

            // Report the parent of the lowest buffered ancestor as missing, or the block's
            // own parent if no ancestor is buffered.
            let missing_ancestor = self
                .state
                .buffer
                .lowest_ancestor(&block.parent_hash())
                .map(|block| block.parent_num_hash())
                .unwrap_or_else(|| block.parent_num_hash());

            // Keep the block in the buffer.
            self.state.buffer.insert_block(block);

            return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                head: self.state.tree_state.current_canonical_head,
                missing_ancestor,
            }))
        }
        Ok(Some(_)) => {}
    }

    let is_fork = match self.is_fork(block_id) {
        Err(err) => {
            let block = convert_to_block(self, input)?;
            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
        }
        Ok(is_fork) => is_fork,
    };

    let ctx = TreeCtx::new(
        &mut self.state,
        &self.persistence_state,
        &self.canonical_in_memory_state,
        is_fork,
    );

    // Measures execution plus insertion; reported in the emitted event.
    let start = Instant::now();

    let executed = execute(&mut self.payload_validator, input, ctx)?;

    // A direct child of the canonical head becomes the pending block.
    if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
    {
        debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
        self.canonical_in_memory_state.set_pending_block(executed.clone());
    }

    self.state.tree_state.insert_executed(executed.clone());
    self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

    let elapsed = start.elapsed();
    // The emitted event depends on whether the block is a fork block.
    let engine_event = if is_fork {
        ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
    } else {
        ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
    };
    self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

    self.metrics
        .engine
        .block_insert_total_duration
        .record(block_insert_start.elapsed().as_secs_f64());
    debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
    Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
}
2372
/// Builds the [`TrieInput`] for computing state on top of the block with hash `parent_hash`.
///
/// The input consists of the revert state from the database (relative to a historical anchor
/// block) followed by the hashed state and trie updates of the in-memory ancestor blocks.
///
/// - `persisting_kind`: when it is a descendant kind, in-memory blocks at or below the
///   provider's best block number are dropped and the anchor is recomputed.
/// - `provider`: database provider used for the best block number and the reverts.
/// - `allocated_trie_input`: optional input to extend; a default one is used if `None`.
fn compute_trie_input<TP: DBProvider + BlockNumReader>(
    &self,
    persisting_kind: PersistingKind,
    provider: TP,
    parent_hash: B256,
    allocated_trie_input: Option<TrieInput>,
) -> ProviderResult<TrieInput> {
    let mut input = allocated_trie_input.unwrap_or_default();

    let best_block_number = provider.best_block_number()?;

    // Historical anchor and in-memory ancestors; without in-memory blocks the parent hash
    // itself is the anchor.
    let (mut historical, mut blocks) = self
        .state
        .tree_state
        .blocks_by_hash(parent_hash)
        .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));

    if persisting_kind.is_descendant() {
        // Drop trailing blocks that are at or below the provider's best block number.
        // NOTE(review): presumably `blocks` is ordered newest-first, so `last()` is the oldest
        // block - confirm against `blocks_by_hash`.
        while let Some(block) = blocks.last() {
            let recovered_block = block.recovered_block();
            if recovered_block.number() <= best_block_number {
                blocks.pop();
            } else {
                // First block above the best block number, stop filtering.
                break
            }
        }

        historical = if let Some(block) = blocks.last() {
            // Anchor at the block number right below the last remaining in-memory block.
            (block.recovered_block().number() - 1).into()
        } else {
            // No in-memory blocks left, anchor at the provided parent hash.
            parent_hash.into()
        };
    }

    if blocks.is_empty() {
        debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
    } else {
        debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
    }

    // Resolve the anchor to a block number.
    // NOTE(review): the `unwrap` assumes that a missing result can only occur for a hash
    // anchor - confirm that number anchors always resolve.
    let block_number = provider
        .convert_hash_or_number(historical)?
        .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;

    let revert_state = if block_number == best_block_number {
        // The anchor is the best block, no reverts are needed.
        debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
        HashedPostState::default()
    } else {
        // Collect the reverts starting at the block after the anchor.
        let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
            provider.tx_ref(),
            block_number + 1,
        )
        .map_err(ProviderError::from)?;
        debug!(
            target: "engine::tree",
            block_number,
            best_block_number,
            accounts = revert_state.accounts.len(),
            storages = revert_state.storages.len(),
            "Non-empty revert state"
        );
        revert_state
    };
    input.append(revert_state);

    // Extend with the in-memory blocks, iterated in reverse order.
    input.extend_with_blocks(
        blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
    );

    Ok(input)
}
2475
2476 fn on_insert_block_error(
2482 &mut self,
2483 error: InsertBlockError<N::Block>,
2484 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2485 let (block, error) = error.split();
2486
2487 let validation_err = error.ensure_validation_error()?;
2490
2491 warn!(
2495 target: "engine::tree",
2496 invalid_hash=%block.hash(),
2497 invalid_number=block.number(),
2498 %validation_err,
2499 "Invalid block error on new payload",
2500 );
2501 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2502
2503 self.state.invalid_headers.insert(block.block_with_parent());
2505 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2506 Box::new(block),
2507 )));
2508 Ok(PayloadStatus::new(
2509 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2510 latest_valid_hash,
2511 ))
2512 }
2513
2514 fn on_new_payload_error(
2516 &mut self,
2517 error: NewPayloadError,
2518 parent_hash: B256,
2519 ) -> ProviderResult<PayloadStatus> {
2520 error!(target: "engine::tree", %error, "Invalid payload");
2521 let latest_valid_hash =
2524 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2525 None
2529 } else {
2530 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2531 };
2532
2533 let status = PayloadStatusEnum::from(error);
2534 Ok(PayloadStatus::new(status, latest_valid_hash))
2535 }
2536
2537 pub fn find_canonical_header(
2539 &self,
2540 hash: B256,
2541 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2542 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2543
2544 if canonical.is_none() {
2545 canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2546 }
2547
2548 Ok(canonical)
2549 }
2550
2551 fn update_finalized_block(
2553 &self,
2554 finalized_block_hash: B256,
2555 ) -> Result<(), OnForkChoiceUpdated> {
2556 if finalized_block_hash.is_zero() {
2557 return Ok(())
2558 }
2559
2560 match self.find_canonical_header(finalized_block_hash) {
2561 Ok(None) => {
2562 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2563 return Err(OnForkChoiceUpdated::invalid_state())
2565 }
2566 Ok(Some(finalized)) => {
2567 if Some(finalized.num_hash()) !=
2568 self.canonical_in_memory_state.get_finalized_num_hash()
2569 {
2570 let _ = self.persistence.save_finalized_block_number(finalized.number());
2573 self.canonical_in_memory_state.set_finalized(finalized);
2574 }
2575 }
2576 Err(err) => {
2577 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2578 }
2579 }
2580
2581 Ok(())
2582 }
2583
2584 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2586 if safe_block_hash.is_zero() {
2587 return Ok(())
2588 }
2589
2590 match self.find_canonical_header(safe_block_hash) {
2591 Ok(None) => {
2592 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2593 return Err(OnForkChoiceUpdated::invalid_state())
2595 }
2596 Ok(Some(safe)) => {
2597 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2598 let _ = self.persistence.save_safe_block_number(safe.number());
2601 self.canonical_in_memory_state.set_safe(safe);
2602 }
2603 }
2604 Err(err) => {
2605 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2606 }
2607 }
2608
2609 Ok(())
2610 }
2611
2612 fn ensure_consistent_forkchoice_state(
2621 &self,
2622 state: ForkchoiceState,
2623 ) -> Result<(), OnForkChoiceUpdated> {
2624 self.update_finalized_block(state.finalized_block_hash)?;
2630
2631 self.update_safe_block(state.safe_block_hash)
2637 }
2638
2639 fn pre_validate_forkchoice_update(
2644 &mut self,
2645 state: ForkchoiceState,
2646 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2647 if state.head_block_hash.is_zero() {
2648 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2649 }
2650
2651 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2654 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2655 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2656 }
2657
2658 if !self.backfill_sync_state.is_idle() {
2659 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2662 return Ok(Some(OnForkChoiceUpdated::syncing()))
2663 }
2664
2665 Ok(None)
2666 }
2667
2668 fn process_payload_attributes(
2673 &self,
2674 attrs: T::PayloadAttributes,
2675 head: &N::BlockHeader,
2676 state: ForkchoiceState,
2677 version: EngineApiMessageVersion,
2678 ) -> OnForkChoiceUpdated {
2679 if let Err(err) =
2680 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2681 {
2682 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2683 return OnForkChoiceUpdated::invalid_payload_attributes()
2684 }
2685
2686 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2691 state.head_block_hash,
2692 attrs,
2693 version as u8,
2694 ) {
2695 Ok(attributes) => {
2696 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2699
2700 OnForkChoiceUpdated::updated_with_pending_payload_id(
2712 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2713 pending_payload_id,
2714 )
2715 }
2716 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2717 }
2718 }
2719
2720 pub(crate) fn remove_before(
2727 &mut self,
2728 upper_bound: BlockNumHash,
2729 finalized_hash: Option<B256>,
2730 ) -> ProviderResult<()> {
2731 let num = if let Some(hash) = finalized_hash {
2734 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2735 } else {
2736 None
2737 };
2738
2739 self.state.tree_state.remove_until(
2740 upper_bound,
2741 self.persistence_state.last_persisted_block.hash,
2742 num,
2743 );
2744 Ok(())
2745 }
2746
2747 pub fn state_provider_builder(
2752 &self,
2753 hash: B256,
2754 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2755 where
2756 P: BlockReader + StateProviderFactory + StateReader + Clone,
2757 {
2758 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2759 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2760 return Ok(Some(StateProviderBuilder::new(
2762 self.provider.clone(),
2763 historical,
2764 Some(blocks),
2765 )))
2766 }
2767
2768 if let Some(header) = self.provider.header(&hash)? {
2770 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2771 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2774 }
2775
2776 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2777 Ok(None)
2778 }
2779}
2780
2781#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2787pub enum BlockStatus {
2788 Valid,
2790 Disconnected {
2792 head: BlockNumHash,
2794 missing_ancestor: BlockNumHash,
2796 },
2797}
2798
2799#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2804pub enum InsertPayloadOk {
2805 AlreadySeen(BlockStatus),
2807 Inserted(BlockStatus),
2809}
2810
/// Describes how a block relates to an ongoing persistence task.
///
/// NOTE(review): the variant descriptions are inferred from the variant names; confirm
/// against the code that selects them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PersistingKind {
    /// Presumably: no blocks are currently being persisted.
    NotPersisting,
    /// Presumably: blocks are being persisted and the block in question does not descend
    /// from them.
    PersistingNotDescendant,
    /// Presumably: blocks are being persisted and the block in question descends from them.
    PersistingDescendant,
}

impl PersistingKind {
    /// Returns `true` for [`Self::NotPersisting`] and [`Self::PersistingDescendant`], and
    /// `false` for [`Self::PersistingNotDescendant`].
    ///
    /// The match is exhaustive on purpose: adding a variant forces an explicit decision here
    /// instead of silently defaulting to `false`.
    pub const fn can_run_parallel_state_root(&self) -> bool {
        match self {
            Self::NotPersisting | Self::PersistingDescendant => true,
            Self::PersistingNotDescendant => false,
        }
    }

    /// Returns `true` only for [`Self::PersistingDescendant`].
    pub const fn is_descendant(&self) -> bool {
        match self {
            Self::PersistingDescendant => true,
            Self::NotPersisting | Self::PersistingNotDescendant => false,
        }
    }
}