1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17 MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29 FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32 BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35 StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48 oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64
65use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
66pub use block_buffer::BlockBuffer;
67pub use invalid_headers::InvalidHeaderCache;
68pub use metrics::EngineApiMetrics;
69pub use payload_processor::*;
70pub use payload_validator::{BasicEngineValidator, EngineValidator};
71pub use persistence_state::PersistenceState;
72pub use reth_engine_primitives::TreeConfig;
73pub use reth_execution_cache::{
74 CachedStateMetrics, CachedStateProvider, ExecutionCache, PayloadExecutionCache, SavedCache,
75};
76
77pub mod state;
78
/// Minimum distance, in blocks, from the sync target above which a backfill (pipeline) run is
/// used — one epoch worth of slots.
// NOTE(review): semantics inferred from the name and `EPOCH_SLOTS`; confirm against callers.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// Number of recent blocks for which changesets are retained.
// NOTE(review): retention semantics inferred from the name; confirm against `ChangesetCache` usage.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
95
/// Builder that can recreate a state provider anchored at a historical block, optionally with
/// in-memory executed blocks layered on top.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// Factory used to resolve the historical state provider.
    provider_factory: P,
    /// Hash of the block used to look up the historical state provider.
    historical: B256,
    /// Optional in-memory executed blocks overlaid on top of the historical state.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
106
impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new builder from the provider factory, the anchor block hash, and an optional
    /// in-memory overlay of executed blocks.
    pub const fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlock<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}
118
119impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
120where
121 P: BlockReader + StateProviderFactory + StateReader + Clone,
122{
123 pub fn build(&self) -> ProviderResult<StateProviderBox> {
125 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
126 if let Some(overlay) = self.overlay.clone() {
127 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
128 }
129 Ok(provider)
130 }
131}
132
/// Container for the in-memory state tracked by the engine API tree handler.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracked executed blocks and the canonical head.
    tree_state: TreeState<N>,
    /// Tracks forkchoice states received from the consensus layer.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer for blocks that cannot be processed yet (e.g. missing ancestors).
    buffer: BlockBuffer<N::Block>,
    /// Cache of headers known to be invalid.
    invalid_headers: InvalidHeaderCache,
}
148
149impl<N: NodePrimitives> EngineApiTreeState<N> {
150 fn new(
151 block_buffer_limit: u32,
152 max_invalid_header_cache_length: u32,
153 canonical_block: BlockNumHash,
154 engine_kind: EngineApiKind,
155 ) -> Self {
156 Self {
157 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
158 buffer: BlockBuffer::new(block_buffer_limit),
159 tree_state: TreeState::new(canonical_block, engine_kind),
160 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
161 }
162 }
163
164 pub const fn tree_state(&self) -> &TreeState<N> {
166 &self.tree_state
167 }
168
169 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
171 self.invalid_headers.get(hash).is_some()
172 }
173}
174
/// The outcome of a tree operation, optionally paired with a follow-up event for the caller.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The operation's result value.
    pub outcome: T,
    /// An optional follow-up event (e.g. canonicalization, backfill, download).
    pub event: Option<TreeEvent>,
    /// Whether the processed block had already been seen before.
    pub already_seen: bool,
}
186
187impl<T> TreeOutcome<T> {
188 pub const fn new(outcome: T) -> Self {
190 Self { outcome, event: None, already_seen: false }
191 }
192
193 pub fn with_event(mut self, event: TreeEvent) -> Self {
195 self.event = Some(event);
196 self
197 }
198
199 pub const fn with_already_seen(mut self, value: bool) -> Self {
201 self.already_seen = value;
202 self
203 }
204}
205
/// Result of attempting to insert a new payload into the tree.
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// The payload status to report back over the engine API.
    pub status: PayloadStatus,
    /// Whether the payload's block had already been seen before.
    pub already_seen: bool,
}
216
217impl TryInsertPayloadResult {
218 #[inline]
220 pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
221 TreeOutcome::new(self.status).with_already_seen(self.already_seen)
222 }
223}
224
/// Events produced by tree operations for the caller to act on.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action affecting the in-memory tree (e.g. making a block canonical).
    TreeAction(TreeAction),
    /// A request to run a backfill (pipeline) sync action.
    BackfillAction(BackfillAction),
    /// A request to download one or more blocks.
    Download(DownloadRequest),
}
235
impl TreeEvent {
    /// Returns `true` if this event is a backfill action.
    const fn is_backfill_action(&self) -> bool {
        matches!(self, Self::BackfillAction(_))
    }
}
242
/// Actions targeting the in-memory tree state.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the block with the given hash the new canonical head.
    MakeCanonical {
        /// Hash of the sync target head that should become canonical.
        sync_target_head: B256,
    },
}
252
/// The engine API tree handler.
///
/// Owns the in-memory tree state and processes engine messages (new payloads, forkchoice
/// updates, downloaded blocks), driving validation, canonicalization and persistence.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider used for database/state access.
    provider: P,
    /// The consensus implementation used for validation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// The payload/engine validator.
    payload_validator: V,
    /// In-memory tree state (tracked blocks, buffer, forkchoice tracker, invalid headers).
    state: EngineApiTreeState<N>,
    /// Sender half of the incoming message channel; cloned by [`Self::sender`] and used to
    /// requeue leftover downloaded blocks.
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Receiver half of the incoming engine message channel.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Channel on which engine API events are emitted.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle to the background persistence task.
    persistence: PersistenceHandle<N>,
    /// Tracks the last persisted block and any in-flight persistence action.
    persistence_state: PersistenceState,
    /// Whether backfill (pipeline) sync is currently active; starts `Idle`.
    backfill_sync_state: BackfillSyncState,
    /// Shared in-memory view of the canonical chain.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle for triggering payload building jobs.
    payload_builder: PayloadBuilderHandle<T>,
    /// Engine tree configuration.
    config: TreeConfig,
    /// Engine API metrics.
    metrics: EngineApiMetrics,
    /// The engine API variant (used e.g. for OP-stack-specific behavior).
    engine_kind: EngineApiKind,
    /// The EVM configuration used for execution.
    evm_config: C,
    /// Cache of recent changesets.
    changeset_cache: ChangesetCache,
    /// Per-block-hash execution timing statistics.
    execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
    /// Task runtime handle.
    runtime: reth_tasks::Runtime,
}
310
311impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
312 for EngineApiTreeHandler<N, P, T, V, C>
313where
314 N: NodePrimitives,
315 C: Debug + ConfigureEvm<Primitives = N>,
316{
317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 f.debug_struct("EngineApiTreeHandler")
319 .field("provider", &self.provider)
320 .field("consensus", &self.consensus)
321 .field("payload_validator", &self.payload_validator)
322 .field("state", &self.state)
323 .field("incoming_tx", &self.incoming_tx)
324 .field("persistence", &self.persistence)
325 .field("persistence_state", &self.persistence_state)
326 .field("backfill_sync_state", &self.backfill_sync_state)
327 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
328 .field("payload_builder", &self.payload_builder)
329 .field("config", &self.config)
330 .field("metrics", &self.metrics)
331 .field("engine_kind", &self.engine_kind)
332 .field("evm_config", &self.evm_config)
333 .field("changeset_cache", &self.changeset_cache)
334 .field("execution_timing_stats", &self.execution_timing_stats.len())
335 .field("runtime", &self.runtime)
336 .finish()
337 }
338}
339
340impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
341where
342 N: NodePrimitives,
343 P: DatabaseProviderFactory
344 + BlockReader<Block = N::Block, Header = N::BlockHeader>
345 + StateProviderFactory
346 + StateReader<Receipt = N::Receipt>
347 + HashedPostStateProvider
348 + Clone
349 + 'static,
350 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
351 + StageCheckpointReader
352 + ChangeSetReader
353 + StorageChangeSetReader
354 + StorageSettingsCache,
355 C: ConfigureEvm<Primitives = N> + 'static,
356 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
357 V: EngineValidator<T> + WaitForCaches,
358{
359 #[expect(clippy::too_many_arguments)]
361 pub fn new(
362 provider: P,
363 consensus: Arc<dyn FullConsensus<N>>,
364 payload_validator: V,
365 outgoing: UnboundedSender<EngineApiEvent<N>>,
366 state: EngineApiTreeState<N>,
367 canonical_in_memory_state: CanonicalInMemoryState<N>,
368 persistence: PersistenceHandle<N>,
369 persistence_state: PersistenceState,
370 payload_builder: PayloadBuilderHandle<T>,
371 config: TreeConfig,
372 engine_kind: EngineApiKind,
373 evm_config: C,
374 changeset_cache: ChangesetCache,
375 runtime: reth_tasks::Runtime,
376 ) -> Self {
377 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
378
379 Self {
380 provider,
381 consensus,
382 payload_validator,
383 incoming,
384 outgoing,
385 persistence,
386 persistence_state,
387 backfill_sync_state: BackfillSyncState::Idle,
388 state,
389 canonical_in_memory_state,
390 payload_builder,
391 config,
392 metrics: Default::default(),
393 incoming_tx,
394 engine_kind,
395 evm_config,
396 changeset_cache,
397 execution_timing_stats: HashMap::new(),
398 runtime,
399 }
400 }
401
    /// Creates the handler, spawns it on a dedicated OS thread, and returns the channel
    /// endpoints for communicating with it: the sender for engine messages and the receiver
    /// for outgoing engine API events.
    ///
    /// The initial persistence/tree state is anchored at the provider's best block.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        // Falls back to block 0 / a default header when lookups fail (e.g. empty database).
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            runtime,
        );
        let incoming = task.incoming_tx.clone();
        // Run the engine event loop on its own OS thread with elevated priority.
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
461
462 fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
464 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
465 PayloadStatusEnum::Valid,
466 Some(state.head_block_hash),
467 )))
468 }
469
    /// Returns a new sender for dispatching messages to this handler's event loop.
    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
        self.incoming_tx.clone()
    }
