1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use persistence_state::CurrentPersistenceAction;
17use reth_chain_state::{
18 CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
19 MemoryOverlayStateProvider, NewCanonicalChain,
20};
21use reth_consensus::{Consensus, FullConsensus};
22use reth_engine_primitives::{
23 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
24 ForkchoiceStateTracker, OnForkChoiceUpdated,
25};
26use reth_errors::{ConsensusError, ProviderResult};
27use reth_evm::{ConfigureEvm, OnStateHook};
28use reth_payload_builder::PayloadBuilderHandle;
29use reth_payload_primitives::{
30 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
31};
32use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
33use reth_provider::{
34 providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
35 HashedPostStateProvider, ProviderError, StateProviderBox, StateProviderFactory, StateReader,
36 StateRootProvider, TransactionVariant,
37};
38use reth_revm::database::StateProviderDatabase;
39use reth_stages_api::ControlFlow;
40use reth_trie::{HashedPostState, TrieInput};
41use reth_trie_db::DatabaseHashedPostState;
42use revm::state::EvmState;
43use state::TreeState;
44use std::{
45 fmt::Debug,
46 sync::{
47 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
48 Arc,
49 },
50 time::Instant,
51};
52use tokio::sync::{
53 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
54 oneshot::{self, error::TryRecvError},
55};
56use tracing::*;
57
58mod block_buffer;
59mod cached_state;
60pub mod error;
61mod instrumented_state;
62mod invalid_headers;
63mod metrics;
64mod payload_processor;
65pub mod payload_validator;
66mod persistence_state;
67pub mod precompile_cache;
68#[cfg(test)]
69mod tests;
70#[expect(unused)]
72mod trie_updates;
73
74use crate::tree::error::AdvancePersistenceError;
75pub use block_buffer::BlockBuffer;
76pub use invalid_headers::InvalidHeaderCache;
77pub use payload_processor::*;
78pub use payload_validator::{BasicEngineValidator, EngineValidator};
79pub use persistence_state::PersistenceState;
80pub use reth_engine_primitives::TreeConfig;
81use reth_trie::KeccakKeyHasher;
82
83pub mod state;
84
85pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
95
96#[derive(Clone, Debug)]
98pub struct StateProviderBuilder<N: NodePrimitives, P> {
99 provider_factory: P,
101 historical: B256,
103 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
105}
106
107impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
108 pub const fn new(
111 provider_factory: P,
112 historical: B256,
113 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
114 ) -> Self {
115 Self { provider_factory, historical, overlay }
116 }
117}
118
119impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
120where
121 P: BlockReader + StateProviderFactory + StateReader + Clone,
122{
123 pub fn build(&self) -> ProviderResult<StateProviderBox> {
125 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
126 if let Some(overlay) = self.overlay.clone() {
127 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
128 }
129 Ok(provider)
130 }
131}
132
133#[derive(Debug)]
137pub struct EngineApiTreeState<N: NodePrimitives> {
138 tree_state: TreeState<N>,
140 forkchoice_state_tracker: ForkchoiceStateTracker,
142 buffer: BlockBuffer<N::Block>,
144 invalid_headers: InvalidHeaderCache,
147}
148
149impl<N: NodePrimitives> EngineApiTreeState<N> {
150 fn new(
151 block_buffer_limit: u32,
152 max_invalid_header_cache_length: u32,
153 canonical_block: BlockNumHash,
154 engine_kind: EngineApiKind,
155 ) -> Self {
156 Self {
157 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
158 buffer: BlockBuffer::new(block_buffer_limit),
159 tree_state: TreeState::new(canonical_block, engine_kind),
160 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
161 }
162 }
163}
164
165#[derive(Debug)]
167pub struct TreeOutcome<T> {
168 pub outcome: T,
170 pub event: Option<TreeEvent>,
172}
173
174impl<T> TreeOutcome<T> {
175 pub const fn new(outcome: T) -> Self {
177 Self { outcome, event: None }
178 }
179
180 pub fn with_event(mut self, event: TreeEvent) -> Self {
182 self.event = Some(event);
183 self
184 }
185}
186
187#[derive(Debug)]
189pub enum TreeEvent {
190 TreeAction(TreeAction),
192 BackfillAction(BackfillAction),
194 Download(DownloadRequest),
196}
197
198impl TreeEvent {
199 const fn is_backfill_action(&self) -> bool {
201 matches!(self, Self::BackfillAction(_))
202 }
203}
204
205#[derive(Debug)]
207pub enum TreeAction {
208 MakeCanonical {
210 sync_target_head: B256,
212 },
213}
214
215struct MeteredStateHook {
217 metrics: reth_evm::metrics::ExecutorMetrics,
218 inner_hook: Box<dyn OnStateHook>,
219}
220
221impl OnStateHook for MeteredStateHook {
222 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
223 let accounts = state.keys().len();
225 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
226 let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
227
228 self.metrics.accounts_loaded_histogram.record(accounts as f64);
229 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
230 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
231
232 self.inner_hook.on_state(source, state);
234 }
235}
236
237pub struct EngineApiTreeHandler<N, P, T, V, C>
242where
243 N: NodePrimitives,
244 T: PayloadTypes,
245 C: ConfigureEvm<Primitives = N> + 'static,
246{
247 provider: P,
248 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
249 payload_validator: V,
250 state: EngineApiTreeState<N>,
252 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
261 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
263 outgoing: UnboundedSender<EngineApiEvent<N>>,
265 persistence: PersistenceHandle<N>,
267 persistence_state: PersistenceState,
269 backfill_sync_state: BackfillSyncState,
271 canonical_in_memory_state: CanonicalInMemoryState<N>,
274 payload_builder: PayloadBuilderHandle<T>,
277 config: TreeConfig,
279 metrics: EngineApiMetrics,
281 engine_kind: EngineApiKind,
283 evm_config: C,
285}
286
287impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
288 for EngineApiTreeHandler<N, P, T, V, C>
289where
290 N: NodePrimitives,
291 C: Debug + ConfigureEvm<Primitives = N>,
292{
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 f.debug_struct("EngineApiTreeHandler")
295 .field("provider", &self.provider)
296 .field("consensus", &self.consensus)
297 .field("payload_validator", &self.payload_validator)
298 .field("state", &self.state)
299 .field("incoming_tx", &self.incoming_tx)
300 .field("persistence", &self.persistence)
301 .field("persistence_state", &self.persistence_state)
302 .field("backfill_sync_state", &self.backfill_sync_state)
303 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
304 .field("payload_builder", &self.payload_builder)
305 .field("config", &self.config)
306 .field("metrics", &self.metrics)
307 .field("engine_kind", &self.engine_kind)
308 .field("evm_config", &self.evm_config)
309 .finish()
310 }
311}
312
313impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
314where
315 N: NodePrimitives,
316 P: DatabaseProviderFactory
317 + BlockReader<Block = N::Block, Header = N::BlockHeader>
318 + StateProviderFactory
319 + StateReader<Receipt = N::Receipt>
320 + HashedPostStateProvider
321 + Clone
322 + 'static,
323 <P as DatabaseProviderFactory>::Provider:
324 BlockReader<Block = N::Block, Header = N::BlockHeader>,
325 C: ConfigureEvm<Primitives = N> + 'static,
326 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
327 V: EngineValidator<T>,
328{
329 #[expect(clippy::too_many_arguments)]
331 pub fn new(
332 provider: P,
333 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
334 payload_validator: V,
335 outgoing: UnboundedSender<EngineApiEvent<N>>,
336 state: EngineApiTreeState<N>,
337 canonical_in_memory_state: CanonicalInMemoryState<N>,
338 persistence: PersistenceHandle<N>,
339 persistence_state: PersistenceState,
340 payload_builder: PayloadBuilderHandle<T>,
341 config: TreeConfig,
342 engine_kind: EngineApiKind,
343 evm_config: C,
344 ) -> Self {
345 let (incoming_tx, incoming) = std::sync::mpsc::channel();
346
347 Self {
348 provider,
349 consensus,
350 payload_validator,
351 incoming,
352 outgoing,
353 persistence,
354 persistence_state,
355 backfill_sync_state: BackfillSyncState::Idle,
356 state,
357 canonical_in_memory_state,
358 payload_builder,
359 config,
360 metrics: Default::default(),
361 incoming_tx,
362 engine_kind,
363 evm_config,
364 }
365 }
366
367 #[expect(clippy::complexity)]
373 pub fn spawn_new(
374 provider: P,
375 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
376 payload_validator: V,
377 persistence: PersistenceHandle<N>,
378 payload_builder: PayloadBuilderHandle<T>,
379 canonical_in_memory_state: CanonicalInMemoryState<N>,
380 config: TreeConfig,
381 kind: EngineApiKind,
382 evm_config: C,
383 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
384 {
385 let best_block_number = provider.best_block_number().unwrap_or(0);
386 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
387
388 let persistence_state = PersistenceState {
389 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
390 rx: None,
391 };
392
393 let (tx, outgoing) = unbounded_channel();
394 let state = EngineApiTreeState::new(
395 config.block_buffer_limit(),
396 config.max_invalid_header_cache_length(),
397 header.num_hash(),
398 kind,
399 );
400
401 let task = Self::new(
402 provider,
403 consensus,
404 payload_validator,
405 tx,
406 state,
407 canonical_in_memory_state,
408 persistence,
409 persistence_state,
410 payload_builder,
411 config,
412 kind,
413 evm_config,
414 );
415 let incoming = task.incoming_tx.clone();
416 std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
417 (incoming, outgoing)
418 }
419
420 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
422 self.incoming_tx.clone()
423 }
424
425 pub fn run(mut self) {
429 loop {
430 match self.try_recv_engine_message() {
431 Ok(Some(msg)) => {
432 debug!(target: "engine::tree", %msg, "received new engine message");
433 if let Err(fatal) = self.on_engine_message(msg) {
434 error!(target: "engine::tree", %fatal, "insert block fatal error");
435 return
436 }
437 }
438 Ok(None) => {
439 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
440 }
441 Err(_err) => {
442 error!(target: "engine::tree", "Engine channel disconnected");
443 return
444 }
445 }
446
447 if let Err(err) = self.advance_persistence() {
448 error!(target: "engine::tree", %err, "Advancing persistence failed");
449 return
450 }
451 }
452 }
453
454 fn on_downloaded(
460 &mut self,
461 mut blocks: Vec<RecoveredBlock<N::Block>>,
462 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
463 if blocks.is_empty() {
464 return Ok(None)
466 }
467
468 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
469 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
470 for block in blocks.drain(..batch) {
471 if let Some(event) = self.on_downloaded_block(block)? {
472 let needs_backfill = event.is_backfill_action();
473 self.on_tree_event(event)?;
474 if needs_backfill {
475 return Ok(None)
477 }
478 }
479 }
480
481 if !blocks.is_empty() {
483 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
484 }
485
486 Ok(None)
487 }
488
489 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
504 fn on_new_payload(
505 &mut self,
506 payload: T::ExecutionData,
507 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
508 trace!(target: "engine::tree", "invoked new payload");
509 self.metrics.engine.new_payload_messages.increment(1);
510
511 let start = Instant::now();
513
514 let num_hash = payload.num_hash();
541 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
542 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
543
544 let block_hash = num_hash.hash;
545
546 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
548 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
549 return Ok(TreeOutcome::new(status));
550 }
551
552 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
554
555 let status = if self.backfill_sync_state.is_idle() {
556 self.try_insert_payload(payload)?
557 } else {
558 self.try_buffer_payload(payload)?
559 };
560
561 let mut outcome = TreeOutcome::new(status);
562 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
564 if self.state.tree_state.canonical_block_hash() != block_hash {
566 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
567 sync_target_head: block_hash,
568 }));
569 }
570 }
571
572 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
574
575 Ok(outcome)
576 }
577
578 fn try_insert_payload(
585 &mut self,
586 payload: T::ExecutionData,
587 ) -> Result<PayloadStatus, InsertBlockFatalError> {
588 let block_hash = payload.block_hash();
589 let num_hash = payload.num_hash();
590 let parent_hash = payload.parent_hash();
591 let mut latest_valid_hash = None;
592
593 match self.insert_payload(payload) {
594 Ok(status) => {
595 let status = match status {
596 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
597 latest_valid_hash = Some(block_hash);
598 self.try_connect_buffered_blocks(num_hash)?;
599 PayloadStatusEnum::Valid
600 }
601 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
602 latest_valid_hash = Some(block_hash);
603 PayloadStatusEnum::Valid
604 }
605 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
606 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
607 PayloadStatusEnum::Syncing
609 }
610 };
611
612 Ok(PayloadStatus::new(status, latest_valid_hash))
613 }
614 Err(error) => match error {
615 InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
616 InsertPayloadError::Payload(error) => {
617 Ok(self.on_new_payload_error(error, parent_hash)?)
618 }
619 },
620 }
621 }
622
623 fn try_buffer_payload(
632 &mut self,
633 payload: T::ExecutionData,
634 ) -> Result<PayloadStatus, InsertBlockFatalError> {
635 let parent_hash = payload.parent_hash();
636
637 match self.payload_validator.ensure_well_formed_payload(payload) {
638 Ok(block) => {
640 if let Err(error) = self.buffer_block(block) {
641 Ok(self.on_insert_block_error(error)?)
642 } else {
643 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
644 }
645 }
646 Err(error) => Ok(self.on_new_payload_error(error, parent_hash)?),
647 }
648 }
649
650 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
657 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
659 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
660 self.metrics.engine.executed_new_block_cache_miss.increment(1);
661 return Ok(None)
662 };
663
664 let new_head_number = new_head_block.recovered_block().number();
665 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
666
667 let mut new_chain = vec![new_head_block.clone()];
668 let mut current_hash = new_head_block.recovered_block().parent_hash();
669 let mut current_number = new_head_number - 1;
670
671 while current_number > current_canonical_number {
676 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
677 {
678 current_hash = block.recovered_block().parent_hash();
679 current_number -= 1;
680 new_chain.push(block);
681 } else {
682 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
683 return Ok(None)
686 }
687 }
688
689 if current_hash == self.state.tree_state.current_canonical_head.hash {
692 new_chain.reverse();
693
694 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
696 }
697
698 let mut old_chain = Vec::new();
700 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
701
702 while current_canonical_number > current_number {
705 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
706 old_chain.push(block.clone());
707 old_hash = block.recovered_block().parent_hash();
708 current_canonical_number -= 1;
709 } else {
710 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
712 return Ok(None)
713 }
714 }
715
716 debug_assert_eq!(current_number, current_canonical_number);
718
719 while old_hash != current_hash {
722 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
723 old_hash = block.recovered_block().parent_hash();
724 old_chain.push(block);
725 } else {
726 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
728 return Ok(None)
729 }
730
731 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
732 {
733 current_hash = block.recovered_block().parent_hash();
734 new_chain.push(block);
735 } else {
736 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
738 return Ok(None)
739 }
740 }
741 new_chain.reverse();
742 old_chain.reverse();
743
744 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
745 }
746
747 fn update_latest_block_to_canonical_ancestor(
759 &mut self,
760 canonical_header: &SealedHeader<N::BlockHeader>,
761 ) -> ProviderResult<()> {
762 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
763 let current_head_number = self.state.tree_state.canonical_block_number();
764 let new_head_number = canonical_header.number();
765 let new_head_hash = canonical_header.hash();
766
767 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
769
770 if new_head_number < current_head_number {
772 debug!(
773 target: "engine::tree",
774 current_head = current_head_number,
775 new_head = new_head_number,
776 new_head_hash = ?new_head_hash,
777 "FCU unwind detected: reverting to canonical ancestor"
778 );
779
780 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
781 } else {
782 debug!(
783 target: "engine::tree",
784 previous_head = current_head_number,
785 new_head = new_head_number,
786 new_head_hash = ?new_head_hash,
787 "Advancing latest block to canonical ancestor"
788 );
789 self.handle_chain_advance_or_same_height(canonical_header)
790 }
791 }
792
793 fn handle_canonical_chain_unwind(
796 &self,
797 current_head_number: u64,
798 canonical_header: &SealedHeader<N::BlockHeader>,
799 ) -> ProviderResult<()> {
800 let new_head_number = canonical_header.number();
801 debug!(
802 target: "engine::tree",
803 from = current_head_number,
804 to = new_head_number,
805 "Handling unwind: collecting blocks to remove from in-memory state"
806 );
807
808 let old_blocks =
810 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
811
812 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
814 }
815
816 fn collect_blocks_for_canonical_unwind(
818 &self,
819 new_head_number: u64,
820 current_head_number: u64,
821 ) -> Vec<ExecutedBlock<N>> {
822 let mut old_blocks = Vec::new();
823
824 for block_num in (new_head_number + 1)..=current_head_number {
825 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
826 let executed_block = block_state.block_ref().block.clone();
827 old_blocks.push(executed_block);
828 debug!(
829 target: "engine::tree",
830 block_number = block_num,
831 "Collected block for removal from in-memory state"
832 );
833 }
834 }
835
836 if old_blocks.is_empty() {
837 debug!(
838 target: "engine::tree",
839 "No blocks found in memory to remove, will clear and reset state"
840 );
841 }
842
843 old_blocks
844 }
845
846 fn apply_canonical_ancestor_via_reorg(
848 &self,
849 canonical_header: &SealedHeader<N::BlockHeader>,
850 old_blocks: Vec<ExecutedBlock<N>>,
851 ) -> ProviderResult<()> {
852 let new_head_hash = canonical_header.hash();
853 let new_head_number = canonical_header.number();
854
855 match self.canonical_block_by_hash(new_head_hash)? {
857 Some(executed_block) => {
858 let block_with_trie = ExecutedBlockWithTrieUpdates {
859 block: executed_block,
860 trie: ExecutedTrieUpdates::Missing,
861 };
862
863 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
865 new: vec![block_with_trie],
866 old: old_blocks,
867 });
868
869 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
872
873 debug!(
874 target: "engine::tree",
875 block_number = new_head_number,
876 block_hash = ?new_head_hash,
877 "Successfully loaded canonical ancestor into memory via reorg"
878 );
879 }
880 None => {
881 warn!(
883 target: "engine::tree",
884 block_hash = ?new_head_hash,
885 "Could not find canonical ancestor block, updating header only"
886 );
887 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
888 }
889 }
890
891 Ok(())
892 }
893
894 fn handle_chain_advance_or_same_height(
896 &self,
897 canonical_header: &SealedHeader<N::BlockHeader>,
898 ) -> ProviderResult<()> {
899 let new_head_number = canonical_header.number();
900 let new_head_hash = canonical_header.hash();
901
902 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
904
905 self.ensure_block_in_memory(new_head_number, new_head_hash)
907 }
908
909 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
911 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
913 return Ok(());
914 }
915
916 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
918 let block_with_trie = ExecutedBlockWithTrieUpdates {
919 block: executed_block,
920 trie: ExecutedTrieUpdates::Missing,
921 };
922
923 self.canonical_in_memory_state
924 .update_chain(NewCanonicalChain::Commit { new: vec![block_with_trie] });
925
926 debug!(
927 target: "engine::tree",
928 block_number,
929 block_hash = ?block_hash,
930 "Added canonical block to in-memory state"
931 );
932 }
933
934 Ok(())
935 }
936
937 fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
947 let target_hash = target.block.hash;
948 let canonical_head = self.state.tree_state.canonical_head();
950 let mut current_hash;
951 let mut current_block = target;
952 loop {
953 if current_block.block.hash == canonical_head.hash {
954 return Ok(false)
955 }
956 if current_block.block.number <= canonical_head.number {
958 break
959 }
960 current_hash = current_block.parent;
961
962 let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
963 current_block = next_block.block_with_parent();
964 }
965
966 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
968 return Ok(false)
969 }
970
971 if self.provider.block_number(target_hash)?.is_some() {
973 return Ok(false)
974 }
975
976 Ok(true)
977 }
978
979 fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
981 let Some(action) = self.persistence_state.current_action() else {
983 return PersistingKind::NotPersisting
984 };
985 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
987 return PersistingKind::PersistingNotDescendant
988 };
989
990 if block.block.number > highest.number &&
993 self.state.tree_state.is_descendant(*highest, block)
994 {
995 return PersistingKind::PersistingDescendant
996 }
997
998 PersistingKind::PersistingNotDescendant
1000 }
1001
1002 #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
1011 fn on_forkchoice_updated(
1012 &mut self,
1013 state: ForkchoiceState,
1014 attrs: Option<T::PayloadAttributes>,
1015 version: EngineApiMessageVersion,
1016 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1017 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1018
1019 self.record_forkchoice_metrics(&attrs);
1021
1022 if let Some(early_result) = self.validate_forkchoice_state(state)? {
1024 return Ok(TreeOutcome::new(early_result));
1025 }
1026
1027 if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
1029 return Ok(result);
1030 }
1031
1032 if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1035 return Ok(result);
1036 }
1037
1038 self.handle_missing_block(state)
1040 }
1041
1042 fn record_forkchoice_metrics(&self, attrs: &Option<T::PayloadAttributes>) {
1044 self.metrics.engine.forkchoice_updated_messages.increment(1);
1045 if attrs.is_some() {
1046 self.metrics.engine.forkchoice_with_attributes_updated_messages.increment(1);
1047 }
1048 self.canonical_in_memory_state.on_forkchoice_update_received();
1049 }
1050
1051 fn validate_forkchoice_state(
1056 &mut self,
1057 state: ForkchoiceState,
1058 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1059 if state.head_block_hash.is_zero() {
1060 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1061 }
1062
1063 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1066 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1067 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1068 }
1069
1070 if !self.backfill_sync_state.is_idle() {
1071 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1074 return Ok(Some(OnForkChoiceUpdated::syncing()));
1075 }
1076
1077 Ok(None)
1078 }
1079
1080 fn handle_canonical_head(
1086 &self,
1087 state: ForkchoiceState,
1088 attrs: &Option<T::PayloadAttributes>, version: EngineApiMessageVersion,
1090 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1091 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1106 return Ok(None);
1107 }
1108
1109 trace!(target: "engine::tree", "fcu head hash is already canonical");
1110
1111 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1113 return Ok(Some(TreeOutcome::new(outcome)));
1115 }
1116
1117 if let Some(attr) = attrs {
1119 let tip = self
1120 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1121 .ok_or_else(|| {
1122 ProviderError::HeaderNotFound(state.head_block_hash.into())
1125 })?;
1126 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1128 return Ok(Some(TreeOutcome::new(updated)));
1129 }
1130
1131 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1133 PayloadStatusEnum::Valid,
1134 Some(state.head_block_hash),
1135 )));
1136 Ok(Some(outcome))
1137 }
1138
1139 fn apply_chain_update(
1151 &mut self,
1152 state: ForkchoiceState,
1153 attrs: &Option<T::PayloadAttributes>,
1154 version: EngineApiMessageVersion,
1155 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1156 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1158 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1159
1160 if self.engine_kind.is_opstack() ||
1163 self.config.always_process_payload_attributes_on_canonical_head()
1164 {
1165 if let Some(attr) = attrs {
1166 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1167 let updated = self.process_payload_attributes(
1169 attr.clone(),
1170 &canonical_header,
1171 state,
1172 version,
1173 );
1174 return Ok(Some(TreeOutcome::new(updated)));
1175 }
1176
1177 if self.config.unwind_canonical_header() {
1185 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1186 }
1187 }
1188
1189 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1201 PayloadStatusEnum::Valid,
1202 Some(state.head_block_hash),
1203 )));
1204 return Ok(Some(outcome));
1205 }
1206
1207 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1209 let tip = chain_update.tip().clone_sealed_header();
1210 self.on_canonical_chain_update(chain_update);
1211
1212 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1214 return Ok(Some(TreeOutcome::new(outcome)));
1216 }
1217
1218 if let Some(attr) = attrs {
1219 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1221 return Ok(Some(TreeOutcome::new(updated)));
1222 }
1223
1224 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1225 PayloadStatusEnum::Valid,
1226 Some(state.head_block_hash),
1227 )));
1228 return Ok(Some(outcome));
1229 }
1230
1231 Ok(None)
1232 }
1233
1234 fn handle_missing_block(
1239 &self,
1240 state: ForkchoiceState,
1241 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1242 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1249 !state.safe_block_hash.is_zero() &&
1251 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1252 {
1253 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1254 state.safe_block_hash
1255 } else {
1256 state.head_block_hash
1257 };
1258
1259 let target = self.lowest_buffered_ancestor_or(target);
1260 trace!(target: "engine::tree", %target, "downloading missing block");
1261
1262 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1263 PayloadStatusEnum::Syncing,
1264 )))
1265 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1266 }
1267
1268 #[expect(clippy::type_complexity)]
1277 fn try_recv_engine_message(
1278 &self,
1279 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1280 if self.persistence_state.in_progress() {
1281 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1283 Ok(msg) => Ok(Some(msg)),
1284 Err(err) => match err {
1285 RecvTimeoutError::Timeout => Ok(None),
1286 RecvTimeoutError::Disconnected => Err(RecvError),
1287 },
1288 }
1289 } else {
1290 self.incoming.recv().map(Some)
1291 }
1292 }
1293
1294 fn remove_blocks(&mut self, new_tip_num: u64) {
1297 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1298 if new_tip_num < self.persistence_state.last_persisted_block.number {
1299 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1300 let (tx, rx) = oneshot::channel();
1301 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1302 self.persistence_state.start_remove(new_tip_num, rx);
1303 }
1304 }
1305
1306 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1309 if blocks_to_persist.is_empty() {
1310 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1311 return
1312 }
1313
1314 let highest_num_hash = blocks_to_persist
1316 .iter()
1317 .max_by_key(|block| block.recovered_block().number())
1318 .map(|b| b.recovered_block().num_hash())
1319 .expect("Checked non-empty persisting blocks");
1320
1321 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1322 let (tx, rx) = oneshot::channel();
1323 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1324
1325 self.persistence_state.start_save(highest_num_hash, rx);
1326 }
1327
1328 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1333 if self.persistence_state.in_progress() {
1334 let (mut rx, start_time, current_action) = self
1335 .persistence_state
1336 .rx
1337 .take()
1338 .expect("if a persistence task is in progress Receiver must be Some");
1339 match rx.try_recv() {
1341 Ok(last_persisted_hash_num) => {
1342 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1343 let Some(BlockNumHash {
1344 hash: last_persisted_block_hash,
1345 number: last_persisted_block_number,
1346 }) = last_persisted_hash_num
1347 else {
1348 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1351 return Ok(())
1352 };
1353
1354 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1355 self.persistence_state
1356 .finish(last_persisted_block_hash, last_persisted_block_number);
1357 self.on_new_persisted_block()?;
1358 }
1359 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1360 Err(TryRecvError::Empty) => {
1361 self.persistence_state.rx = Some((rx, start_time, current_action))
1362 }
1363 }
1364 }
1365
1366 if !self.persistence_state.in_progress() {
1367 if let Some(new_tip_num) = self.find_disk_reorg()? {
1368 self.remove_blocks(new_tip_num)
1369 } else if self.should_persist() {
1370 let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1371 self.persist_blocks(blocks_to_persist);
1372 }
1373 }
1374
1375 Ok(())
1376 }
1377
/// Dispatches a single incoming engine message.
///
/// Handles three kinds of messages:
/// - orchestrator events (backfill sync started / finished),
/// - engine API requests (inserting an already executed block, forkchoice updates and new
///   payloads), whose responses are sent back through the request's `tx` channel,
/// - downloaded blocks.
///
/// # Errors
///
/// Returns an [`InsertBlockFatalError`] if any of the invoked handlers fails fatally.
fn on_engine_message(
    &mut self,
    msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
) -> Result<(), InsertBlockFatalError> {
    match msg {
        FromEngine::Event(event) => match event {
            FromOrchestrator::BackfillSyncStarted => {
                debug!(target: "engine::tree", "received backfill sync started event");
                self.backfill_sync_state = BackfillSyncState::Active;
            }
            FromOrchestrator::BackfillSyncFinished(ctrl) => {
                self.on_backfill_sync_finished(ctrl)?;
            }
        },
        FromEngine::Request(request) => {
            match request {
                EngineApiRequest::InsertExecutedBlock(block) => {
                    let block_num_hash = block.recovered_block().num_hash();
                    // Ignore blocks that are not above the current canonical block number.
                    if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                        return Ok(())
                    }

                    debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                    let now = Instant::now();

                    // A direct child of the canonical head becomes the pending block.
                    if self.state.tree_state.canonical_block_hash() ==
                        block.recovered_block().parent_hash()
                    {
                        debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                        self.canonical_in_memory_state.set_pending_block(block.clone());
                    }

                    self.state.tree_state.insert_executed(block.clone());
                    self.metrics.engine.inserted_already_executed_blocks.increment(1);
                    self.emit_event(EngineApiEvent::BeaconConsensus(
                        ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                    ));
                }
                EngineApiRequest::Beacon(request) => {
                    match request {
                        BeaconEngineMessage::ForkchoiceUpdated {
                            state,
                            payload_attrs,
                            tx,
                            version,
                        } => {
                            let mut output =
                                self.on_forkchoice_updated(state, payload_attrs, version);

                            if let Ok(res) = &mut output {
                                // Track the latest forkchoice state together with its status.
                                self.state
                                    .forkchoice_state_tracker
                                    .set_latest(state, res.outcome.forkchoice_status());

                                self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                    state,
                                    res.outcome.forkchoice_status(),
                                ));

                                // Here the tree event is handled BEFORE the response is sent.
                                self.on_maybe_tree_event(res.event.take())?;
                            }

                            if let Err(err) =
                                tx.send(output.map(|o| o.outcome).map_err(Into::into))
                            {
                                self.metrics
                                    .engine
                                    .failed_forkchoice_updated_response_deliveries
                                    .increment(1);
                                error!(target: "engine::tree", "Failed to send event: {err:?}");
                            }
                        }
                        BeaconEngineMessage::NewPayload { payload, tx } => {
                            let mut output = self.on_new_payload(payload);

                            // Detach the tree event so the outcome can be sent on its own.
                            let maybe_event =
                                output.as_mut().ok().and_then(|out| out.event.take());

                            if let Err(err) =
                                tx.send(output.map(|o| o.outcome).map_err(|e| {
                                    BeaconOnNewPayloadError::Internal(Box::new(e))
                                }))
                            {
                                error!(target: "engine::tree", "Failed to send event: {err:?}");
                                self.metrics
                                    .engine
                                    .failed_new_payload_response_deliveries
                                    .increment(1);
                            }

                            // Here the tree event is handled AFTER the response was sent.
                            self.on_maybe_tree_event(maybe_event)?;
                        }
                    }
                }
            }
        }
        FromEngine::DownloadedBlocks(blocks) => {
            if let Some(event) = self.on_downloaded(blocks)? {
                self.on_tree_event(event)?;
            }
        }
    }
    Ok(())
}
1491
/// Handles the end of a backfill sync run described by `ctrl`.
///
/// Marks backfill as idle, records a bad block reported by an unwind, resets or prunes the
/// in-memory tree state relative to the backfill height, updates metrics and the canonical
/// head, and finally either requests another backfill run or tries to connect buffered
/// blocks to the new canonical head.
///
/// Returns early with `Ok(())` when no backfill height is available, when the block hash
/// for that height is not found in the provider, when there is no sync target state, or
/// when the sync target's finalized hash is zero.
///
/// # Errors
///
/// Returns an [`InsertBlockFatalError`] if a provider lookup or connecting buffered blocks
/// fails.
fn on_backfill_sync_finished(
    &mut self,
    ctrl: ControlFlow,
) -> Result<(), InsertBlockFatalError> {
    debug!(target: "engine::tree", "received backfill sync finished event");
    self.backfill_sync_state = BackfillSyncState::Idle;

    // On unwind, remember the bad block as invalid and use the unwind target as height;
    // otherwise use the block number reported by the control flow.
    let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
        warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
        self.state.invalid_headers.insert(**bad_block);

        Some(*target)
    } else {
        ctrl.block_number()
    };

    let Some(backfill_height) = backfill_height else { return Ok(()) };

    let Some(backfill_num_hash) = self
        .provider
        .block_hash(backfill_height)?
        .map(|hash| BlockNumHash { hash, number: backfill_height })
    else {
        debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
        return Ok(())
    };

    // Unwind resets the tree state to the backfill block; otherwise blocks are removed
    // up to it.
    if ctrl.is_unwind() {
        self.state.tree_state.reset(backfill_num_hash)
    } else {
        self.state.tree_state.remove_until(
            backfill_num_hash,
            self.persistence_state.last_persisted_block.hash,
            Some(backfill_num_hash),
        );
    }

    self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
    self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

    self.state.buffer.remove_old_blocks(backfill_height);
    self.canonical_in_memory_state.clear_state();

    // Make the header at the backfill height the new canonical head, if it can be read.
    // A provider error here is ignored (`if let Ok(Some(..))`).
    if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
        self.state.tree_state.set_canonical_head(new_head.num_hash());
        self.persistence_state.finish(new_head.hash(), new_head.number());

        self.canonical_in_memory_state.set_canonical_head(new_head);
    }

    let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
    else {
        return Ok(())
    };
    if sync_target_state.finalized_block_hash.is_zero() {
        return Ok(())
    }
    // Number of the sync target's finalized block, if that block is currently buffered.
    let newest_finalized = self
        .state
        .buffer
        .block(&sync_target_state.finalized_block_hash)
        .map(|block| block.number());

    // Start another backfill run if a backfill target is still reported for the distance
    // between the reached progress and the buffered finalized block.
    if let Some(backfill_target) =
        ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
            self.backfill_sync_target(progress, finalized_number, None)
        })
    {
        self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
            backfill_target.into(),
        )));
        return Ok(())
    };

    self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
}
1612
1613 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1617 if let Some(chain_update) = self.on_new_head(target)? {
1618 self.on_canonical_chain_update(chain_update);
1619 }
1620
1621 Ok(())
1622 }
1623
1624 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1626 if let Some(event) = event {
1627 self.on_tree_event(event)?;
1628 }
1629
1630 Ok(())
1631 }
1632
1633 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1637 match event {
1638 TreeEvent::TreeAction(action) => match action {
1639 TreeAction::MakeCanonical { sync_target_head } => {
1640 self.make_canonical(sync_target_head)?;
1641 }
1642 },
1643 TreeEvent::BackfillAction(action) => {
1644 self.emit_event(EngineApiEvent::BackfillAction(action));
1645 }
1646 TreeEvent::Download(action) => {
1647 self.emit_event(EngineApiEvent::Download(action));
1648 }
1649 }
1650
1651 Ok(())
1652 }
1653
1654 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1656 let event = event.into();
1657
1658 if event.is_backfill_action() {
1659 debug_assert_eq!(
1660 self.backfill_sync_state,
1661 BackfillSyncState::Idle,
1662 "backfill action should only be emitted when backfill is idle"
1663 );
1664
1665 if self.persistence_state.in_progress() {
1666 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1669 return
1670 }
1671
1672 self.backfill_sync_state = BackfillSyncState::Pending;
1673 self.metrics.engine.pipeline_runs.increment(1);
1674 debug!(target: "engine::tree", "emitting backfill action event");
1675 }
1676
1677 let _ = self.outgoing.send(event).inspect_err(
1678 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1679 );
1680 }
1681
1682 pub const fn should_persist(&self) -> bool {
1686 if !self.backfill_sync_state.is_idle() {
1687 return false
1689 }
1690
1691 let min_block = self.persistence_state.last_persisted_block.number;
1692 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1693 self.config.persistence_threshold()
1694 }
1695
1696 fn get_canonical_blocks_to_persist(
1708 &mut self,
1709 ) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
1710 debug_assert!(!self.persistence_state.in_progress());
1713
1714 let mut blocks_to_persist = Vec::new();
1715 let mut current_hash = self.state.tree_state.canonical_block_hash();
1716 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1717 let canonical_head_number = self.state.tree_state.canonical_block_number();
1718 let all_blocks_have_trie_updates = self
1719 .state
1720 .tree_state
1721 .blocks_by_hash
1722 .values()
1723 .all(|block| block.trie_updates().is_some());
1724
1725 let target_number = if all_blocks_have_trie_updates {
1726 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1728 } else {
1729 canonical_head_number
1731 };
1732
1733 debug!(
1734 target: "engine::tree",
1735 ?current_hash,
1736 ?last_persisted_number,
1737 ?canonical_head_number,
1738 ?all_blocks_have_trie_updates,
1739 ?target_number,
1740 "Returning canonical blocks to persist"
1741 );
1742 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1743 if block.recovered_block().number() <= last_persisted_number {
1744 break;
1745 }
1746
1747 if block.recovered_block().number() <= target_number {
1748 blocks_to_persist.push(block.clone());
1749 }
1750
1751 current_hash = block.recovered_block().parent_hash();
1752 }
1753
1754 blocks_to_persist.reverse();
1756
1757 for block in &mut blocks_to_persist {
1759 if block.trie.is_present() {
1760 continue
1761 }
1762
1763 debug!(
1764 target: "engine::tree",
1765 block = ?block.recovered_block().num_hash(),
1766 "Calculating trie updates before persisting"
1767 );
1768
1769 let provider = self
1770 .state_provider_builder(block.recovered_block().parent_hash())?
1771 .ok_or(AdvancePersistenceError::MissingAncestor(
1772 block.recovered_block().parent_hash(),
1773 ))?
1774 .build()?;
1775
1776 let mut trie_input = self.compute_trie_input(
1777 self.persisting_kind_for(block.recovered_block.block_with_parent()),
1778 self.provider.database_provider_ro()?,
1779 block.recovered_block().parent_hash(),
1780 None,
1781 )?;
1782 trie_input.append_ref(block.hashed_state());
1784 let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
1785 debug_assert_eq!(_root, block.recovered_block().state_root());
1786
1787 let trie_updates = Arc::new(updates);
1789 let tree_state_block = self
1790 .state
1791 .tree_state
1792 .blocks_by_hash
1793 .get_mut(&block.recovered_block().hash())
1794 .expect("blocks to persist are constructed from tree state blocks");
1795 tree_state_block.trie.set_present(trie_updates.clone());
1796 block.trie.set_present(trie_updates);
1797 }
1798
1799 Ok(blocks_to_persist)
1800 }
1801
1802 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1810 if let Some(remove_above) = self.find_disk_reorg()? {
1813 self.remove_blocks(remove_above);
1814 return Ok(())
1815 }
1816
1817 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1818 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1819 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1820 number: self.persistence_state.last_persisted_block.number,
1821 hash: self.persistence_state.last_persisted_block.hash,
1822 });
1823 Ok(())
1824 }
1825
1826 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1834 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1835 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1837 return Ok(Some(block.block.clone()))
1838 }
1839
1840 let (block, senders) = self
1841 .provider
1842 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1843 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1844 .split_sealed();
1845 let execution_output = self
1846 .provider
1847 .get_state(block.header().number())?
1848 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1849 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1850
1851 Ok(Some(ExecutedBlock {
1852 recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1853 execution_output: Arc::new(execution_output),
1854 hashed_state: Arc::new(hashed_state),
1855 }))
1856 }
1857
1858 fn sealed_header_by_hash(
1860 &self,
1861 hash: B256,
1862 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1863 let header = self.state.tree_state.sealed_header_by_hash(&hash);
1865
1866 if header.is_some() {
1867 Ok(header)
1868 } else {
1869 self.provider.sealed_header_by_hash(hash)
1870 }
1871 }
1872
1873 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1880 self.state
1881 .buffer
1882 .lowest_ancestor(&hash)
1883 .map(|block| block.parent_hash())
1884 .unwrap_or_else(|| hash)
1885 }
1886
1887 fn latest_valid_hash_for_invalid_payload(
1898 &mut self,
1899 parent_hash: B256,
1900 ) -> ProviderResult<Option<B256>> {
1901 if self.sealed_header_by_hash(parent_hash)?.is_some() {
1903 return Ok(Some(parent_hash))
1904 }
1905
1906 let mut current_hash = parent_hash;
1909 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1910 while let Some(block_with_parent) = current_block {
1911 current_hash = block_with_parent.parent;
1912 current_block = self.state.invalid_headers.get(¤t_hash);
1913
1914 if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1917 return Ok(Some(current_hash))
1918 }
1919 }
1920 Ok(None)
1921 }
1922
1923 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1927 if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1930 !parent.difficulty().is_zero()
1931 {
1932 parent_hash = B256::ZERO;
1933 }
1934
1935 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1936 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1937 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1938 })
1939 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1940 }
1941
1942 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1946 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1947 return target.head_block_hash == block_hash
1948 }
1949 false
1950 }
1951
1952 fn check_invalid_ancestor_with_head(
1958 &mut self,
1959 check: B256,
1960 head: &SealedBlock<N::Block>,
1961 ) -> ProviderResult<Option<PayloadStatus>> {
1962 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1964
1965 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1966 }
1967
1968 fn on_invalid_new_payload(
1970 &mut self,
1971 head: SealedBlock<N::Block>,
1972 invalid: BlockWithParent,
1973 ) -> ProviderResult<PayloadStatus> {
1974 let status = self.prepare_invalid_response(invalid.parent)?;
1976
1977 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1979 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1980
1981 Ok(status)
1982 }
1983
1984 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1996 let parent_hash = payload.parent_hash();
1997 let block_hash = payload.block_hash();
1998 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1999 if lowest_buffered_ancestor == block_hash {
2000 lowest_buffered_ancestor = parent_hash;
2001 }
2002
2003 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2005 }
2006
2007 fn handle_invalid_ancestor_payload(
2016 &mut self,
2017 payload: T::ExecutionData,
2018 invalid: BlockWithParent,
2019 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2020 let parent_hash = payload.parent_hash();
2021
2022 let block = match self.payload_validator.ensure_well_formed_payload(payload) {
2028 Ok(block) => block,
2029 Err(error) => return Ok(self.on_new_payload_error(error, parent_hash)?),
2030 };
2031
2032 Ok(self.on_invalid_new_payload(block.into_sealed_block(), invalid)?)
2033 }
2034
2035 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2038 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2040
2041 match self.prepare_invalid_response(header.parent) {
2043 Ok(status) => Ok(Some(status)),
2044 Err(err) => {
2045 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2046 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2048 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2049 })))
2050 }
2051 }
2052 }
2053
2054 fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
2057 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2058 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2059 return Err(e)
2060 }
2061
2062 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
2063 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2064 return Err(e)
2065 }
2066
2067 Ok(())
2068 }
2069
2070 #[instrument(level = "trace", skip(self), target = "engine::tree")]
2072 fn try_connect_buffered_blocks(
2073 &mut self,
2074 parent: BlockNumHash,
2075 ) -> Result<(), InsertBlockFatalError> {
2076 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2077
2078 if blocks.is_empty() {
2079 return Ok(())
2081 }
2082
2083 let now = Instant::now();
2084 let block_count = blocks.len();
2085 for child in blocks {
2086 let child_num_hash = child.num_hash();
2087 match self.insert_block(child) {
2088 Ok(res) => {
2089 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2090 if self.is_sync_target_head(child_num_hash.hash) &&
2091 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2092 {
2093 self.make_canonical(child_num_hash.hash)?;
2094 }
2095 }
2096 Err(err) => {
2097 if let InsertPayloadError::Block(err) = err {
2098 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2099 if let Err(fatal) = self.on_insert_block_error(err) {
2100 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2101 return Err(fatal)
2102 }
2103 }
2104 }
2105 }
2106 }
2107
2108 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2109 Ok(())
2110 }
2111
2112 fn buffer_block(
2114 &mut self,
2115 block: RecoveredBlock<N::Block>,
2116 ) -> Result<(), InsertBlockError<N::Block>> {
2117 if let Err(err) = self.validate_block(&block) {
2118 return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
2119 }
2120 self.state.buffer.insert_block(block);
2121 Ok(())
2122 }
2123
2124 #[inline]
2129 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2130 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2131 }
2132
2133 #[inline]
2136 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2137 if block > local_tip {
2138 Some(block - local_tip)
2139 } else {
2140 None
2141 }
2142 }
2143
2144 fn backfill_sync_target(
2151 &self,
2152 canonical_tip_num: u64,
2153 target_block_number: u64,
2154 downloaded_block: Option<BlockNumHash>,
2155 ) -> Option<B256> {
2156 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2157
2158 let exceeds_backfill_threshold =
2160 match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2161 (Some(downloaded_block), Some(state))
2163 if downloaded_block.hash == state.finalized_block_hash =>
2164 {
2165 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2166 }
2167 _ => match sync_target_state
2168 .as_ref()
2169 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2170 {
2171 Some(buffered_finalized) => {
2172 self.exceeds_backfill_run_threshold(
2175 canonical_tip_num,
2176 buffered_finalized.number(),
2177 )
2178 }
2179 None => {
2180 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2182 }
2183 },
2184 };
2185
2186 if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2188 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2190 Err(err) => {
2191 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2192 }
2193 Ok(None) => {
2194 if !state.finalized_block_hash.is_zero() {
2196 return Some(state.finalized_block_hash)
2199 }
2200
2201 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2214 return Some(state.head_block_hash)
2215 }
2216 Ok(Some(_)) => {
2217 }
2219 }
2220 }
2221
2222 None
2223 }
2224
2225 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2228 let mut canonical = self.state.tree_state.current_canonical_head;
2229 let mut persisted = self.persistence_state.last_persisted_block;
2230
2231 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2232 Ok(self
2233 .sealed_header_by_hash(num_hash.hash)?
2234 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2235 .parent_num_hash())
2236 };
2237
2238 while canonical.number > persisted.number {
2241 canonical = parent_num_hash(canonical)?;
2242 }
2243
2244 if canonical == persisted {
2246 return Ok(None);
2247 }
2248
2249 while persisted.number > canonical.number {
2255 persisted = parent_num_hash(persisted)?;
2256 }
2257
2258 debug_assert_eq!(persisted.number, canonical.number);
2259
2260 while persisted.hash != canonical.hash {
2262 canonical = parent_num_hash(canonical)?;
2263 persisted = parent_num_hash(persisted)?;
2264 }
2265
2266 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2267
2268 Ok(Some(persisted.number))
2269 }
2270
2271 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2275 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2276 let start = Instant::now();
2277
2278 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2280
2281 let tip = chain_update.tip().clone_sealed_header();
2282 let notification = chain_update.to_chain_notification();
2283
2284 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2286 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2287 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2288 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2289
2290 self.update_reorg_metrics(old.len());
2291 self.reinsert_reorged_blocks(new.clone());
2292 let old = old
2295 .iter()
2296 .filter_map(|block| {
2297 let trie = self
2298 .state
2299 .tree_state
2300 .persisted_trie_updates
2301 .get(&block.recovered_block.hash())?
2302 .1
2303 .clone();
2304 Some(ExecutedBlockWithTrieUpdates {
2305 block: block.clone(),
2306 trie: ExecutedTrieUpdates::Present(trie),
2307 })
2308 })
2309 .collect::<Vec<_>>();
2310 self.reinsert_reorged_blocks(old);
2311 }
2312
2313 self.canonical_in_memory_state.update_chain(chain_update);
2315 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2316
2317 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2319
2320 self.canonical_in_memory_state.notify_canon_state(notification);
2322
2323 self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2325 Box::new(tip),
2326 start.elapsed(),
2327 ));
2328 }
2329
2330 fn update_reorg_metrics(&self, old_chain_length: usize) {
2332 self.metrics.tree.reorgs.increment(1);
2333 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2334 }
2335
2336 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
2338 for block in new_chain {
2339 if self
2340 .state
2341 .tree_state
2342 .executed_block_by_hash(block.recovered_block().hash())
2343 .is_none()
2344 {
2345 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2346 self.state.tree_state.insert_executed(block);
2347 }
2348 }
2349 }
2350
2351 fn on_disconnected_downloaded_block(
2356 &self,
2357 downloaded_block: BlockNumHash,
2358 missing_parent: BlockNumHash,
2359 head: BlockNumHash,
2360 ) -> Option<TreeEvent> {
2361 if let Some(target) =
2363 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2364 {
2365 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2366 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2367 }
2368
2369 let request = if let Some(distance) =
2379 self.distance_from_local_tip(head.number, missing_parent.number)
2380 {
2381 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2382 DownloadRequest::BlockRange(missing_parent.hash, distance)
2383 } else {
2384 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2385 DownloadRequest::single_block(missing_parent.hash)
2388 };
2389
2390 Some(TreeEvent::Download(request))
2391 }
2392
2393 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2399 fn on_downloaded_block(
2400 &mut self,
2401 block: RecoveredBlock<N::Block>,
2402 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2403 let block_num_hash = block.num_hash();
2404 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2405 if self
2406 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2407 .is_some()
2408 {
2409 return Ok(None)
2410 }
2411
2412 if !self.backfill_sync_state.is_idle() {
2413 return Ok(None)
2414 }
2415
2416 match self.insert_block(block) {
2418 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2419 if self.is_sync_target_head(block_num_hash.hash) {
2420 trace!(target: "engine::tree", "appended downloaded sync target block");
2421
2422 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2425 sync_target_head: block_num_hash.hash,
2426 })))
2427 }
2428 trace!(target: "engine::tree", "appended downloaded block");
2429 self.try_connect_buffered_blocks(block_num_hash)?;
2430 }
2431 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2432 return Ok(self.on_disconnected_downloaded_block(
2435 block_num_hash,
2436 missing_ancestor,
2437 head,
2438 ))
2439 }
2440 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2441 trace!(target: "engine::tree", "downloaded block already executed");
2442 }
2443 Err(err) => {
2444 if let InsertPayloadError::Block(err) = err {
2445 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2446 if let Err(fatal) = self.on_insert_block_error(err) {
2447 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2448 return Err(fatal)
2449 }
2450 }
2451 }
2452 }
2453 Ok(None)
2454 }
2455
2456 fn insert_payload(
2465 &mut self,
2466 payload: T::ExecutionData,
2467 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2468 self.insert_block_or_payload(
2469 payload.block_with_parent(),
2470 payload,
2471 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2472 |this, payload| Ok(this.payload_validator.ensure_well_formed_payload(payload)?),
2473 )
2474 }
2475
2476 fn insert_block(
2477 &mut self,
2478 block: RecoveredBlock<N::Block>,
2479 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2480 self.insert_block_or_payload(
2481 block.block_with_parent(),
2482 block,
2483 |validator, block, ctx| validator.validate_block(block, ctx),
2484 |_, block| Ok(block),
2485 )
2486 }
2487
/// Shared insertion logic for blocks and payloads.
///
/// `input` is either a block or a payload identified by `block_id`. `execute` validates
/// and executes the input, producing an executed block; `convert_to_block` turns the input
/// into a [`RecoveredBlock`] on the paths that need a block without executing it. Both are
/// `FnOnce`, and `input` is moved into exactly one of them.
///
/// Outcomes:
/// - `AlreadySeen(Valid)` if a header for the block hash is already known (the input is
///   still converted, and a conversion error is returned);
/// - `Inserted(Disconnected { .. })` if no state provider can be built for the parent; the
///   block is buffered in that case;
/// - `Inserted(Valid)` after successful execution, in which case the block is added to the
///   tree state, possibly set as the pending block, and a fork/canonical "block added"
///   event is emitted.
///
/// # Errors
///
/// Returns `Err` if a lookup fails (wrapped in an [`InsertBlockError`] together with the
/// converted block), if conversion fails, or if `execute` fails.
fn insert_block_or_payload<Input, Err>(
    &mut self,
    block_id: BlockWithParent,
    input: Input,
    execute: impl FnOnce(
        &mut V,
        Input,
        TreeCtx<'_, N>,
    ) -> Result<ExecutedBlockWithTrieUpdates<N>, Err>,
    convert_to_block: impl FnOnce(&mut Self, Input) -> Result<RecoveredBlock<N::Block>, Err>,
) -> Result<InsertPayloadOk, Err>
where
    Err: From<InsertBlockError<N::Block>>,
{
    let block_insert_start = Instant::now();
    let block_num_hash = block_id.block;
    debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

    // Check whether the block is already known.
    match self.sealed_header_by_hash(block_num_hash.hash) {
        Err(err) => {
            let block = convert_to_block(self, input)?;
            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
        }
        Ok(Some(_)) => {
            // The conversion result is discarded; only a conversion error is surfaced.
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
        }
        _ => {}
    };

    // Check whether state for the parent is available.
    match self.state_provider_builder(block_id.parent) {
        Err(err) => {
            let block = convert_to_block(self, input)?;
            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
        }
        Ok(None) => {
            let block = convert_to_block(self, input)?;

            // Parent of the lowest buffered ancestor, or the block's own parent if the
            // buffer reports no such ancestor.
            let missing_ancestor = self
                .state
                .buffer
                .lowest_ancestor(&block.parent_hash())
                .map(|block| block.parent_num_hash())
                .unwrap_or_else(|| block.parent_num_hash());

            self.state.buffer.insert_block(block);

            return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                head: self.state.tree_state.current_canonical_head,
                missing_ancestor,
            }))
        }
        Ok(Some(_)) => {}
    }

    // Determined before execution; decides which event is emitted afterwards.
    let is_fork = match self.is_fork(block_id) {
        Err(err) => {
            let block = convert_to_block(self, input)?;
            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into());
        }
        Ok(is_fork) => is_fork,
    };

    let ctx =
        TreeCtx::new(&mut self.state, &self.persistence_state, &self.canonical_in_memory_state);

    let start = Instant::now();

    let executed = execute(&mut self.payload_validator, input, ctx)?;

    // A direct child of the canonical head becomes the pending block.
    if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
    {
        debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
        self.canonical_in_memory_state.set_pending_block(executed.clone());
    }

    self.state.tree_state.insert_executed(executed.clone());
    self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

    // Measured from just before `execute` was called.
    let elapsed = start.elapsed();
    let engine_event = if is_fork {
        ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
    } else {
        ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
    };
    self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

    self.metrics
        .engine
        .block_insert_total_duration
        .record(block_insert_start.elapsed().as_secs_f64());
    debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
    Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
}
2606
2607 fn compute_trie_input<TP: DBProvider + BlockNumReader>(
2623 &self,
2624 persisting_kind: PersistingKind,
2625 provider: TP,
2626 parent_hash: B256,
2627 allocated_trie_input: Option<TrieInput>,
2628 ) -> ProviderResult<TrieInput> {
2629 let mut input = allocated_trie_input.unwrap_or_default();
2631
2632 let best_block_number = provider.best_block_number()?;
2633
2634 let (mut historical, mut blocks) = self
2635 .state
2636 .tree_state
2637 .blocks_by_hash(parent_hash)
2638 .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2639
2640 if persisting_kind.is_descendant() {
2643 while let Some(block) = blocks.last() {
2645 let recovered_block = block.recovered_block();
2646 if recovered_block.number() <= best_block_number {
2647 blocks.pop();
2650 } else {
2651 break
2654 }
2655 }
2656
2657 historical = if let Some(block) = blocks.last() {
2658 (block.recovered_block().number() - 1).into()
2661 } else {
2662 parent_hash.into()
2664 };
2665 }
2666
2667 if blocks.is_empty() {
2668 debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2669 } else {
2670 debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2671 }
2672
2673 let block_number = provider
2675 .convert_hash_or_number(historical)?
2676 .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2677
2678 let revert_state = if block_number == best_block_number {
2680 debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2683 HashedPostState::default()
2684 } else {
2685 let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
2686 provider.tx_ref(),
2687 block_number + 1..,
2688 )
2689 .map_err(ProviderError::from)?;
2690 debug!(
2691 target: "engine::tree",
2692 block_number,
2693 best_block_number,
2694 accounts = revert_state.accounts.len(),
2695 storages = revert_state.storages.len(),
2696 "Non-empty revert state"
2697 );
2698 revert_state
2699 };
2700 input.append(revert_state);
2701
2702 input.extend_with_blocks(
2704 blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
2705 );
2706
2707 Ok(input)
2708 }
2709
2710 fn on_insert_block_error(
2716 &mut self,
2717 error: InsertBlockError<N::Block>,
2718 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2719 let (block, error) = error.split();
2720
2721 let validation_err = error.ensure_validation_error()?;
2724
2725 warn!(
2729 target: "engine::tree",
2730 invalid_hash=%block.hash(),
2731 invalid_number=block.number(),
2732 %validation_err,
2733 "Invalid block error on new payload",
2734 );
2735 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2736
2737 self.state.invalid_headers.insert(block.block_with_parent());
2739 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2740 Box::new(block),
2741 )));
2742
2743 Ok(PayloadStatus::new(
2744 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2745 latest_valid_hash,
2746 ))
2747 }
2748
2749 fn on_new_payload_error(
2751 &mut self,
2752 error: NewPayloadError,
2753 parent_hash: B256,
2754 ) -> ProviderResult<PayloadStatus> {
2755 error!(target: "engine::tree", %error, "Invalid payload");
2756 let latest_valid_hash =
2759 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2760 None
2764 } else {
2765 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2766 };
2767
2768 let status = PayloadStatusEnum::from(error);
2769 Ok(PayloadStatus::new(status, latest_valid_hash))
2770 }
2771
2772 pub fn find_canonical_header(
2774 &self,
2775 hash: B256,
2776 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2777 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2778
2779 if canonical.is_none() {
2780 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2781 }
2782
2783 Ok(canonical)
2784 }
2785
2786 fn update_finalized_block(
2788 &self,
2789 finalized_block_hash: B256,
2790 ) -> Result<(), OnForkChoiceUpdated> {
2791 if finalized_block_hash.is_zero() {
2792 return Ok(())
2793 }
2794
2795 match self.find_canonical_header(finalized_block_hash) {
2796 Ok(None) => {
2797 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2798 return Err(OnForkChoiceUpdated::invalid_state())
2800 }
2801 Ok(Some(finalized)) => {
2802 if Some(finalized.num_hash()) !=
2803 self.canonical_in_memory_state.get_finalized_num_hash()
2804 {
2805 let _ = self.persistence.save_finalized_block_number(finalized.number());
2808 self.canonical_in_memory_state.set_finalized(finalized);
2809 }
2810 }
2811 Err(err) => {
2812 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2813 }
2814 }
2815
2816 Ok(())
2817 }
2818
2819 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2821 if safe_block_hash.is_zero() {
2822 return Ok(())
2823 }
2824
2825 match self.find_canonical_header(safe_block_hash) {
2826 Ok(None) => {
2827 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2828 return Err(OnForkChoiceUpdated::invalid_state())
2830 }
2831 Ok(Some(safe)) => {
2832 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2833 let _ = self.persistence.save_safe_block_number(safe.number());
2836 self.canonical_in_memory_state.set_safe(safe);
2837 }
2838 }
2839 Err(err) => {
2840 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2841 }
2842 }
2843
2844 Ok(())
2845 }
2846
2847 fn ensure_consistent_forkchoice_state(
2856 &self,
2857 state: ForkchoiceState,
2858 ) -> Result<(), OnForkChoiceUpdated> {
2859 self.update_finalized_block(state.finalized_block_hash)?;
2865
2866 self.update_safe_block(state.safe_block_hash)
2872 }
2873
2874 fn process_payload_attributes(
2879 &self,
2880 attrs: T::PayloadAttributes,
2881 head: &N::BlockHeader,
2882 state: ForkchoiceState,
2883 version: EngineApiMessageVersion,
2884 ) -> OnForkChoiceUpdated {
2885 if let Err(err) =
2886 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2887 {
2888 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2889 return OnForkChoiceUpdated::invalid_payload_attributes()
2890 }
2891
2892 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2897 state.head_block_hash,
2898 attrs,
2899 version as u8,
2900 ) {
2901 Ok(attributes) => {
2902 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2905
2906 OnForkChoiceUpdated::updated_with_pending_payload_id(
2918 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2919 pending_payload_id,
2920 )
2921 }
2922 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2923 }
2924 }
2925
2926 pub(crate) fn remove_before(
2933 &mut self,
2934 upper_bound: BlockNumHash,
2935 finalized_hash: Option<B256>,
2936 ) -> ProviderResult<()> {
2937 let num = if let Some(hash) = finalized_hash {
2940 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2941 } else {
2942 None
2943 };
2944
2945 self.state.tree_state.remove_until(
2946 upper_bound,
2947 self.persistence_state.last_persisted_block.hash,
2948 num,
2949 );
2950 Ok(())
2951 }
2952
2953 pub fn state_provider_builder(
2958 &self,
2959 hash: B256,
2960 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2961 where
2962 P: BlockReader + StateProviderFactory + StateReader + Clone,
2963 {
2964 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2965 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2966 return Ok(Some(StateProviderBuilder::new(
2968 self.provider.clone(),
2969 historical,
2970 Some(blocks),
2971 )))
2972 }
2973
2974 if let Some(header) = self.provider.header(hash)? {
2976 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2977 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2980 }
2981
2982 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2983 Ok(None)
2984 }
2985}
2986
2987#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2993pub enum BlockStatus {
2994 Valid,
2996 Disconnected {
2998 head: BlockNumHash,
3000 missing_ancestor: BlockNumHash,
3002 },
3003}
3004
3005#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3010pub enum InsertPayloadOk {
3011 AlreadySeen(BlockStatus),
3013 Inserted(BlockStatus),
3015}
3016
/// Relationship between an ongoing persistence task and the block being processed.
///
/// NOTE(review): variant meanings below are inferred from the variant names; confirm against
/// the code that constructs them.
#[derive(Clone, Copy, Debug)]
pub enum PersistingKind {
    /// Presumably: no blocks are being persisted right now.
    NotPersisting,
    /// Presumably: blocks are being persisted and the processed block does not descend from
    /// them.
    PersistingNotDescendant,
    /// Presumably: blocks are being persisted and the processed block descends from them.
    PersistingDescendant,
}

impl PersistingKind {
    /// Returns `true` for [`Self::NotPersisting`] and [`Self::PersistingDescendant`], and
    /// `false` for [`Self::PersistingNotDescendant`].
    pub const fn can_run_parallel_state_root(&self) -> bool {
        match self {
            Self::NotPersisting | Self::PersistingDescendant => true,
            Self::PersistingNotDescendant => false,
        }
    }

    /// Returns `true` only for [`Self::PersistingDescendant`].
    pub const fn is_descendant(&self) -> bool {
        match self {
            Self::PersistingDescendant => true,
            Self::NotPersisting | Self::PersistingNotDescendant => false,
        }
    }
}