474
    /// Runs the engine event loop: processes incoming engine messages and persistence-task
    /// completions until a channel disconnects, a fatal error occurs, or an engine message
    /// signals shutdown.
    ///
    /// After every event, [`Self::advance_persistence`] is invoked to start pending
    /// removal/save work if none is in flight.
    pub fn run(mut self) {
        loop {
            match self.wait_for_event() {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        // `Break` signals a requested shutdown of the loop.
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // Kick off new persistence work (block removal or saving) if appropriate.
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
514
    /// Blocks until the next loop event: either an incoming engine message or the completion
    /// of an in-flight persistence action.
    ///
    /// With persistence in flight, both channels are polled with a bias towards the
    /// persistence result (`select_biased!` prefers earlier arms). If an engine message wins
    /// the race, the persistence receiver is restored so its completion is observed later.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Temporarily take the persistence receiver (if any) so it can be polled by value.
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    match result {
                        Ok(result) => LoopEvent::PersistenceComplete {
                            result,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Persistence is still pending: put the receiver back before returning.
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in flight: just wait for the next engine message.
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
555
    /// Handles a batch of downloaded blocks: inserts at most
    /// `max_execute_block_batch_size` of them and requeues the remainder onto the incoming
    /// channel so other engine messages can interleave.
    ///
    /// If handling a block triggers a backfill action, processing stops immediately and the
    /// rest of the batch is dropped (backfill supersedes it).
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<SealedBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    // Backfill sync was requested: abandon the remaining blocks.
                    return Ok(None)
                }
            }
        }

        // Requeue blocks beyond the batch size for a later iteration of the event loop.
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }
590
    /// Handles a new payload from the consensus layer (`engine_newPayload`).
    ///
    /// Emits a `BlockReceived` event, rejects payloads descending from a known-invalid
    /// ancestor, then either inserts the payload (when backfill sync is idle) or buffers it
    /// (while backfill is running). If the payload is valid, is the sync target head, and is
    /// not yet canonical, a `MakeCanonical` tree action is attached to the outcome.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        let start = Instant::now();

        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Payloads that build on a known-invalid chain are answered without execution.
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        let mut outcome = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?.into_outcome()
        } else {
            // Backfill is running: only buffer the payload, it cannot be validated yet.
            TreeOutcome::new(self.try_buffer_payload(payload)?)
        };

        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
682
    /// Attempts to insert the payload into the tree, translating the insertion result (or
    /// error) into an engine API [`PayloadStatus`]:
    /// - valid blocks (newly inserted or already seen) report the block hash as the latest
    ///   valid hash; a newly inserted block also connects any buffered child blocks,
    /// - disconnected blocks (missing ancestor) report `SYNCING`,
    /// - insertion errors are converted by the dedicated error handlers.
    #[instrument(level = "debug", target = "engine::tree", skip_all)]
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let (status, already_seen) = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        // A newly valid block may connect previously buffered descendants.
                        self.try_connect_buffered_blocks(num_hash)?;
                        (PayloadStatusEnum::Valid, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        (PayloadStatusEnum::Valid, true)
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, true)
                    }
                };

                Ok(TryInsertPayloadResult {
                    status: PayloadStatus::new(status, latest_valid_hash),
                    already_seen,
                })
            }
            Err(error) => {
                let status = match error {
                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
                    InsertPayloadError::Payload(error) => {
                        self.on_new_payload_error(error, num_hash, parent_hash)?
                    }
                };

                Ok(TryInsertPayloadResult { status, already_seen: false })
            }
        }
    }
732
733 fn try_buffer_payload(
742 &mut self,
743 payload: T::ExecutionData,
744 ) -> Result<PayloadStatus, InsertBlockFatalError> {
745 let parent_hash = payload.parent_hash();
746 let num_hash = payload.num_hash();
747
748 match self.payload_validator.convert_payload_to_block(payload) {
749 Ok(block) => {
751 if let Err(error) = self.buffer_block(block) {
752 Ok(self.on_insert_block_error(error)?)
753 } else {
754 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
755 }
756 }
757 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
758 }
759 }
760
    /// Determines the chain update (commit or reorg) required to make `new_head` the
    /// canonical head, walking parent links through the in-memory tree state.
    ///
    /// Returns `Ok(None)` if `new_head` — or any block required for the walk — is not tracked
    /// in memory, in which case the caller must resolve the head another way.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        // Walk from the new head down to the current canonical height, collecting the new
        // chain segment (tip-first; reversed before returning).
        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                return Ok(None)
            }
        }

        // If the walk landed exactly on the canonical head, this is a plain chain extension.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // Otherwise this is a reorg: first walk the old canonical chain down to the same
        // height as the new chain walk...
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        while current_canonical_number > current_number {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);
            current_canonical_number -= 1;
        }

        // Both walks must now be at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // ...then walk both chains in lockstep until they meet at the common ancestor.
        while old_hash != current_hash {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
847
848 fn update_latest_block_to_canonical_ancestor(
860 &mut self,
861 canonical_header: &SealedHeader<N::BlockHeader>,
862 ) -> ProviderResult<()> {
863 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
864 let current_head_number = self.state.tree_state.canonical_block_number();
865 let new_head_number = canonical_header.number();
866 let new_head_hash = canonical_header.hash();
867
868 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
870
871 if new_head_number < current_head_number {
873 debug!(
874 target: "engine::tree",
875 current_head = current_head_number,
876 new_head = new_head_number,
877 new_head_hash = ?new_head_hash,
878 "FCU unwind detected: reverting to canonical ancestor"
879 );
880
881 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
882 } else {
883 debug!(
884 target: "engine::tree",
885 previous_head = current_head_number,
886 new_head = new_head_number,
887 new_head_hash = ?new_head_hash,
888 "Advancing latest block to canonical ancestor"
889 );
890 self.handle_chain_advance_or_same_height(canonical_header)
891 }
892 }
893
894 fn handle_canonical_chain_unwind(
897 &self,
898 current_head_number: u64,
899 canonical_header: &SealedHeader<N::BlockHeader>,
900 ) -> ProviderResult<()> {
901 let new_head_number = canonical_header.number();
902 debug!(
903 target: "engine::tree",
904 from = current_head_number,
905 to = new_head_number,
906 "Handling unwind: collecting blocks to remove from in-memory state"
907 );
908
909 let old_blocks =
911 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
912
913 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
915 }
916
917 fn collect_blocks_for_canonical_unwind(
919 &self,
920 new_head_number: u64,
921 current_head_number: u64,
922 ) -> Vec<ExecutedBlock<N>> {
923 let mut old_blocks =
924 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
925
926 for block_num in (new_head_number + 1)..=current_head_number {
927 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
928 let executed_block = block_state.block_ref().clone();
929 old_blocks.push(executed_block);
930 debug!(
931 target: "engine::tree",
932 block_number = block_num,
933 "Collected block for removal from in-memory state"
934 );
935 }
936 }
937
938 if old_blocks.is_empty() {
939 debug!(
940 target: "engine::tree",
941 "No blocks found in memory to remove, will clear and reset state"
942 );
943 }
944
945 old_blocks
946 }
947
948 fn apply_canonical_ancestor_via_reorg(
950 &self,
951 canonical_header: &SealedHeader<N::BlockHeader>,
952 old_blocks: Vec<ExecutedBlock<N>>,
953 ) -> ProviderResult<()> {
954 let new_head_hash = canonical_header.hash();
955 let new_head_number = canonical_header.number();
956
957 let executed_block = self.canonical_block_by_hash(new_head_hash)?;
959 self.canonical_in_memory_state
961 .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
962
963 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
966
967 debug!(
968 target: "engine::tree",
969 block_number = new_head_number,
970 block_hash = ?new_head_hash,
971 "Successfully loaded canonical ancestor into memory via reorg"
972 );
973
974 Ok(())
975 }
976
    /// Handles a canonical-head update where the new head is at the same height or above the
    /// previous one: ensures the head block is loaded into memory, then updates the in-memory
    /// canonical head.
    fn handle_chain_advance_or_same_height(
        &self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;

        self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());

        Ok(())
    }
990
991 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
993 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
995 return Ok(());
996 }
997
998 let executed_block = self.canonical_block_by_hash(block_hash)?;
1000 self.canonical_in_memory_state
1001 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1002
1003 debug!(
1004 target: "engine::tree",
1005 block_number,
1006 block_hash = ?block_hash,
1007 "Added canonical block to in-memory state"
1008 );
1009
1010 Ok(())
1011 }
1012
    /// Handles a forkchoice update from the consensus layer (`engine_forkchoiceUpdated`).
    ///
    /// Runs validation pre-checks, then tries increasingly general strategies in order: the
    /// head may already be the tracked canonical head, it may require a chain update
    /// (commit/reorg), or it may be unknown locally, in which case a download is requested.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        self.record_forkchoice_metrics();

        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
            return Ok(result);
        }

        if let Some(result) = self.apply_chain_update(state, &attrs)? {
            return Ok(result);
        }

        // Head block is unknown locally: answer SYNCING and request a download.
        self.handle_missing_block(state)
    }
1051
    /// Notifies the in-memory canonical state that a forkchoice update was received
    /// (bookkeeping on [`CanonicalInMemoryState`]).
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1056
    /// Performs pre-checks shared by all forkchoice updates, returning an early response when
    /// the update cannot proceed:
    /// - a zero head hash is an invalid forkchoice state,
    /// - a head descending from a known-invalid block is answered with the invalid status,
    /// - while backfill sync is running, the update is answered with `SYNCING`.
    fn validate_forkchoice_state(
        &mut self,
        state: ForkchoiceState,
    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
        if state.head_block_hash.is_zero() {
            return Ok(Some(OnForkChoiceUpdated::invalid_state()));
        }

        // Check the lowest buffered ancestor so an invalid block anywhere below the head in
        // the buffered chain is caught, not just the head itself.
        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
        }

        if !self.backfill_sync_state.is_idle() {
            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
            return Ok(Some(OnForkChoiceUpdated::syncing()));
        }

        Ok(None)
    }
1085
1086 fn handle_canonical_head(
1092 &self,
1093 state: ForkchoiceState,
1094 attrs: &Option<T::PayloadAttributes>, ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1096 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1111 return Ok(None);
1112 }
1113
1114 trace!(target: "engine::tree", "fcu head hash is already canonical");
1115
1116 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1118 return Ok(Some(TreeOutcome::new(outcome)));
1120 }
1121
1122 if let Some(attr) = attrs {
1124 let tip = self
1125 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1126 .ok_or_else(|| {
1127 ProviderError::HeaderNotFound(state.head_block_hash.into())
1130 })?;
1131 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1133 return Ok(Some(TreeOutcome::new(updated)));
1134 }
1135
1136 Ok(Some(Self::valid_outcome(state)))
1138 }
1139
    /// Attempts to service the FCU by updating the canonical chain.
    ///
    /// Two cases are handled:
    /// 1. The head is already a canonical (possibly historical) block — optionally rewinds
    ///    the in-memory head to it (OP-stack, or when configured) and processes attributes.
    /// 2. The head is a tracked sidechain block — computes and applies the commit/reorg.
    ///
    /// Returns `Ok(None)` when the head is not known so the caller can fall back to
    /// downloading it.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                if self.config.unwind_canonical_header() {
                    // Move the in-memory head (back) to this canonical block if configured.
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    let updated =
                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        Ok(None)
    }
1218
1219 fn handle_missing_block(
1224 &self,
1225 state: ForkchoiceState,
1226 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1227 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1234 !state.safe_block_hash.is_zero() &&
1236 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1237 {
1238 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1239 state.safe_block_hash
1240 } else {
1241 state.head_block_hash
1242 };
1243
1244 let target = self.lowest_buffered_ancestor_or(target);
1245 trace!(target: "engine::tree", %target, "downloading missing block");
1246
1247 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1248 PayloadStatusEnum::Syncing,
1249 )))
1250 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1251 }
1252
1253 fn remove_blocks(&mut self, new_tip_num: u64) {
1256 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1257 if new_tip_num < self.persistence_state.last_persisted_block.number {
1258 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1259 let (tx, rx) = crossbeam_channel::bounded(1);
1260 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1261 self.persistence_state.start_remove(new_tip_num, rx);
1262 }
1263 }
1264
1265 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1268 if blocks_to_persist.is_empty() {
1269 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1270 return
1271 }
1272
1273 let highest_num_hash = blocks_to_persist
1275 .iter()
1276 .max_by_key(|block| block.recovered_block().number())
1277 .map(|b| b.recovered_block().num_hash())
1278 .expect("Checked non-empty persisting blocks");
1279
1280 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1281 let (tx, rx) = crossbeam_channel::bounded(1);
1282 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1283
1284 self.persistence_state.start_save(highest_num_hash, rx);
1285 }
1286
    /// Starts new persistence work if none is in flight: either removing blocks above a
    /// detected disk reorg point, or saving canonical blocks once the persistence threshold
    /// is reached.
    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
        if !self.persistence_state.in_progress() {
            // A disk reorg (persisted blocks no longer canonical) takes priority over saving.
            if let Some(new_tip_num) = self.find_disk_reorg()? {
                self.remove_blocks(new_tip_num)
            } else if self.should_persist() {
                let blocks_to_persist =
                    self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
                self.persist_blocks(blocks_to_persist);
            }
        }

        Ok(())
    }
1304
    /// Persists all remaining canonical blocks, then acknowledges the pending termination
    /// request.
    ///
    /// The acknowledgement is sent even if persistence failed; the error is still propagated
    /// to the caller.
    fn finish_termination(
        &mut self,
        pending_termination: oneshot::Sender<()>,
    ) -> Result<(), AdvancePersistenceError> {
        trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
        let result = self.persist_until_complete();
        let _ = pending_termination.send(());
        result
    }
1318
    /// Drives persistence to completion: waits for any in-flight persistence task,
    /// then repeatedly saves all remaining canonical in-memory blocks (up to the
    /// head, not just the threshold target) until nothing is left.
    ///
    /// Used on shutdown so no canonical blocks are lost.
    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
        loop {
            // Block on the in-flight task (if any) and apply its result before
            // scheduling more work — only one task may run at a time.
            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
                self.on_persistence_complete(result, start_time)?;
            }

            // `PersistTarget::Head` flushes everything, ignoring the usual buffer target.
            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;

            if blocks_to_persist.is_empty() {
                debug!(target: "engine::tree", "persistence complete, signaling termination");
                return Ok(())
            }

            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
            self.persist_blocks(blocks_to_persist);
        }
    }
1340
1341 #[cfg(test)]
1345 pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1346 let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1347 return Ok(false);
1348 };
1349
1350 match rx.try_recv() {
1351 Ok(result) => {
1352 self.on_persistence_complete(result, start_time)?;
1353 Ok(true)
1354 }
1355 Err(crossbeam_channel::TryRecvError::Empty) => {
1356 self.persistence_state.rx = Some((rx, start_time, action));
1358 Ok(false)
1359 }
1360 Err(crossbeam_channel::TryRecvError::Disconnected) => {
1361 Err(AdvancePersistenceError::ChannelClosed)
1362 }
1363 }
1364 }
1365
    /// Handles completion of a persistence task (save or remove).
    ///
    /// Records metrics, advances the persistence state to the last persisted block,
    /// evicts stale changeset-cache entries, prunes now-durable in-memory blocks,
    /// pre-warms the canonical overlay off the hot path, and purges per-block timing
    /// stats (emitting slow-block events when a commit duration is available).
    fn on_persistence_complete(
        &mut self,
        result: PersistenceResult,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        let commit_duration = result.commit_duration;
        // A remove-blocks task (or an empty save) may legitimately report no block.
        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = result.last_block
        else {
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Keep a retention window of recent changesets; when a finalized block is
        // known the threshold is additionally capped at the finalized height.
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                finalized.number.min(min_threshold)
            } else {
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // The cached overlay was built on pre-persistence state and is now stale.
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Rebuild the canonical overlay on a background thread so the next payload
        // does not pay for it.
        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                let _ = overlay.get();
            });
        }

        self.purge_timing_stats(last_persisted_block_number, commit_duration);

        Ok(())
    }
1428
    /// Central dispatch for messages arriving from the engine/orchestrator channel.
    ///
    /// Handles orchestrator events (backfill start/finish, terminate), engine API
    /// requests (inserting pre-executed blocks, `forkchoiceUpdated`, `newPayload`,
    /// and the reth-specific `reth_newPayload` variant) and downloaded blocks.
    ///
    /// Returns `ControlFlow::Break` only on a terminate request; every other message
    /// yields `ControlFlow::Continue`.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Flush remaining blocks, ack the request, then break the loop.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        // Blocks at or below the canonical head are already covered.
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // A direct child of the canonical head becomes the pending block.
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.payload_validator.on_inserted_executed_block(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // Track the latest FCU state/status before emitting,
                                    // so listeners observe a consistent tracker.
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // Follow-up action (e.g. make-canonical or download).
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // The RPC caller may have timed out; a failed send is
                                // counted but not fatal.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Detach any follow-up event before `output` is consumed
                                // by the response send below.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // Handle the follow-up even if the response delivery failed.
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Optionally drain the in-flight persistence task before
                                // validating, reporting how long the wait took.
                                let persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        // Blocking recv happens on a helper thread; the
                                        // wait duration is measured there.
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        let _ = self.on_persistence_complete(result, start_time);
                                        Some(wait_duration)
                                    } else {
                                        // Nothing in flight: waiting cost nothing.
                                        Some(Duration::ZERO)
                                    }
                                } else {
                                    None
                                };

                                // Optionally wait for validator caches to be warm.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // Timing breakdown is returned to the caller alongside
                                // the payload outcome.
                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1652
    /// Handles completion of a backfill (pipeline) sync run.
    ///
    /// Resets engine state to the height backfill reached (or unwound to), prunes
    /// in-memory structures below that height, and then decides the next step:
    /// continue backfilling toward the finalized target, download the remaining
    /// gap to the FCU head, or connect already-buffered blocks.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // An unwind carries both the bad block (tracked as invalid) and the height
        // unwound to; otherwise use the height backfill progressed to.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            self.state.invalid_headers.insert(**bad_block);

            Some(*target)
        } else {
            ctrl.block_number()
        };

        let Some(backfill_height) = backfill_height else { return Ok(()) };

        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        // After an unwind the in-memory tree may contain blocks above the unwind
        // point, so it is reset wholesale; otherwise only stale entries are removed.
        if ctrl.is_unwind() {
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        self.state.buffer.remove_old_blocks(backfill_height);
        self.purge_timing_stats(backfill_height, None);
        self.canonical_in_memory_state.clear_state();

        // Re-anchor the canonical head (and persistence watermark) at the backfill
        // height, reading the header back from disk.
        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // Without a sync target (or with a zero finalized hash) there is nothing
        // further to drive toward.
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            return Ok(())
        }
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // Still far from the finalized target: start another backfill run.
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Close enough for live sync: download the remaining gap up to the FCU head.
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // Everything needed is already buffered: try connecting it to the head.
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1812
1813 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1817 if let Some(chain_update) = self.on_new_head(target)? {
1818 self.on_canonical_chain_update(chain_update);
1819 }
1820
1821 Ok(())
1822 }
1823
1824 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1826 if let Some(event) = event {
1827 self.on_tree_event(event)?;
1828 }
1829
1830 Ok(())
1831 }
1832
1833 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1837 match event {
1838 TreeEvent::TreeAction(action) => match action {
1839 TreeAction::MakeCanonical { sync_target_head } => {
1840 self.make_canonical(sync_target_head)?;
1841 }
1842 },
1843 TreeEvent::BackfillAction(action) => {
1844 self.emit_event(EngineApiEvent::BackfillAction(action));
1845 }
1846 TreeEvent::Download(action) => {
1847 self.emit_event(EngineApiEvent::Download(action));
1848 }
1849 }
1850
1851 Ok(())
1852 }
1853
1854 fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
1861 let threshold = self.config.slow_block_threshold();
1862 let check_slow = commit_duration.is_some() && threshold.is_some();
1863
1864 let keys_to_remove: Vec<B256> = self
1866 .execution_timing_stats
1867 .iter()
1868 .filter(|(_, stats)| stats.block_number <= below_number)
1869 .map(|(k, _)| *k)
1870 .collect();
1871
1872 for key in keys_to_remove {
1873 let stats = self.execution_timing_stats.remove(&key).expect("key just found");
1874 if check_slow {
1875 let commit_dur = commit_duration.expect("checked above");
1876 let total_duration =
1878 stats.execution_duration + stats.state_hash_duration + commit_dur;
1879
1880 if total_duration > threshold.expect("checked above") {
1881 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
1882 stats,
1883 commit_duration: Some(commit_dur),
1884 total_duration,
1885 }));
1886 }
1887 }
1888 }
1889 }
1890
1891 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1893 let event = event.into();
1894
1895 if event.is_backfill_action() {
1896 debug_assert_eq!(
1897 self.backfill_sync_state,
1898 BackfillSyncState::Idle,
1899 "backfill action should only be emitted when backfill is idle"
1900 );
1901
1902 if self.persistence_state.in_progress() {
1903 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1906 return
1907 }
1908
1909 self.backfill_sync_state = BackfillSyncState::Pending;
1910 self.metrics.engine.pipeline_runs.increment(1);
1911 debug!(target: "engine::tree", "emitting backfill action event");
1912 }
1913
1914 let _ = self.outgoing.send(event).inspect_err(
1915 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1916 );
1917 }
1918
1919 pub const fn should_persist(&self) -> bool {
1923 if !self.backfill_sync_state.is_idle() {
1924 return false
1926 }
1927
1928 let min_block = self.persistence_state.last_persisted_block.number;
1929 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1930 self.config.persistence_threshold()
1931 }
1932
1933 fn get_canonical_blocks_to_persist(
1936 &self,
1937 target: PersistTarget,
1938 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1939 debug_assert!(!self.persistence_state.in_progress());
1942
1943 let mut blocks_to_persist = Vec::new();
1944 let mut current_hash = self.state.tree_state.canonical_block_hash();
1945 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1946 let canonical_head_number = self.state.tree_state.canonical_block_number();
1947
1948 let target_number = match target {
1949 PersistTarget::Head => canonical_head_number,
1950 PersistTarget::Threshold => {
1951 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1952 }
1953 };
1954
1955 debug!(
1956 target: "engine::tree",
1957 ?current_hash,
1958 ?last_persisted_number,
1959 ?canonical_head_number,
1960 ?target_number,
1961 "Returning canonical blocks to persist"
1962 );
1963 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1964 if block.recovered_block().number() <= last_persisted_number {
1965 break;
1966 }
1967
1968 if block.recovered_block().number() <= target_number {
1969 blocks_to_persist.push(block.clone());
1970 }
1971
1972 current_hash = block.recovered_block().parent_hash();
1973 }
1974
1975 blocks_to_persist.reverse();
1977
1978 Ok(blocks_to_persist)
1979 }
1980
1981 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1989 if let Some(remove_above) = self.find_disk_reorg()? {
1992 self.remove_blocks(remove_above);
1993 return Ok(())
1994 }
1995
1996 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1997 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1998 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1999 number: self.persistence_state.last_persisted_block.number,
2000 hash: self.persistence_state.last_persisted_block.hash,
2001 });
2002 Ok(())
2003 }
2004
    /// Returns the [`ExecutedBlock`] for a canonical block hash, reconstructing it
    /// from the database when it is no longer held in memory.
    ///
    /// Reconstruction reads the sealed block and its execution output from the
    /// provider, rehashes the post state, and recomputes the block's trie updates
    /// via the changeset cache.
    ///
    /// # Errors
    /// Returns `HeaderNotFound` when the block is unknown and
    /// `StateForNumberNotFound` when no execution output exists for its height.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // Fast path: the block is still tracked in the in-memory tree state.
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(block.clone())
        }

        // Slow path: rebuild the executed block from on-disk data.
        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        // No trie input is retained for blocks rebuilt from disk.
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // `pop()` takes the single block's receipts/requests out of the per-block
        // vectors returned by `get_state`.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        ))
    }
2064
2065 fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2069 if self.state.tree_state.contains_hash(&hash) {
2070 Ok(true)
2071 } else {
2072 self.provider.is_known(hash)
2073 }
2074 }
2075
2076 fn sealed_header_by_hash(
2078 &self,
2079 hash: B256,
2080 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2081 let header = self.state.tree_state.sealed_header_by_hash(&hash);
2083
2084 if header.is_some() {
2085 Ok(header)
2086 } else {
2087 self.provider.sealed_header_by_hash(hash)
2088 }
2089 }
2090
2091 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2098 self.state
2099 .buffer
2100 .lowest_ancestor(&hash)
2101 .map(|block| block.parent_hash())
2102 .unwrap_or_else(|| hash)
2103 }
2104
2105 fn latest_valid_hash_for_invalid_payload(
2116 &mut self,
2117 parent_hash: B256,
2118 ) -> ProviderResult<Option<B256>> {
2119 if self.has_block_by_hash(parent_hash)? {
2121 return Ok(Some(parent_hash))
2122 }
2123
2124 let mut current_hash = parent_hash;
2127 let mut current_block = self.state.invalid_headers.get(¤t_hash);
2128 while let Some(block_with_parent) = current_block {
2129 current_hash = block_with_parent.parent;
2130 current_block = self.state.invalid_headers.get(¤t_hash);
2131
2132 if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2135 return Ok(Some(current_hash))
2136 }
2137 }
2138 Ok(None)
2139 }
2140
2141 fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2145 let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2146 Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2150 Some(_) => Some(parent_hash),
2151 None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2152 };
2153
2154 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2155 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2156 })
2157 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2158 }
2159
2160 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2164 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2165 return target.head_block_hash == block_hash
2166 }
2167 false
2168 }
2169
2170 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2174 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2175 return target.contains(block_hash)
2176 }
2177 false
2178 }
2179
2180 fn check_invalid_ancestor_with_head(
2186 &mut self,
2187 check: B256,
2188 head: &SealedBlock<N::Block>,
2189 ) -> ProviderResult<Option<PayloadStatus>> {
2190 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2192
2193 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2194 }
2195
2196 fn on_invalid_new_payload(
2198 &mut self,
2199 head: SealedBlock<N::Block>,
2200 invalid: BlockWithParent,
2201 ) -> ProviderResult<PayloadStatus> {
2202 let status = self.prepare_invalid_response(invalid.parent)?;
2204
2205 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2207 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2208
2209 Ok(status)
2210 }
2211
2212 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2226 let parent_hash = payload.parent_hash();
2227 let block_hash = payload.block_hash();
2228
2229 if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2231 return Some(entry);
2232 }
2233
2234 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2235 if lowest_buffered_ancestor == block_hash {
2236 lowest_buffered_ancestor = parent_hash;
2237 }
2238
2239 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2241 }
2242
2243 fn handle_invalid_ancestor_payload(
2252 &mut self,
2253 payload: T::ExecutionData,
2254 invalid: BlockWithParent,
2255 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2256 let parent_hash = payload.parent_hash();
2257 let num_hash = payload.num_hash();
2258
2259 let block = match self.payload_validator.convert_payload_to_block(payload) {
2265 Ok(block) => block,
2266 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2267 };
2268
2269 Ok(self.on_invalid_new_payload(block, invalid)?)
2270 }
2271
2272 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2275 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2277
2278 match self.prepare_invalid_response(header.parent) {
2280 Ok(status) => Ok(Some(status)),
2281 Err(err) => {
2282 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2283 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2285 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2286 })))
2287 }
2288 }
2289 }
2290
2291 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2294 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2295 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2296 return Err(e)
2297 }
2298
2299 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2300 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2301 return Err(e)
2302 }
2303
2304 Ok(())
2305 }
2306
    /// Attempts to insert all buffered descendants of `parent` into the tree.
    ///
    /// Blocks are drained from the buffer and inserted in order; when an inserted
    /// block matches the sync target it is immediately made canonical. Per-block
    /// insertion errors are handled (and may be recovered); only fatal errors abort.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        // Drains the parent's whole descendant subtree from the buffer.
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    // A freshly-valid sync target becomes canonical right away.
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // Non-fatal insertion errors are delegated to the error handler;
                    // remaining buffered children are still attempted.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2351
2352 fn buffer_block(
2354 &mut self,
2355 block: SealedBlock<N::Block>,
2356 ) -> Result<(), InsertBlockError<N::Block>> {
2357 if let Err(err) = self.validate_block(&block) {
2358 return Err(InsertBlockError::consensus_error(err, block))
2359 }
2360 self.state.buffer.insert_block(block);
2361 Ok(())
2362 }
2363
2364 #[inline]
2369 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2370 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2371 }
2372
2373 #[inline]
2376 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2377 if block > local_tip {
2378 Some(block - local_tip)
2379 } else {
2380 None
2381 }
2382 }
2383
    /// Decides whether a backfill (pipeline) run should be started and, if so,
    /// returns the hash to backfill toward.
    ///
    /// The distance check prefers, in order: a freshly downloaded block that *is*
    /// the finalized target, a buffered finalized block, and finally the raw
    /// `target_block_number`. When the threshold is exceeded, the finalized hash is
    /// the target unless it is unknown/zero, in which case the head hash is used
    /// optimistically.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // The just-downloaded block is the finalized target itself: measure
                // the gap against its exact height.
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    // The finalized block is buffered: use its height.
                    Some(buffered_finalized) => {
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    // Fall back to the caller-supplied target height.
                    None => {
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                // Finalized block not on disk: backfill toward it (or, if the hash is
                // zero/unset, optimistically toward the head).
                Ok(None) => {
                    if !state.finalized_block_hash.is_zero() {
                        return Some(state.finalized_block_hash)
                    }

                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                // Finalized block already on disk: no backfill needed.
                Ok(Some(_)) => {
                }
            }
        }

        None
    }
2464
    /// Detects whether the persisted (on-disk) chain has diverged from the in-memory
    /// canonical chain.
    ///
    /// Walks both heads back to a common height and then to a common ancestor.
    /// Returns `Some(number)` — the height above which persisted blocks must be
    /// removed — when a divergence exists, `None` when disk is an ancestor of (or
    /// equal to) the canonical head.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Resolves a block's parent (number, hash) via in-memory or on-disk headers.
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Bring the canonical cursor down to the persisted height.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // Same block at that height: disk is an ancestor of the canonical head.
        if canonical == persisted {
            return Ok(None);
        }

        // Disk is ahead of (or diverged above) the canonical cursor: walk it down to
        // the same height.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Walk both chains down in lockstep until the hashes agree — that is the
        // common ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2510
    /// Applies a canonical chain update (commit or reorg) to engine state.
    ///
    /// Updates the tree and in-memory canonical head, re-inserts both sides of a
    /// reorg so they stay available, records metrics, notifies canonical-state
    /// subscribers, and emits a chain-committed event.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // Update the tracked head first so subsequent lookups see the new tip.
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            // Keep both fork sides in the tree so a reorg back is cheap.
            self.reinsert_reorged_blocks(new.clone());
            self.reinsert_reorged_blocks(old.clone());
        }

        // Chain update must be applied before the head is published.
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // Notify subscribers only after all state is consistent.
        self.canonical_in_memory_state.notify_canon_state(notification);

        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2551
2552 fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2554 if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2555 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2556 first_reorged_block <= finalized.number
2557 {
2558 self.metrics.tree.reorgs.finalized.increment(1);
2559 } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2560 first_reorged_block <= safe.number
2561 {
2562 self.metrics.tree.reorgs.safe.increment(1);
2563 } else {
2564 self.metrics.tree.reorgs.head.increment(1);
2565 }
2566 } else {
2567 debug_unreachable!("Reorged chain doesn't have any blocks");
2568 }
2569 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2570 }
2571
2572 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2574 for block in new_chain {
2575 if self
2576 .state
2577 .tree_state
2578 .executed_block_by_hash(block.recovered_block().hash())
2579 .is_none()
2580 {
2581 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2582 self.state.tree_state.insert_executed(block);
2583 }
2584 }
2585 }
2586
2587 fn on_disconnected_downloaded_block(
2592 &self,
2593 downloaded_block: BlockNumHash,
2594 missing_parent: BlockNumHash,
2595 head: BlockNumHash,
2596 ) -> Option<TreeEvent> {
2597 if let Some(target) =
2599 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2600 {
2601 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2602 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2603 }
2604
2605 let request = if let Some(distance) =
2615 self.distance_from_local_tip(head.number, missing_parent.number)
2616 {
2617 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2618 DownloadRequest::BlockRange(missing_parent.hash, distance)
2619 } else {
2620 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2621 DownloadRequest::single_block(missing_parent.hash)
2624 };
2625
2626 Some(TreeEvent::Download(request))
2627 }
2628
2629 fn on_valid_downloaded_block(
2636 &mut self,
2637 block_num_hash: BlockNumHash,
2638 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2639 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2642 sync_target.contains(block_num_hash.hash)
2643 {
2644 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2645
2646 if sync_target.head_block_hash == block_num_hash.hash {
2647 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2649 sync_target_head: block_num_hash.hash,
2650 })))
2651 }
2652
2653 self.make_canonical(block_num_hash.hash)?;
2657 self.try_connect_buffered_blocks(block_num_hash)?;
2658
2659 if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2662 let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2663 trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2664 return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2665 }
2666
2667 return Ok(None)
2668 }
2669 trace!(target: "engine::tree", "appended downloaded block");
2670 self.try_connect_buffered_blocks(block_num_hash)?;
2671 Ok(None)
2672 }
2673
    /// Handles a block fetched by the block downloader.
    ///
    /// Drops the block early when it descends from a known-invalid ancestor or
    /// while backfill sync is active; otherwise attempts insertion and maps
    /// the outcome to a follow-up [`TreeEvent`] (make-canonical, further
    /// downloads, or a backfill trigger).
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        // Check the lowest buffered ancestor (or the block itself) against the
        // invalid-ancestor tracking; a hit means this chain is already known bad.
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // While backfill is running the tree does not accept downloaded blocks.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        match self.insert_block(block) {
            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // NOTE(review): only `InsertPayloadError::Block` errors are
                // handled; other variants are silently dropped here — confirm
                // they cannot occur on the block-insertion path.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2723
2724 fn insert_payload(
2733 &mut self,
2734 payload: T::ExecutionData,
2735 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2736 self.insert_block_or_payload(
2737 payload.block_with_parent(),
2738 payload,
2739 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2740 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2741 )
2742 }
2743
2744 fn insert_block(
2745 &mut self,
2746 block: SealedBlock<N::Block>,
2747 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2748 self.insert_block_or_payload(
2749 block.block_with_parent(),
2750 block,
2751 |validator, block, ctx| validator.validate_block(block, ctx),
2752 |_, block| Ok(block),
2753 )
2754 }
2755
    /// Shared insertion routine for both sealed blocks and engine payloads.
    ///
    /// `execute` validates/executes the input against a [`TreeCtx`], while
    /// `convert_to_block` turns the input into a [`SealedBlock`] on paths
    /// where only the block itself is needed (duplicate detection, buffering,
    /// error reporting).
    ///
    /// Returns [`InsertPayloadOk::AlreadySeen`] for known blocks,
    /// [`BlockStatus::Disconnected`] when no state exists for the parent, and
    /// [`BlockStatus::Valid`] after successful execution and insertion.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(
            &mut V,
            Input,
            TreeCtx<'_, N>,
        )
            -> Result<(ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>), Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Fast path: the block is already tracked in the in-memory tree state.
        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
            // Still run the conversion so its validation errors can surface.
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // The block may already be persisted on disk even though it is not in
        // memory anymore; check the database before re-executing it.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure state for the parent exists; otherwise buffer the block and
        // report it as disconnected along with the lowest missing ancestor.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // Walk the buffer for the lowest known ancestor; fall back to
                // this block's own parent when nothing is buffered yet.
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // A block at or below the canonical head number extends a fork rather
        // than the canonical chain.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        let (executed, timing_stats) = execute(&mut self.payload_validator, input, ctx)?;

        if let Some(stats) = timing_stats {
            // Emit a slow-block event when execution + state hashing exceeded
            // the configured threshold.
            if let Some(threshold) = self.config.slow_block_threshold() {
                let total_duration = stats.execution_duration + stats.state_hash_duration;
                if total_duration > threshold {
                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
                        stats: stats.clone(),
                        commit_duration: None,
                        total_duration,
                    }));
                }
            }
            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
        }

        // A direct child of the canonical head becomes the new pending block.
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2896
2897 fn on_insert_block_error(
2903 &mut self,
2904 error: InsertBlockError<N::Block>,
2905 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2906 let (block, error) = error.split();
2907
2908 let validation_err = error.ensure_validation_error()?;
2911
2912 warn!(
2916 target: "engine::tree",
2917 invalid_hash=%block.hash(),
2918 invalid_number=block.number(),
2919 %validation_err,
2920 "Invalid block error on new payload",
2921 );
2922 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2923
2924 self.state.invalid_headers.insert(block.block_with_parent());
2926 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2927 Box::new(block),
2928 )));
2929
2930 Ok(PayloadStatus::new(
2931 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2932 latest_valid_hash,
2933 ))
2934 }
2935
2936 fn on_new_payload_error(
2938 &mut self,
2939 error: NewPayloadError,
2940 payload_num_hash: NumHash,
2941 parent_hash: B256,
2942 ) -> ProviderResult<PayloadStatus> {
2943 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2944 let latest_valid_hash =
2947 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2948 None
2952 } else {
2953 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2954 };
2955
2956 let status = PayloadStatusEnum::from(error);
2957 Ok(PayloadStatus::new(status, latest_valid_hash))
2958 }
2959
2960 pub fn find_canonical_header(
2962 &self,
2963 hash: B256,
2964 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2965 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2966
2967 if canonical.is_none() {
2968 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2969 }
2970
2971 Ok(canonical)
2972 }
2973
2974 fn update_finalized_block(
2976 &self,
2977 finalized_block_hash: B256,
2978 ) -> Result<(), OnForkChoiceUpdated> {
2979 if finalized_block_hash.is_zero() {
2980 return Ok(())
2981 }
2982
2983 match self.find_canonical_header(finalized_block_hash) {
2984 Ok(None) => {
2985 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2986 return Err(OnForkChoiceUpdated::invalid_state())
2988 }
2989 Ok(Some(finalized)) => {
2990 if Some(finalized.num_hash()) !=
2991 self.canonical_in_memory_state.get_finalized_num_hash()
2992 {
2993 let _ = self.persistence.save_finalized_block_number(finalized.number());
2996 self.canonical_in_memory_state.set_finalized(finalized.clone());
2997 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2999 }
3000 }
3001 Err(err) => {
3002 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3003 }
3004 }
3005
3006 Ok(())
3007 }
3008
3009 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3011 if safe_block_hash.is_zero() {
3012 return Ok(())
3013 }
3014
3015 match self.find_canonical_header(safe_block_hash) {
3016 Ok(None) => {
3017 debug!(target: "engine::tree", "Safe block not found in canonical chain");
3018 return Err(OnForkChoiceUpdated::invalid_state())
3020 }
3021 Ok(Some(safe)) => {
3022 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3023 let _ = self.persistence.save_safe_block_number(safe.number());
3026 self.canonical_in_memory_state.set_safe(safe.clone());
3027 self.metrics.tree.safe_block_height.set(safe.number() as f64);
3029 }
3030 }
3031 Err(err) => {
3032 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3033 }
3034 }
3035
3036 Ok(())
3037 }
3038
3039 fn ensure_consistent_forkchoice_state(
3048 &self,
3049 state: ForkchoiceState,
3050 ) -> Result<(), OnForkChoiceUpdated> {
3051 self.update_finalized_block(state.finalized_block_hash)?;
3057
3058 self.update_safe_block(state.safe_block_hash)
3064 }
3065
3066 fn process_payload_attributes(
3081 &self,
3082 attrs: T::PayloadAttributes,
3083 head: &N::BlockHeader,
3084 state: ForkchoiceState,
3085 ) -> OnForkChoiceUpdated {
3086 if let Err(err) =
3087 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
3088 {
3089 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3090 return OnForkChoiceUpdated::invalid_payload_attributes()
3091 }
3092
3093 let pending_payload_id =
3101 self.payload_builder.send_new_payload(state.head_block_hash, attrs);
3102
3103 OnForkChoiceUpdated::updated_with_pending_payload_id(
3115 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3116 pending_payload_id,
3117 )
3118 }
3119
3120 pub(crate) fn remove_before(
3127 &mut self,
3128 upper_bound: BlockNumHash,
3129 finalized_hash: Option<B256>,
3130 ) -> ProviderResult<()> {
3131 let num = if let Some(hash) = finalized_hash {
3134 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3135 } else {
3136 None
3137 };
3138
3139 self.state.tree_state.remove_until(
3140 upper_bound,
3141 self.persistence_state.last_persisted_block.hash,
3142 num,
3143 );
3144 Ok(())
3145 }
3146
3147 pub fn state_provider_builder(
3152 &self,
3153 hash: B256,
3154 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3155 where
3156 P: BlockReader + StateProviderFactory + StateReader + Clone,
3157 {
3158 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3159 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3160 return Ok(Some(StateProviderBuilder::new(
3162 self.provider.clone(),
3163 historical,
3164 Some(blocks),
3165 )))
3166 }
3167
3168 if let Some(header) = self.provider.header(hash)? {
3170 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3171 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3174 }
3175
3176 debug!(target: "engine::tree", %hash, "no canonical state found for block");
3177 Ok(None)
3178 }
3179}
3180
/// Events processed by the engine's internal loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// A message received from the engine side (API requests, downloaded
    /// blocks, backfill notifications).
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A background persistence task finished.
    PersistenceComplete {
        /// Outcome of the persistence task.
        result: PersistenceResult,
        /// When the persistence action started; lets the loop measure how
        /// long persistence took.
        start_time: Instant,
    },
    /// The incoming message source disconnected.
    Disconnected,
}
3200
/// Connectivity status of a block with respect to the locally known chain.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block is valid and connected to known chain state.
    Valid,
    /// The block could not be connected because an ancestor is missing.
    Disconnected {
        /// The current canonical head.
        head: BlockNumHash,
        /// The closest missing ancestor that must be obtained before the
        /// block can be connected.
        missing_ancestor: BlockNumHash,
    },
}
3218
/// Successful outcome of inserting a block or payload into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The block was already known; nothing new was inserted.
    AlreadySeen(BlockStatus),
    /// The block was newly inserted with the given status.
    Inserted(BlockStatus),
}
3230
/// How far persistence should advance.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to the configured distance/threshold behind the head.
    // NOTE(review): exact threshold semantics not visible here — confirm
    // against the persistence logic that consumes this.
    Threshold,
    /// Persist up to the current head.
    Head,
}
3239
/// Time spent waiting on caches, broken down per cache.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Wait time attributed to the execution cache.
    pub execution_cache: Duration,
    /// Wait time attributed to the sparse trie.
    pub sparse_trie: Duration,
}
3248
/// Types whose caches can be waited on before use.
pub trait WaitForCaches {
    /// Waits for the caches and reports how long each wait took.
    // NOTE(review): presumably blocks until the caches are ready — confirm
    // with the implementors.
    fn wait_for_caches(&self) -> CacheWaitDurations;
}