1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::{map::B256Map, B256};
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError, InsertBlockValidationError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17 MemoryOverlayStateProvider, NewCanonicalChain, StateTrieOverlayManager,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29 FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32 BalProvider, BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35 StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority, WorkerPool};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48 oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64pub mod types;
65
66use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
67pub use block_buffer::BlockBuffer;
68pub use invalid_headers::InvalidHeaderCache;
69pub use metrics::EngineApiMetrics;
70pub use payload_processor::*;
71pub use payload_validator::{BasicEngineValidator, EngineValidator};
72pub use persistence_state::PersistenceState;
73pub use reth_engine_primitives::TreeConfig;
74pub use reth_execution_cache::{
75 CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider,
76 ExecutionCache, PayloadExecutionCache, SavedCache,
77};
78pub use types::{ValidationOutcome, ValidationOutput};
79
80pub mod state;
81
82pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
92
93const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
98
99#[derive(Clone, Debug)]
101pub struct StateProviderBuilder<N: NodePrimitives, P> {
102 provider_factory: P,
104 historical: B256,
106 overlay: Option<Vec<ExecutedBlock<N>>>,
108}
109
110impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
111 pub const fn new(
114 provider_factory: P,
115 historical: B256,
116 overlay: Option<Vec<ExecutedBlock<N>>>,
117 ) -> Self {
118 Self { provider_factory, historical, overlay }
119 }
120}
121
122impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
123where
124 P: BlockReader + StateProviderFactory + StateReader + Clone,
125{
126 pub fn build(&self) -> ProviderResult<StateProviderBox> {
128 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
129 if let Some(overlay) = self.overlay.clone() {
130 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
131 }
132 Ok(provider)
133 }
134}
135
136#[derive(Debug)]
140pub struct EngineApiTreeState<N: NodePrimitives> {
141 tree_state: TreeState<N>,
143 forkchoice_state_tracker: ForkchoiceStateTracker,
145 buffer: BlockBuffer<N::Block>,
147 invalid_headers: InvalidHeaderCache,
150}
151
152impl<N: NodePrimitives> EngineApiTreeState<N> {
153 fn new(
154 block_buffer_limit: u32,
155 max_invalid_header_cache_length: u32,
156 invalid_header_hit_eviction_threshold: u8,
157 canonical_block: BlockNumHash,
158 engine_kind: EngineApiKind,
159 state_trie_overlay_worker_pool: Arc<WorkerPool>,
160 ) -> Self {
161 Self {
162 invalid_headers: InvalidHeaderCache::new(
163 max_invalid_header_cache_length,
164 invalid_header_hit_eviction_threshold,
165 ),
166 buffer: BlockBuffer::new(block_buffer_limit),
167 tree_state: TreeState::new(
168 canonical_block,
169 engine_kind,
170 StateTrieOverlayManager::new(state_trie_overlay_worker_pool),
171 ),
172 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
173 }
174 }
175
176 pub const fn tree_state(&self) -> &TreeState<N> {
178 &self.tree_state
179 }
180
181 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
183 self.invalid_headers.get(hash).is_some()
184 }
185}
186
187#[derive(Debug)]
189pub struct TreeOutcome<T> {
190 pub outcome: T,
192 pub event: Option<TreeEvent>,
194 pub already_seen: bool,
197}
198
199impl<T> TreeOutcome<T> {
200 pub const fn new(outcome: T) -> Self {
202 Self { outcome, event: None, already_seen: false }
203 }
204
205 pub fn with_event(mut self, event: TreeEvent) -> Self {
207 self.event = Some(event);
208 self
209 }
210
211 pub const fn with_already_seen(mut self, value: bool) -> Self {
213 self.already_seen = value;
214 self
215 }
216}
217
218#[derive(Debug)]
220pub struct TryInsertPayloadResult {
221 pub status: PayloadStatus,
225 pub already_seen: bool,
227}
228
229impl TryInsertPayloadResult {
230 #[inline]
232 pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
233 TreeOutcome::new(self.status).with_already_seen(self.already_seen)
234 }
235}
236
237#[derive(Debug)]
239pub enum TreeEvent {
240 TreeAction(TreeAction),
242 BackfillAction(BackfillAction),
244 Download(DownloadRequest),
246}
247
248impl TreeEvent {
249 const fn is_backfill_action(&self) -> bool {
251 matches!(self, Self::BackfillAction(_))
252 }
253}
254
255#[derive(Debug)]
257pub enum TreeAction {
258 MakeCanonical {
260 sync_target_head: B256,
262 },
263}
264
265pub struct EngineApiTreeHandler<N, P, T, V, C>
270where
271 N: NodePrimitives,
272 T: PayloadTypes,
273 C: ConfigureEvm<Primitives = N> + 'static,
274{
275 provider: P,
276 consensus: Arc<dyn FullConsensus<N>>,
277 payload_validator: V,
278 state: EngineApiTreeState<N>,
280 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
289 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
291 outgoing: UnboundedSender<EngineApiEvent<N>>,
293 persistence: PersistenceHandle<N>,
295 persistence_state: PersistenceState,
297 backfill_sync_state: BackfillSyncState,
299 canonical_in_memory_state: CanonicalInMemoryState<N>,
302 payload_builder: PayloadBuilderHandle<T>,
305 config: TreeConfig,
307 metrics: EngineApiMetrics,
309 engine_kind: EngineApiKind,
311 evm_config: C,
313 changeset_cache: ChangesetCache,
315 execution_timing_stats: B256Map<Box<ExecutionTimingStats>>,
319 building_payload: bool,
322 runtime: reth_tasks::Runtime,
324}
325
326impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
327 for EngineApiTreeHandler<N, P, T, V, C>
328where
329 N: NodePrimitives,
330 C: Debug + ConfigureEvm<Primitives = N>,
331{
332 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333 f.debug_struct("EngineApiTreeHandler")
334 .field("provider", &self.provider)
335 .field("consensus", &self.consensus)
336 .field("payload_validator", &self.payload_validator)
337 .field("state", &self.state)
338 .field("incoming_tx", &self.incoming_tx)
339 .field("persistence", &self.persistence)
340 .field("persistence_state", &self.persistence_state)
341 .field("backfill_sync_state", &self.backfill_sync_state)
342 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
343 .field("payload_builder", &self.payload_builder)
344 .field("config", &self.config)
345 .field("metrics", &self.metrics)
346 .field("engine_kind", &self.engine_kind)
347 .field("evm_config", &self.evm_config)
348 .field("changeset_cache", &self.changeset_cache)
349 .field("execution_timing_stats", &self.execution_timing_stats.len())
350 .field("runtime", &self.runtime)
351 .finish()
352 }
353}
354
355impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
356where
357 N: NodePrimitives,
358 P: DatabaseProviderFactory
359 + BlockReader<Block = N::Block, Header = N::BlockHeader>
360 + StateProviderFactory
361 + StateReader<Receipt = N::Receipt>
362 + HashedPostStateProvider
363 + BalProvider
364 + Clone
365 + 'static,
366 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
367 + StageCheckpointReader
368 + ChangeSetReader
369 + StorageChangeSetReader
370 + StorageSettingsCache,
371 C: ConfigureEvm<Primitives = N> + 'static,
372 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
373 V: EngineValidator<T> + WaitForCaches,
374{
375 #[expect(clippy::too_many_arguments)]
377 pub fn new(
378 provider: P,
379 consensus: Arc<dyn FullConsensus<N>>,
380 payload_validator: V,
381 outgoing: UnboundedSender<EngineApiEvent<N>>,
382 state: EngineApiTreeState<N>,
383 canonical_in_memory_state: CanonicalInMemoryState<N>,
384 persistence: PersistenceHandle<N>,
385 persistence_state: PersistenceState,
386 payload_builder: PayloadBuilderHandle<T>,
387 config: TreeConfig,
388 engine_kind: EngineApiKind,
389 evm_config: C,
390 changeset_cache: ChangesetCache,
391 runtime: reth_tasks::Runtime,
392 ) -> Self {
393 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
394
395 Self {
396 provider,
397 consensus,
398 payload_validator,
399 incoming,
400 outgoing,
401 persistence,
402 persistence_state,
403 backfill_sync_state: BackfillSyncState::Idle,
404 state,
405 canonical_in_memory_state,
406 payload_builder,
407 config,
408 metrics: Default::default(),
409 incoming_tx,
410 engine_kind,
411 evm_config,
412 changeset_cache,
413 execution_timing_stats: B256Map::default(),
414 building_payload: false,
415 runtime,
416 }
417 }
418
419 #[expect(clippy::complexity)]
425 pub fn spawn_new(
426 provider: P,
427 consensus: Arc<dyn FullConsensus<N>>,
428 payload_validator: V,
429 persistence: PersistenceHandle<N>,
430 payload_builder: PayloadBuilderHandle<T>,
431 canonical_in_memory_state: CanonicalInMemoryState<N>,
432 config: TreeConfig,
433 kind: EngineApiKind,
434 evm_config: C,
435 changeset_cache: ChangesetCache,
436 runtime: reth_tasks::Runtime,
437 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
438 {
439 let best_block_number = provider.best_block_number().unwrap_or(0);
440 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
441
442 let persistence_state = PersistenceState {
443 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
444 rx: None,
445 };
446
447 let (tx, outgoing) = unbounded_channel();
448 let state = EngineApiTreeState::new(
449 config.block_buffer_limit(),
450 config.max_invalid_header_cache_length(),
451 config.invalid_header_hit_eviction_threshold(),
452 header.num_hash(),
453 kind,
454 runtime.state_trie_overlay_worker_pool(),
455 );
456
457 let task = Self::new(
458 provider,
459 consensus,
460 payload_validator,
461 tx,
462 state,
463 canonical_in_memory_state,
464 persistence,
465 persistence_state,
466 payload_builder,
467 config,
468 kind,
469 evm_config,
470 changeset_cache,
471 runtime,
472 );
473 let incoming = task.incoming_tx.clone();
474 spawn_os_thread("engine", || {
475 increase_thread_priority();
476 task.run()
477 });
478 (incoming, outgoing)
479 }
480
481 fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
483 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
484 PayloadStatusEnum::Valid,
485 Some(state.head_block_hash),
486 )))
487 }
488
489 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
491 self.incoming_tx.clone()
492 }
493
494 const fn persistence_gap(&self) -> u64 {
497 self.state
498 .tree_state
499 .canonical_block_number()
500 .saturating_sub(self.persistence_state.last_persisted_block.number)
501 }
502
503 const fn should_backpressure(&self) -> bool {
508 self.persistence_state.in_progress() &&
509 self.persistence_gap() >= self.config.persistence_backpressure_threshold()
510 }
511
512 pub fn run(mut self) {
516 loop {
517 match self.try_poll_persistence() {
541 Ok(true) => {
542 if let Err(err) = self.advance_persistence() {
543 error!(target: "engine::tree", %err, "Advancing persistence failed");
544 return
545 }
546 continue;
547 }
548 Ok(false) => {}
549 Err(err) => {
550 error!(target: "engine::tree", %err, "Polling persistence failed");
551 return
552 }
553 }
554
555 let event = if self.should_backpressure() {
556 self.metrics.engine.backpressure_active.set(1.0);
557 let stall_start = Instant::now();
558 let event = self.wait_for_persistence_event();
559 self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
560 event
561 } else {
562 self.metrics.engine.backpressure_active.set(0.0);
563 self.wait_for_event()
564 };
565
566 match event {
567 LoopEvent::EngineMessage(msg) => {
568 debug!(target: "engine::tree", %msg, "received new engine message");
569 match self.on_engine_message(msg) {
570 Ok(ops::ControlFlow::Break(())) => return,
571 Ok(ops::ControlFlow::Continue(())) => {}
572 Err(fatal) => {
573 error!(target: "engine::tree", %fatal, "insert block fatal error");
574 return
575 }
576 }
577 }
578 LoopEvent::PersistenceComplete { result, start_time } => {
579 if let Err(err) = self.on_persistence_complete(result, start_time) {
580 error!(target: "engine::tree", %err, "Persistence complete handling failed");
581 return
582 }
583 }
584 LoopEvent::Disconnected => {
585 error!(target: "engine::tree", "Channel disconnected");
586 return
587 }
588 }
589
590 if let Err(err) = self.advance_persistence() {
595 error!(target: "engine::tree", %err, "Advancing persistence failed");
596 return
597 }
598 }
599 }
600
601 fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
607 let maybe_persistence = self.persistence_state.rx.take();
608
609 if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
610 match persistence_rx.recv() {
611 Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
612 Err(_) => LoopEvent::Disconnected,
613 }
614 } else {
615 self.wait_for_event()
616 }
617 }
618
619 fn wait_for_event(&mut self) -> LoopEvent<T, N> {
625 let maybe_persistence = self.persistence_state.rx.take();
627
628 if let Some((persistence_rx, start_time, action)) = maybe_persistence {
629 crossbeam_channel::select_biased! {
632 recv(persistence_rx) -> result => {
633 match result {
635 Ok(result) => LoopEvent::PersistenceComplete {
636 result,
637 start_time,
638 },
639 Err(_) => LoopEvent::Disconnected,
640 }
641 },
642 recv(self.incoming) -> msg => {
643 self.persistence_state.rx = Some((persistence_rx, start_time, action));
645 match msg {
646 Ok(m) => LoopEvent::EngineMessage(m),
647 Err(_) => LoopEvent::Disconnected,
648 }
649 },
650 }
651 } else {
652 match self.incoming.recv() {
654 Ok(m) => LoopEvent::EngineMessage(m),
655 Err(_) => LoopEvent::Disconnected,
656 }
657 }
658 }
659
660 fn on_downloaded(
666 &mut self,
667 mut blocks: Vec<SealedBlock<N::Block>>,
668 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
669 if blocks.is_empty() {
670 return Ok(None)
672 }
673
674 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
675 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
676 for block in blocks.drain(..batch) {
677 if let Some(event) = self.on_downloaded_block(block)? {
678 let needs_backfill = event.is_backfill_action();
679 self.on_tree_event(event)?;
680 if needs_backfill {
681 return Ok(None)
683 }
684 }
685 }
686
687 if !blocks.is_empty() {
689 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
690 }
691
692 Ok(None)
693 }
694
695 #[instrument(
710 level = "debug",
711 target = "engine::tree",
712 skip_all,
713 fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
714 )]
715 fn on_new_payload(
716 &mut self,
717 payload: T::ExecutionData,
718 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
719 trace!(target: "engine::tree", "invoked new payload");
720
721 let start = Instant::now();
723
724 let num_hash = payload.num_hash();
751 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
752 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
753
754 let block_hash = num_hash.hash;
755
756 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
758 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
759 return Ok(TreeOutcome::new(status));
760 }
761
762 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
764
765 let mut outcome = if self.backfill_sync_state.is_idle() {
766 self.try_insert_payload(payload)?.into_outcome()
767 } else {
768 TreeOutcome::new(self.try_buffer_payload(payload)?)
769 };
770
771 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
773 if self.state.tree_state.canonical_block_hash() != block_hash {
775 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
776 sync_target_head: block_hash,
777 }));
778 }
779 }
780
781 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
783
784 Ok(outcome)
785 }
786
787 #[instrument(level = "debug", target = "engine::tree", skip_all)]
789 fn try_insert_payload(
790 &mut self,
791 payload: T::ExecutionData,
792 ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
793 let block_hash = payload.block_hash();
794 let num_hash = payload.num_hash();
795 let parent_hash = payload.parent_hash();
796 let mut latest_valid_hash = None;
797
798 match self.insert_payload(payload) {
799 Ok(status) => {
800 let (status, already_seen) = match status {
801 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
802 latest_valid_hash = Some(block_hash);
803 self.try_connect_buffered_blocks(num_hash)?;
804 (PayloadStatusEnum::Valid, false)
805 }
806 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
807 latest_valid_hash = Some(block_hash);
808 (PayloadStatusEnum::Valid, true)
809 }
810 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
811 (PayloadStatusEnum::Syncing, false)
812 }
813 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
814 (PayloadStatusEnum::Syncing, true)
816 }
817 };
818
819 Ok(TryInsertPayloadResult {
820 status: PayloadStatus::new(status, latest_valid_hash),
821 already_seen,
822 })
823 }
824 Err(error) => {
825 let status = match error {
826 InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
827 InsertPayloadError::Payload(error) => {
828 self.on_new_payload_error(error, num_hash, parent_hash)?
829 }
830 };
831
832 Ok(TryInsertPayloadResult { status, already_seen: false })
833 }
834 }
835 }
836
837 fn try_buffer_payload(
846 &mut self,
847 payload: T::ExecutionData,
848 ) -> Result<PayloadStatus, InsertBlockFatalError> {
849 let parent_hash = payload.parent_hash();
850 let num_hash = payload.num_hash();
851
852 match self.payload_validator.convert_payload_to_block(payload) {
853 Ok(block) => {
855 if let Err(error) = self.buffer_block(block) {
856 Ok(self.on_insert_block_error(error)?)
857 } else {
858 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
859 }
860 }
861 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
862 }
863 }
864
865 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
872 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
874 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
875 self.metrics.engine.executed_new_block_cache_miss.increment(1);
876 return Ok(None)
877 };
878
879 let new_head_number = new_head_block.recovered_block().number();
880 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
881
882 let mut new_chain = vec![new_head_block.clone()];
883 let mut current_hash = new_head_block.recovered_block().parent_hash();
884 let mut current_number = new_head_number - 1;
885
886 while current_number > current_canonical_number {
891 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
892 {
893 current_hash = block.recovered_block().parent_hash();
894 current_number -= 1;
895 new_chain.push(block);
896 } else {
897 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
898 return Ok(None)
901 }
902 }
903
904 if current_hash == self.state.tree_state.current_canonical_head.hash {
907 new_chain.reverse();
908
909 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
911 }
912
913 let mut old_chain = Vec::new();
915 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
916
917 while current_canonical_number > current_number {
920 let block = self.canonical_block_by_hash(old_hash)?;
921 old_hash = block.recovered_block().parent_hash();
922 old_chain.push(block);
923 current_canonical_number -= 1;
924 }
925
926 debug_assert_eq!(current_number, current_canonical_number);
928
929 while old_hash != current_hash {
932 let block = self.canonical_block_by_hash(old_hash)?;
933 old_hash = block.recovered_block().parent_hash();
934 old_chain.push(block);
935
936 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
937 {
938 current_hash = block.recovered_block().parent_hash();
939 new_chain.push(block);
940 } else {
941 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
943 return Ok(None)
944 }
945 }
946 new_chain.reverse();
947 old_chain.reverse();
948
949 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
950 }
951
952 fn update_latest_block_to_canonical_ancestor(
964 &mut self,
965 canonical_header: &SealedHeader<N::BlockHeader>,
966 ) -> ProviderResult<()> {
967 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
968 let current_head_number = self.state.tree_state.canonical_block_number();
969 let new_head_number = canonical_header.number();
970 let new_head_hash = canonical_header.hash();
971
972 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
974
975 if new_head_number < current_head_number {
977 debug!(
978 target: "engine::tree",
979 current_head = current_head_number,
980 new_head = new_head_number,
981 new_head_hash = ?new_head_hash,
982 "FCU unwind detected: reverting to canonical ancestor"
983 );
984
985 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
986 } else {
987 debug!(
988 target: "engine::tree",
989 previous_head = current_head_number,
990 new_head = new_head_number,
991 new_head_hash = ?new_head_hash,
992 "Advancing latest block to canonical ancestor"
993 );
994 self.handle_chain_advance_or_same_height(canonical_header)
995 }
996 }
997
998 fn handle_canonical_chain_unwind(
1001 &self,
1002 current_head_number: u64,
1003 canonical_header: &SealedHeader<N::BlockHeader>,
1004 ) -> ProviderResult<()> {
1005 let new_head_number = canonical_header.number();
1006 debug!(
1007 target: "engine::tree",
1008 from = current_head_number,
1009 to = new_head_number,
1010 "Handling unwind: collecting blocks to remove from in-memory state"
1011 );
1012
1013 let old_blocks =
1015 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
1016
1017 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1019 }
1020
1021 fn collect_blocks_for_canonical_unwind(
1023 &self,
1024 new_head_number: u64,
1025 current_head_number: u64,
1026 ) -> Vec<ExecutedBlock<N>> {
1027 let mut old_blocks =
1028 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1029
1030 for block_num in (new_head_number + 1)..=current_head_number {
1031 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1032 let executed_block = block_state.block_ref().clone();
1033 old_blocks.push(executed_block);
1034 debug!(
1035 target: "engine::tree",
1036 block_number = block_num,
1037 "Collected block for removal from in-memory state"
1038 );
1039 }
1040 }
1041
1042 if old_blocks.is_empty() {
1043 debug!(
1044 target: "engine::tree",
1045 "No blocks found in memory to remove, will clear and reset state"
1046 );
1047 }
1048
1049 old_blocks
1050 }
1051
1052 fn apply_canonical_ancestor_via_reorg(
1054 &self,
1055 canonical_header: &SealedHeader<N::BlockHeader>,
1056 old_blocks: Vec<ExecutedBlock<N>>,
1057 ) -> ProviderResult<()> {
1058 let new_head_hash = canonical_header.hash();
1059 let new_head_number = canonical_header.number();
1060
1061 let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1063 self.canonical_in_memory_state
1065 .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1066
1067 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1070
1071 debug!(
1072 target: "engine::tree",
1073 block_number = new_head_number,
1074 block_hash = ?new_head_hash,
1075 "Successfully loaded canonical ancestor into memory via reorg"
1076 );
1077
1078 Ok(())
1079 }
1080
1081 fn handle_chain_advance_or_same_height(
1083 &self,
1084 canonical_header: &SealedHeader<N::BlockHeader>,
1085 ) -> ProviderResult<()> {
1086 self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1088
1089 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1091
1092 Ok(())
1093 }
1094
1095 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1097 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1099 return Ok(());
1100 }
1101
1102 let executed_block = self.canonical_block_by_hash(block_hash)?;
1104 self.canonical_in_memory_state
1105 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1106
1107 debug!(
1108 target: "engine::tree",
1109 block_number,
1110 block_hash = ?block_hash,
1111 "Added canonical block to in-memory state"
1112 );
1113
1114 Ok(())
1115 }
1116
1117 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
1126 fn on_forkchoice_updated(
1127 &mut self,
1128 state: ForkchoiceState,
1129 attrs: Option<T::PayloadAttributes>,
1130 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1131 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1132
1133 self.building_payload = attrs.is_some() && self.config.suppress_persistence_during_build();
1134
1135 self.record_forkchoice_metrics();
1137
1138 if let Some(early_result) = self.validate_forkchoice_state(state)? {
1140 return Ok(TreeOutcome::new(early_result));
1141 }
1142
1143 if let Some(result) = self.handle_canonical_head(state, &attrs)? {
1145 return Ok(result);
1146 }
1147
1148 if let Some(result) = self.apply_chain_update(state, &attrs)? {
1151 return Ok(result);
1152 }
1153
1154 self.handle_missing_block(state)
1156 }
1157
1158 fn record_forkchoice_metrics(&self) {
1160 self.canonical_in_memory_state.on_forkchoice_update_received();
1161 }
1162
1163 fn validate_forkchoice_state(
1168 &mut self,
1169 state: ForkchoiceState,
1170 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1171 if state.head_block_hash.is_zero() {
1172 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1173 }
1174
1175 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1178 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1179 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1180 }
1181
1182 if !self.backfill_sync_state.is_idle() {
1183 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1186 return Ok(Some(OnForkChoiceUpdated::syncing()));
1187 }
1188
1189 Ok(None)
1190 }
1191
1192 fn handle_canonical_head(
1198 &self,
1199 state: ForkchoiceState,
1200 attrs: &Option<T::PayloadAttributes>, ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1202 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1217 return Ok(None);
1218 }
1219
1220 trace!(target: "engine::tree", "fcu head hash is already canonical");
1221
1222 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1224 return Ok(Some(TreeOutcome::new(outcome)));
1226 }
1227
1228 if let Some(attr) = attrs {
1230 let tip = self
1231 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1232 .ok_or_else(|| {
1233 ProviderError::HeaderNotFound(state.head_block_hash.into())
1236 })?;
1237 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1239 return Ok(Some(TreeOutcome::new(updated)));
1240 }
1241
1242 Ok(Some(Self::valid_outcome(state)))
1244 }
1245
1246 fn apply_chain_update(
1258 &mut self,
1259 state: ForkchoiceState,
1260 attrs: &Option<T::PayloadAttributes>,
1261 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1262 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1264 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1265
1266 if self.engine_kind.is_opstack() ||
1269 self.config.always_process_payload_attributes_on_canonical_head()
1270 {
1271 if self.config.unwind_canonical_header() {
1277 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1278 }
1279
1280 if let Some(attr) = attrs {
1281 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1282 let updated =
1284 self.process_payload_attributes(attr.clone(), &canonical_header, state);
1285 return Ok(Some(TreeOutcome::new(updated)));
1286 }
1287 }
1288
1289 return Ok(Some(Self::valid_outcome(state)));
1300 }
1301
1302 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1304 let tip = chain_update.tip().clone_sealed_header();
1305 self.on_canonical_chain_update(chain_update);
1306
1307 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1309 return Ok(Some(TreeOutcome::new(outcome)));
1311 }
1312
1313 if let Some(attr) = attrs {
1314 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1316 return Ok(Some(TreeOutcome::new(updated)));
1317 }
1318
1319 return Ok(Some(Self::valid_outcome(state)));
1320 }
1321
1322 Ok(None)
1323 }
1324
1325 fn handle_missing_block(
1330 &self,
1331 state: ForkchoiceState,
1332 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1333 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1340 !state.safe_block_hash.is_zero() &&
1342 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1343 {
1344 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1345 state.safe_block_hash
1346 } else {
1347 state.head_block_hash
1348 };
1349
1350 let target = self.lowest_buffered_ancestor_or(target);
1351 trace!(target: "engine::tree", %target, "downloading missing block");
1352
1353 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1354 PayloadStatusEnum::Syncing,
1355 )))
1356 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1357 }
1358
1359 fn remove_blocks(&mut self, new_tip_num: u64) {
1362 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1363 if new_tip_num < self.persistence_state.last_persisted_block.number {
1364 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1365 let (tx, rx) = crossbeam_channel::bounded(1);
1366 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1367 self.persistence_state.start_remove(new_tip_num, rx);
1368 }
1369 }
1370
1371 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1374 if blocks_to_persist.is_empty() {
1375 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1376 return
1377 }
1378
1379 let highest_num_hash = blocks_to_persist
1381 .iter()
1382 .max_by_key(|block| block.recovered_block().number())
1383 .map(|b| b.recovered_block().num_hash())
1384 .expect("Checked non-empty persisting blocks");
1385
1386 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1387 let (tx, rx) = crossbeam_channel::bounded(1);
1388 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1389
1390 self.persistence_state.start_save(highest_num_hash, rx);
1391 }
1392
1393 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1398 if !self.persistence_state.in_progress() {
1399 if let Some(new_tip_num) = self.find_disk_reorg()? {
1400 self.remove_blocks(new_tip_num)
1401 } else if self.should_persist() {
1402 let blocks_to_persist =
1403 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1404 self.persist_blocks(blocks_to_persist);
1405 }
1406 }
1407
1408 Ok(())
1409 }
1410
1411 fn finish_termination(
1416 &mut self,
1417 pending_termination: oneshot::Sender<()>,
1418 ) -> Result<(), AdvancePersistenceError> {
1419 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1420 let result = self.persist_until_complete();
1421 let _ = pending_termination.send(());
1422 result
1423 }
1424
1425 fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1427 loop {
1428 if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1430 debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1431 let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1432 self.on_persistence_complete(result, start_time)?;
1433 }
1434
1435 let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1436
1437 if blocks_to_persist.is_empty() {
1438 debug!(target: "engine::tree", "persistence complete, signaling termination");
1439 return Ok(())
1440 }
1441
1442 debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1443 self.persist_blocks(blocks_to_persist);
1444 }
1445 }
1446
1447 fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1451 let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1452 return Ok(false);
1453 };
1454
1455 match rx.try_recv() {
1456 Ok(result) => {
1457 self.on_persistence_complete(result, start_time)?;
1458 Ok(true)
1459 }
1460 Err(crossbeam_channel::TryRecvError::Empty) => {
1461 self.persistence_state.rx = Some((rx, start_time, action));
1463 Ok(false)
1464 }
1465 Err(crossbeam_channel::TryRecvError::Disconnected) => {
1466 Err(AdvancePersistenceError::ChannelClosed)
1467 }
1468 }
1469 }
1470
1471 fn on_persistence_complete(
1473 &mut self,
1474 result: PersistenceResult,
1475 start_time: Instant,
1476 ) -> Result<(), AdvancePersistenceError> {
1477 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1478
1479 let commit_duration = result.commit_duration;
1480 let Some(BlockNumHash {
1481 hash: last_persisted_block_hash,
1482 number: last_persisted_block_number,
1483 }) = result.last_block
1484 else {
1485 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1487 return Ok(())
1488 };
1489
1490 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1491 self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1492
1493 let min_threshold =
1497 last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1498 let eviction_threshold =
1499 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1500 finalized.number.min(min_threshold)
1502 } else {
1503 min_threshold
1505 };
1506 debug!(
1507 target: "engine::tree",
1508 last_persisted = last_persisted_block_number,
1509 finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1510 eviction_threshold,
1511 "Evicting changesets below threshold"
1512 );
1513 self.changeset_cache.evict(eviction_threshold);
1514
1515 self.on_new_persisted_block()?;
1516
1517 self.purge_timing_stats(last_persisted_block_number, commit_duration);
1518
1519 Ok(())
1520 }
1521
1522 fn on_engine_message(
1526 &mut self,
1527 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1528 ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
1529 match msg {
1530 FromEngine::Event(event) => match event {
1531 FromOrchestrator::BackfillSyncStarted => {
1532 debug!(target: "engine::tree", "received backfill sync started event");
1533 self.backfill_sync_state = BackfillSyncState::Active;
1534 }
1535 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1536 self.on_backfill_sync_finished(ctrl)?;
1537 }
1538 FromOrchestrator::Terminate { tx } => {
1539 debug!(target: "engine::tree", "received terminate request");
1540 if let Err(err) = self.finish_termination(tx) {
1541 error!(target: "engine::tree", %err, "Termination failed");
1542 }
1543 return Ok(ops::ControlFlow::Break(()))
1544 }
1545 },
1546 FromEngine::Request(request) => {
1547 match request {
1548 EngineApiRequest::InsertExecutedBlock(payload) => {
1549 let block_num_hash = payload.recovered_block.num_hash();
1550 if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1551 return Ok(ops::ControlFlow::Continue(()))
1553 }
1554
1555 if self.state.tree_state.contains_hash(&block_num_hash.hash) {
1556 return Ok(ops::ControlFlow::Continue(()))
1558 }
1559
1560 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1561 let now = Instant::now();
1562
1563 let block = match self
1564 .payload_validator
1565 .on_inserted_executed_block(payload, &self.state)
1566 {
1567 Ok(block) => block,
1568 Err(err) => {
1569 warn!(target: "engine::tree", %err, block=?block_num_hash, "Failed to insert already executed block");
1570 return Ok(ops::ControlFlow::Continue(()))
1571 }
1572 };
1573
1574 if self.state.tree_state.canonical_block_hash() ==
1577 block.recovered_block().parent_hash()
1578 {
1579 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1580 self.canonical_in_memory_state.set_pending_block(block.clone());
1581 }
1582
1583 self.state.tree_state.insert_executed(block.clone());
1584 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1585 self.emit_event(EngineApiEvent::BeaconConsensus(
1586 ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1587 ));
1588 }
1589 EngineApiRequest::Beacon(request) => {
1590 match request {
1591 BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
1592 let has_attrs = payload_attrs.is_some();
1593
1594 let start = Instant::now();
1595 let mut output = self.on_forkchoice_updated(state, payload_attrs);
1596
1597 if let Ok(res) = &mut output {
1598 self.state
1600 .forkchoice_state_tracker
1601 .set_latest(state, res.outcome.forkchoice_status());
1602
1603 self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1605 state,
1606 res.outcome.forkchoice_status(),
1607 ));
1608
1609 self.on_maybe_tree_event(res.event.take())?;
1611 }
1612
1613 if let Err(ref err) = output {
1614 error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
1615 }
1616
1617 self.metrics.engine.forkchoice_updated.update_response_metrics(
1618 start,
1619 &mut self.metrics.engine.new_payload.latest_finish_at,
1620 has_attrs,
1621 &output,
1622 );
1623
1624 if let Err(err) =
1625 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1626 {
1627 self.metrics
1628 .engine
1629 .failed_forkchoice_updated_response_deliveries
1630 .increment(1);
1631 warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
1632 }
1633 }
1634 BeaconEngineMessage::NewPayload { payload, tx } => {
1635 let start = Instant::now();
1636 let gas_used = payload.gas_used();
1637 let num_hash = payload.num_hash();
1638 let mut output = self.on_new_payload(payload);
1639 self.metrics.engine.new_payload.update_response_metrics(
1640 start,
1641 &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1642 &output,
1643 gas_used,
1644 );
1645
1646 let maybe_event =
1647 output.as_mut().ok().and_then(|out| out.event.take());
1648
1649 if let Err(err) =
1651 tx.send(output.map(|o| o.outcome).map_err(|e| {
1652 BeaconOnNewPayloadError::Internal(Box::new(e))
1653 }))
1654 {
1655 warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
1656 self.metrics
1657 .engine
1658 .failed_new_payload_response_deliveries
1659 .increment(1);
1660 }
1661
1662 self.on_maybe_tree_event(maybe_event)?;
1664 }
1665 BeaconEngineMessage::RethNewPayload {
1666 payload,
1667 wait_for_persistence,
1668 wait_for_caches,
1669 tx,
1670 enqueued_at,
1671 } => {
1672 debug!(
1673 target: "engine::tree",
1674 wait_for_persistence,
1675 wait_for_caches,
1676 "Processing reth_newPayload"
1677 );
1678
1679 let backpressure_wait = enqueued_at.elapsed();
1680
1681 let explicit_persistence_wait = if wait_for_persistence {
1682 let pending_persistence = self.persistence_state.rx.take();
1683 if let Some((rx, start_time, _action)) = pending_persistence {
1684 let (persistence_tx, persistence_rx) =
1685 std::sync::mpsc::channel();
1686 self.runtime.spawn_blocking_named(
1687 "wait-persist",
1688 move || {
1689 let start = Instant::now();
1690 let result = rx
1691 .recv()
1692 .expect("persistence state channel closed");
1693 let _ = persistence_tx.send((
1694 result,
1695 start_time,
1696 start.elapsed(),
1697 ));
1698 },
1699 );
1700 let (result, start_time, wait_duration) = persistence_rx
1701 .recv()
1702 .expect("persistence result channel closed");
1703 let _ = self.on_persistence_complete(result, start_time);
1704 wait_duration
1705 } else {
1706 Duration::ZERO
1707 }
1708 } else {
1709 Duration::ZERO
1710 };
1711
1712 let cache_wait = wait_for_caches
1713 .then(|| self.payload_validator.wait_for_caches());
1714
1715 let start = Instant::now();
1716 let gas_used = payload.gas_used();
1717 let num_hash = payload.num_hash();
1718 let mut output = self.on_new_payload(payload);
1719 let latency = start.elapsed();
1720 self.metrics.engine.new_payload.update_response_metrics(
1721 start,
1722 &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
1723 &output,
1724 gas_used,
1725 );
1726
1727 let maybe_event =
1728 output.as_mut().ok().and_then(|out| out.event.take());
1729
1730 let timings = NewPayloadTimings {
1731 latency,
1732 persistence_wait: backpressure_wait + explicit_persistence_wait,
1733 execution_cache_wait: cache_wait
1734 .map(|wait| wait.execution_cache),
1735 sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
1736 };
1737 if let Err(err) =
1738 tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
1739 BeaconOnNewPayloadError::Internal(Box::new(e))
1740 }))
1741 {
1742 error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1743 self.metrics
1744 .engine
1745 .failed_new_payload_response_deliveries
1746 .increment(1);
1747 }
1748
1749 self.on_maybe_tree_event(maybe_event)?;
1750 }
1751 }
1752 }
1753 }
1754 }
1755 FromEngine::DownloadedBlocks(blocks) => {
1756 if let Some(event) = self.on_downloaded(blocks)? {
1757 self.on_tree_event(event)?;
1758 }
1759 }
1760 }
1761 Ok(ops::ControlFlow::Continue(()))
1762 }
1763
1764 fn on_backfill_sync_finished(
1778 &mut self,
1779 ctrl: ControlFlow,
1780 ) -> Result<(), InsertBlockFatalError> {
1781 debug!(target: "engine::tree", "received backfill sync finished event");
1782 self.backfill_sync_state = BackfillSyncState::Idle;
1783
1784 let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1786 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1787 self.state.invalid_headers.insert(**bad_block);
1789
1790 Some(*target)
1792 } else {
1793 ctrl.block_number()
1795 };
1796
1797 let Some(backfill_height) = backfill_height else { return Ok(()) };
1799
1800 let Some(backfill_num_hash) = self
1806 .provider
1807 .block_hash(backfill_height)?
1808 .map(|hash| BlockNumHash { hash, number: backfill_height })
1809 else {
1810 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1811 return Ok(())
1812 };
1813
1814 if ctrl.is_unwind() {
1815 self.state.tree_state.reset(backfill_num_hash)
1818 } else {
1819 self.state.tree_state.remove_until(
1820 backfill_num_hash,
1821 self.persistence_state.last_persisted_block.hash,
1822 Some(backfill_num_hash),
1823 );
1824 }
1825
1826 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1827 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1828
1829 self.state.buffer.remove_old_blocks(backfill_height);
1831 self.purge_timing_stats(backfill_height, None);
1832 self.canonical_in_memory_state.clear_state();
1835
1836 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1837 self.state.tree_state.set_canonical_head(new_head.num_hash());
1840 self.persistence_state.finish(new_head.hash(), new_head.number());
1841
1842 self.canonical_in_memory_state.set_canonical_head(new_head);
1844 }
1845
1846 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1849 else {
1850 return Ok(())
1851 };
1852 if !self.engine_kind.is_opstack() && sync_target_state.finalized_block_hash.is_zero() {
1853 return Ok(())
1855 }
1856 let target_hash = self.backfill_target_hash(sync_target_state);
1857 if target_hash.is_zero() {
1858 return Ok(())
1859 }
1860 let newest_target = self.state.buffer.block(&target_hash).map(|block| block.number());
1862
1863 if let Some(backfill_target) =
1869 ctrl.block_number().zip(newest_target).and_then(|(progress, target_number)| {
1870 self.backfill_sync_target(progress, target_number, None)
1873 })
1874 {
1875 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1877 backfill_target.into(),
1878 )));
1879 return Ok(())
1880 };
1881
1882 if let Some(lowest_buffered) =
1884 self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1885 {
1886 let current_head_num = self.state.tree_state.current_canonical_head.number;
1887 let target_head_num = lowest_buffered.number();
1888
1889 if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1890 {
1891 debug!(
1893 target: "engine::tree",
1894 %current_head_num,
1895 %target_head_num,
1896 %distance,
1897 "Backfill complete, downloading remaining blocks to reach FCU target"
1898 );
1899
1900 self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1901 lowest_buffered.parent_hash(),
1902 distance,
1903 )));
1904 return Ok(());
1905 }
1906 } else {
1907 debug!(
1910 target: "engine::tree",
1911 head_hash = %sync_target_state.head_block_hash,
1912 "Backfill complete but head block not buffered, requesting download"
1913 );
1914 self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
1915 sync_target_state.head_block_hash,
1916 )));
1917 return Ok(());
1918 }
1919
1920 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1922 }
1923
1924 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1928 if let Some(chain_update) = self.on_new_head(target)? {
1929 self.on_canonical_chain_update(chain_update);
1930 }
1931
1932 self.on_canonicalized_sync_target(target);
1933
1934 Ok(())
1935 }
1936
1937 fn on_canonicalized_sync_target(&mut self, target: B256) {
1939 let Some(sync_target_state) = self
1940 .state
1941 .forkchoice_state_tracker
1942 .sync_target_state()
1943 .filter(|state| state.head_block_hash == target)
1944 else {
1945 return;
1946 };
1947
1948 if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
1949 debug!(
1950 target: "engine::tree",
1951 head = %sync_target_state.head_block_hash,
1952 safe = %sync_target_state.safe_block_hash,
1953 finalized = %sync_target_state.finalized_block_hash,
1954 ?outcome,
1955 "Canonicalized sync target head before safe/finalized could be applied"
1956 );
1957 return;
1958 }
1959
1960 self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
1961 }
1962
1963 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1965 if let Some(event) = event {
1966 self.on_tree_event(event)?;
1967 }
1968
1969 Ok(())
1970 }
1971
1972 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1976 match event {
1977 TreeEvent::TreeAction(action) => match action {
1978 TreeAction::MakeCanonical { sync_target_head } => {
1979 self.make_canonical(sync_target_head)?;
1980 }
1981 },
1982 TreeEvent::BackfillAction(action) => {
1983 self.emit_event(EngineApiEvent::BackfillAction(action));
1984 }
1985 TreeEvent::Download(action) => {
1986 self.emit_event(EngineApiEvent::Download(action));
1987 }
1988 }
1989
1990 Ok(())
1991 }
1992
1993 fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
2000 let threshold = self.config.slow_block_threshold();
2001 let check_slow = commit_duration.is_some() && threshold.is_some();
2002
2003 let keys_to_remove: Vec<B256> = self
2005 .execution_timing_stats
2006 .iter()
2007 .filter(|(_, stats)| stats.block_number <= below_number)
2008 .map(|(k, _)| *k)
2009 .collect();
2010
2011 for key in keys_to_remove {
2012 let stats = self.execution_timing_stats.remove(&key).expect("key just found");
2013 if check_slow {
2014 let commit_dur = commit_duration.expect("checked above");
2015 let total_duration =
2017 stats.execution_duration + stats.state_hash_duration + commit_dur;
2018
2019 if total_duration > threshold.expect("checked above") {
2020 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2021 stats,
2022 commit_duration: Some(commit_dur),
2023 total_duration,
2024 }));
2025 }
2026 }
2027 }
2028 }
2029
2030 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
2032 let event = event.into();
2033
2034 if event.is_backfill_action() {
2035 debug_assert_eq!(
2036 self.backfill_sync_state,
2037 BackfillSyncState::Idle,
2038 "backfill action should only be emitted when backfill is idle"
2039 );
2040
2041 if self.persistence_state.in_progress() {
2042 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
2045 return
2046 }
2047
2048 self.backfill_sync_state = BackfillSyncState::Pending;
2049 self.metrics.engine.pipeline_runs.increment(1);
2050 debug!(target: "engine::tree", "emitting backfill action event");
2051 }
2052
2053 let _ = self.outgoing.send(event).inspect_err(
2054 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2055 );
2056 }
2057
2058 pub const fn should_persist(&self) -> bool {
2062 if self.building_payload {
2063 return false
2064 }
2065
2066 if !self.backfill_sync_state.is_idle() {
2067 return false
2069 }
2070
2071 let min_block = self.persistence_state.last_persisted_block.number;
2072 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2073 self.config.persistence_threshold()
2074 }
2075
2076 fn get_canonical_blocks_to_persist(
2079 &self,
2080 target: PersistTarget,
2081 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2082 debug_assert!(!self.persistence_state.in_progress());
2085
2086 let mut blocks_to_persist = Vec::new();
2087 let mut current_hash = self.state.tree_state.canonical_block_hash();
2088 let last_persisted_number = self.persistence_state.last_persisted_block.number;
2089 let canonical_head_number = self.state.tree_state.canonical_block_number();
2090
2091 let target_number = match target {
2092 PersistTarget::Head => canonical_head_number,
2093 PersistTarget::Threshold => {
2094 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2095 }
2096 };
2097
2098 debug!(
2099 target: "engine::tree",
2100 ?current_hash,
2101 ?last_persisted_number,
2102 ?canonical_head_number,
2103 ?target_number,
2104 "Returning canonical blocks to persist"
2105 );
2106 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
2107 if block.recovered_block().number() <= last_persisted_number {
2108 break;
2109 }
2110
2111 if block.recovered_block().number() <= target_number {
2112 blocks_to_persist.push(block.clone());
2113 }
2114
2115 current_hash = block.recovered_block().parent_hash();
2116 }
2117
2118 blocks_to_persist.reverse();
2120
2121 Ok(blocks_to_persist)
2122 }
2123
2124 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2132 if let Some(remove_above) = self.find_disk_reorg()? {
2135 self.remove_blocks(remove_above);
2136 return Ok(())
2137 }
2138
2139 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2140 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2141 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2142 number: self.persistence_state.last_persisted_block.number,
2143 hash: self.persistence_state.last_persisted_block.hash,
2144 });
2145 Ok(())
2146 }
2147
2148 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2155 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
2156 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
2157 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
2159 return Ok(block.clone())
2160 }
2161
2162 let (block, senders) = self
2163 .provider
2164 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
2165 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
2166 .split_sealed();
2167 let mut execution_output = self
2168 .provider
2169 .get_state(block.header().number())?
2170 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
2171 let hashed_state = self.provider.hashed_post_state(execution_output.state());
2172
2173 debug!(
2174 target: "engine::tree",
2175 number = ?block.number(),
2176 "computing block trie updates",
2177 );
2178 let db_provider = self.provider.database_provider_ro()?;
2179 let trie_updates = reth_trie_db::compute_block_trie_updates(
2180 &self.changeset_cache,
2181 &db_provider,
2182 block.number(),
2183 )?;
2184
2185 let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
2186 let sorted_trie_updates = Arc::new(trie_updates);
2187 let trie_data = ComputedTrieData::new(sorted_hashed_state, sorted_trie_updates);
2188
2189 let execution_output = Arc::new(BlockExecutionOutput {
2190 state: execution_output.bundle,
2191 result: BlockExecutionResult {
2192 receipts: execution_output.receipts.pop().unwrap_or_default(),
2193 requests: execution_output.requests.pop().unwrap_or_default(),
2194 gas_used: block.gas_used(),
2195 blob_gas_used: block.blob_gas_used().unwrap_or_default(),
2196 },
2197 });
2198
2199 Ok(ExecutedBlock::new(
2200 Arc::new(RecoveredBlock::new_sealed(block, senders)),
2201 execution_output,
2202 trie_data,
2203 ))
2204 }
2205
2206 fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2210 if self.state.tree_state.contains_hash(&hash) {
2211 Ok(true)
2212 } else {
2213 self.provider.is_known(hash)
2214 }
2215 }
2216
2217 fn sealed_header_by_hash(
2219 &self,
2220 hash: B256,
2221 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2222 let header = self.state.tree_state.sealed_header_by_hash(&hash);
2224
2225 if header.is_some() {
2226 Ok(header)
2227 } else {
2228 self.provider.sealed_header_by_hash(hash)
2229 }
2230 }
2231
2232 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2239 self.state
2240 .buffer
2241 .lowest_ancestor(&hash)
2242 .map(|block| block.parent_hash())
2243 .unwrap_or_else(|| hash)
2244 }
2245
2246 fn latest_valid_hash_for_invalid_payload(
2257 &mut self,
2258 parent_hash: B256,
2259 ) -> ProviderResult<Option<B256>> {
2260 if self.has_block_by_hash(parent_hash)? {
2262 return Ok(Some(parent_hash))
2263 }
2264
2265 let mut current_hash = parent_hash;
2268 let mut current_block = self.state.invalid_headers.get(¤t_hash);
2269 while let Some(block_with_parent) = current_block {
2270 current_hash = block_with_parent.parent;
2271 current_block = self.state.invalid_headers.get(¤t_hash);
2272
2273 if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2276 return Ok(Some(current_hash))
2277 }
2278 }
2279 Ok(None)
2280 }
2281
2282 fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2286 let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2287 Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2291 Some(_) => Some(parent_hash),
2292 None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2293 };
2294
2295 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2296 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2297 })
2298 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2299 }
2300
2301 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2305 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2306 return target.head_block_hash == block_hash
2307 }
2308 false
2309 }
2310
2311 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2315 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2316 return target.contains(block_hash)
2317 }
2318 false
2319 }
2320
2321 fn check_invalid_ancestor_with_head(
2327 &mut self,
2328 check: B256,
2329 head: &SealedBlock<N::Block>,
2330 ) -> ProviderResult<Option<PayloadStatus>> {
2331 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2333
2334 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2335 }
2336
2337 fn on_invalid_new_payload(
2339 &mut self,
2340 head: SealedBlock<N::Block>,
2341 invalid: BlockWithParent,
2342 ) -> ProviderResult<PayloadStatus> {
2343 let status = self.prepare_invalid_response(invalid.parent)?;
2345
2346 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2348 self.emit_event(ConsensusEngineEvent::InvalidBlock {
2349 block: Box::new(head),
2350 error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2351 });
2352
2353 Ok(status)
2354 }
2355
2356 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2370 let parent_hash = payload.parent_hash();
2371 let block_hash = payload.block_hash();
2372
2373 if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2375 return Some(entry);
2376 }
2377
2378 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2379 if lowest_buffered_ancestor == block_hash {
2380 lowest_buffered_ancestor = parent_hash;
2381 }
2382
2383 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2385 }
2386
2387 fn handle_invalid_ancestor_payload(
2396 &mut self,
2397 payload: T::ExecutionData,
2398 invalid: BlockWithParent,
2399 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2400 let parent_hash = payload.parent_hash();
2401 let num_hash = payload.num_hash();
2402
2403 let block = match self.payload_validator.convert_payload_to_block(payload) {
2409 Ok(block) => block,
2410 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2411 };
2412
2413 Ok(self.on_invalid_new_payload(block, invalid)?)
2414 }
2415
2416 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2419 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2421
2422 match self.prepare_invalid_response(header.parent) {
2424 Ok(status) => Ok(Some(status)),
2425 Err(err) => {
2426 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2427 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2429 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2430 })))
2431 }
2432 }
2433 }
2434
2435 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2438 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2439 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2440 return Err(e)
2441 }
2442
2443 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2444 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2445 return Err(e)
2446 }
2447
2448 Ok(())
2449 }
2450
2451 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2453 fn try_connect_buffered_blocks(
2454 &mut self,
2455 parent: BlockNumHash,
2456 ) -> Result<(), InsertBlockFatalError> {
2457 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2458
2459 if blocks.is_empty() {
2460 return Ok(())
2462 }
2463
2464 let now = Instant::now();
2465 let block_count = blocks.len();
2466 for child in blocks {
2467 let child_num_hash = child.num_hash();
2468 match self.insert_block(child) {
2469 Ok(res) => {
2470 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2471 if self.is_any_sync_target(child_num_hash.hash) &&
2472 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2473 {
2474 debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2475 self.make_canonical(child_num_hash.hash)?;
2478 }
2479 }
2480 Err(err) => {
2481 if let InsertPayloadError::Block(err) = err {
2482 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2483 if let Err(fatal) = self.on_insert_block_error(err) {
2484 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2485 return Err(fatal)
2486 }
2487 }
2488 }
2489 }
2490 }
2491
2492 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2493 Ok(())
2494 }
2495
2496 fn buffer_block(
2498 &mut self,
2499 block: SealedBlock<N::Block>,
2500 ) -> Result<(), InsertBlockError<N::Block>> {
2501 if let Err(err) = self.validate_block(&block) {
2502 return Err(InsertBlockError::consensus_error(err, block))
2503 }
2504 self.state.buffer.insert_block(block);
2505 Ok(())
2506 }
2507
2508 #[inline]
2513 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2514 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2515 }
2516
2517 #[inline]
2520 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2521 if block > local_tip {
2522 Some(block - local_tip)
2523 } else {
2524 None
2525 }
2526 }
2527
2528 const fn backfill_target_hash(&self, state: ForkchoiceState) -> B256 {
2536 if self.engine_kind.is_opstack() {
2537 state.head_block_hash
2538 } else {
2539 state.finalized_block_hash
2540 }
2541 }
2542
2543 fn backfill_sync_target(
2550 &self,
2551 canonical_tip_num: u64,
2552 target_block_number: u64,
2553 downloaded_block: Option<BlockNumHash>,
2554 ) -> Option<B256> {
2555 let state = self.state.forkchoice_state_tracker.sync_target_state()?;
2556 let target_hash = self.backfill_target_hash(state);
2557
2558 let exceeds_backfill_threshold = match downloaded_block.as_ref() {
2560 Some(downloaded_block) if downloaded_block.hash == target_hash => {
2562 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2563 }
2564 _ => match self.state.buffer.block(&target_hash) {
2565 Some(buffered_target) => {
2567 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_target.number())
2568 }
2569 None => self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number),
2571 },
2572 };
2573
2574 if !exceeds_backfill_threshold {
2575 return None
2576 }
2577
2578 match self.provider.header_by_hash_or_number(target_hash.into()) {
2580 Err(err) => {
2581 warn!(target: "engine::tree", %err, "Failed to get backfill target block header");
2582 None
2583 }
2584 Ok(None) if !target_hash.is_zero() => Some(target_hash),
2586 Ok(None) => {
2587 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2600 Some(state.head_block_hash)
2601 }
2602 Ok(Some(_)) => None,
2604 }
2605 }
2606
2607 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2610 let mut canonical = self.state.tree_state.current_canonical_head;
2611 let mut persisted = self.persistence_state.last_persisted_block;
2612
2613 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2614 Ok(self
2615 .sealed_header_by_hash(num_hash.hash)?
2616 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2617 .parent_num_hash())
2618 };
2619
2620 while canonical.number > persisted.number {
2623 canonical = parent_num_hash(canonical)?;
2624 }
2625
2626 if canonical == persisted {
2628 return Ok(None);
2629 }
2630
2631 while persisted.number > canonical.number {
2637 persisted = parent_num_hash(persisted)?;
2638 }
2639
2640 debug_assert_eq!(persisted.number, canonical.number);
2641
2642 while persisted.hash != canonical.hash {
2644 canonical = parent_num_hash(canonical)?;
2645 persisted = parent_num_hash(persisted)?;
2646 }
2647
2648 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2649
2650 Ok(Some(persisted.number))
2651 }
2652
2653 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2657 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2658 let start = Instant::now();
2659
2660 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2662
2663 let tip = chain_update.tip().clone_sealed_header();
2664 let notification = chain_update.to_chain_notification();
2665
2666 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2668 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2669 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2670 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2671
2672 self.update_reorg_metrics(old.len(), old_first);
2673 self.reinsert_reorged_blocks(new.clone());
2674 self.reinsert_reorged_blocks(old.clone());
2675 }
2676
2677 self.canonical_in_memory_state.update_chain(chain_update);
2679 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2680
2681 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2683
2684 self.canonical_in_memory_state.notify_canon_state(notification);
2686
2687 self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2689 Box::new(tip),
2690 start.elapsed(),
2691 ));
2692 }
2693
2694 fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2696 if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2697 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2698 first_reorged_block <= finalized.number
2699 {
2700 self.metrics.tree.reorgs.finalized.increment(1);
2701 } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2702 first_reorged_block <= safe.number
2703 {
2704 self.metrics.tree.reorgs.safe.increment(1);
2705 } else {
2706 self.metrics.tree.reorgs.head.increment(1);
2707 }
2708 } else {
2709 debug_unreachable!("Reorged chain doesn't have any blocks");
2710 }
2711 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2712 }
2713
2714 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2716 for block in new_chain {
2717 if self
2718 .state
2719 .tree_state
2720 .executed_block_by_hash(block.recovered_block().hash())
2721 .is_none()
2722 {
2723 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2724 self.state.tree_state.insert_executed(block);
2725 }
2726 }
2727 }
2728
2729 fn on_disconnected_downloaded_block(
2734 &self,
2735 downloaded_block: BlockNumHash,
2736 missing_parent: BlockNumHash,
2737 head: BlockNumHash,
2738 ) -> Option<TreeEvent> {
2739 if let Some(target) =
2741 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2742 {
2743 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2744 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2745 }
2746
2747 let request = if let Some(distance) =
2757 self.distance_from_local_tip(head.number, missing_parent.number)
2758 {
2759 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2760 DownloadRequest::BlockRange(missing_parent.hash, distance)
2761 } else {
2762 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2763 DownloadRequest::single_block(missing_parent.hash)
2766 };
2767
2768 Some(TreeEvent::Download(request))
2769 }
2770
2771 fn on_valid_downloaded_block(
2778 &mut self,
2779 block_num_hash: BlockNumHash,
2780 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2781 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2784 sync_target.contains(block_num_hash.hash)
2785 {
2786 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2787
2788 if sync_target.head_block_hash == block_num_hash.hash {
2789 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2791 sync_target_head: block_num_hash.hash,
2792 })))
2793 }
2794
2795 self.make_canonical(block_num_hash.hash)?;
2799 self.try_connect_buffered_blocks(block_num_hash)?;
2800
2801 if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2804 let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2805 trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2806 return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2807 }
2808
2809 return Ok(None)
2810 }
2811 trace!(target: "engine::tree", "appended downloaded block");
2812 self.try_connect_buffered_blocks(block_num_hash)?;
2813 Ok(None)
2814 }
2815
2816 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2822 fn on_downloaded_block(
2823 &mut self,
2824 block: SealedBlock<N::Block>,
2825 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2826 let block_num_hash = block.num_hash();
2827 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2828 if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2829 return Ok(None)
2830 }
2831
2832 if !self.backfill_sync_state.is_idle() {
2833 return Ok(None)
2834 }
2835
2836 match self.insert_block(block) {
2838 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2839 return self.on_valid_downloaded_block(block_num_hash);
2840 }
2841 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2842 return Ok(self.on_disconnected_downloaded_block(
2845 block_num_hash,
2846 missing_ancestor,
2847 head,
2848 ))
2849 }
2850 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2851 trace!(target: "engine::tree", "downloaded block already executed");
2852 }
2853 Err(err) => {
2854 if let InsertPayloadError::Block(err) = err {
2855 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2856 if let Err(fatal) = self.on_insert_block_error(err) {
2857 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2858 return Err(fatal)
2859 }
2860 }
2861 }
2862 }
2863 Ok(None)
2864 }
2865
2866 fn insert_payload(
2875 &mut self,
2876 payload: T::ExecutionData,
2877 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2878 self.insert_block_or_payload(
2879 payload.block_with_parent(),
2880 payload,
2881 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2882 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2883 )
2884 }
2885
2886 fn insert_block(
2887 &mut self,
2888 block: SealedBlock<N::Block>,
2889 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2890 self.insert_block_or_payload(
2891 block.block_with_parent(),
2892 block,
2893 |validator, block, ctx| validator.validate_block(block, ctx),
2894 |_, block| Ok(block),
2895 )
2896 }
2897
2898 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
2915 fn insert_block_or_payload<Input, Err>(
2916 &mut self,
2917 block_id: BlockWithParent,
2918 input: Input,
2919 execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ValidationOutput<N>, Err>,
2920 convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2921 ) -> Result<InsertPayloadOk, Err>
2922 where
2923 Err: From<InsertBlockError<N::Block>>,
2924 {
2925 let block_insert_start = Instant::now();
2926 let block_num_hash = block_id.block;
2927 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2928
2929 if self.state.tree_state.contains_hash(&block_num_hash.hash) {
2931 convert_to_block(self, input)?;
2932 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2933 }
2934
2935 if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
2938 match self.provider.sealed_header_by_hash(block_num_hash.hash) {
2939 Err(err) => {
2940 let block = convert_to_block(self, input)?;
2941 return Err(InsertBlockError::new(block, err.into()).into());
2942 }
2943 Ok(Some(_)) => {
2944 convert_to_block(self, input)?;
2945 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2946 }
2947 Ok(None) => {}
2948 }
2949 }
2950
2951 match self.state_provider_builder(block_id.parent) {
2953 Err(err) => {
2954 let block = convert_to_block(self, input)?;
2955 return Err(InsertBlockError::new(block, err.into()).into());
2956 }
2957 Ok(None) => {
2958 let block = convert_to_block(self, input)?;
2959
2960 let missing_ancestor = self
2963 .state
2964 .buffer
2965 .lowest_ancestor(&block.parent_hash())
2966 .map(|block| block.parent_num_hash())
2967 .unwrap_or_else(|| block.parent_num_hash());
2968
2969 self.state.buffer.insert_block(block);
2970
2971 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2972 head: self.state.tree_state.current_canonical_head,
2973 missing_ancestor,
2974 }))
2975 }
2976 Ok(Some(_)) => {}
2977 }
2978
2979 let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2984
2985 let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2986
2987 let start = Instant::now();
2988
2989 let ValidationOutput {
2990 executed_block: executed,
2991 execution_timing_stats: timing_stats,
2992 raw_bal,
2993 } = execute(&mut self.payload_validator, input, ctx)?;
2994
2995 if let Some(raw_bal) = raw_bal {
2996 let num_hash = executed.recovered_block().num_hash();
2997 if let Err(err) = self.provider.bal_store().insert(num_hash, raw_bal) {
2998 warn!(
2999 target: "engine::tree",
3000 ?num_hash,
3001 %err,
3002 "Failed to store validated block access list"
3003 );
3004 }
3005 }
3006
3007 if let Some(stats) = timing_stats {
3010 if let Some(threshold) = self.config.slow_block_threshold() {
3011 let total_duration = stats.execution_duration + stats.state_hash_duration;
3012 if total_duration > threshold {
3013 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
3014 stats: stats.clone(),
3015 commit_duration: None,
3016 total_duration,
3017 }));
3018 }
3019 }
3020 self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
3021 }
3022
3023 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
3025 {
3026 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
3027 self.canonical_in_memory_state.set_pending_block(executed.clone());
3028 }
3029
3030 self.state.tree_state.insert_executed(executed.clone());
3031 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
3032
3033 let elapsed = start.elapsed();
3035 let engine_event = if is_fork {
3036 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
3037 } else {
3038 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
3039 };
3040 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
3041
3042 self.metrics
3043 .engine
3044 .block_insert_total_duration
3045 .record(block_insert_start.elapsed().as_secs_f64());
3046 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
3047 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
3048 }
3049
3050 fn on_insert_block_error(
3056 &mut self,
3057 error: InsertBlockError<N::Block>,
3058 ) -> Result<PayloadStatus, InsertBlockFatalError> {
3059 let (block, error) = error.split();
3060
3061 let validation_err = error.ensure_validation_error()?;
3064
3065 warn!(
3069 target: "engine::tree",
3070 invalid_hash=%block.hash(),
3071 invalid_number=block.number(),
3072 %validation_err,
3073 "Invalid block error on new payload",
3074 );
3075 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3076
3077 let is_transient = match &validation_err {
3079 InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
3080 _ => false,
3081 };
3082 if is_transient {
3083 warn!(
3084 target: "engine::tree",
3085 invalid_hash=%block.hash(),
3086 invalid_number=block.number(),
3087 %validation_err,
3088 "Skipping invalid header cache insert for transient validation error",
3089 );
3090 } else {
3091 self.state.invalid_headers.insert(block.block_with_parent());
3092 }
3093 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock {
3094 block: Box::new(block),
3095 error: validation_err.to_string(),
3096 }));
3097
3098 Ok(PayloadStatus::new(
3099 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3100 latest_valid_hash,
3101 ))
3102 }
3103
3104 fn on_new_payload_error(
3106 &mut self,
3107 error: NewPayloadError,
3108 payload_num_hash: NumHash,
3109 parent_hash: B256,
3110 ) -> ProviderResult<PayloadStatus> {
3111 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3112 let latest_valid_hash =
3115 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3116 None
3120 } else {
3121 self.latest_valid_hash_for_invalid_payload(parent_hash)?
3122 };
3123
3124 let status = PayloadStatusEnum::from(error);
3125 Ok(PayloadStatus::new(status, latest_valid_hash))
3126 }
3127
3128 pub fn find_canonical_header(
3130 &self,
3131 hash: B256,
3132 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3133 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3134
3135 if canonical.is_none() {
3136 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3137 }
3138
3139 Ok(canonical)
3140 }
3141
3142 fn update_finalized_block(
3144 &self,
3145 finalized_block_hash: B256,
3146 ) -> Result<(), OnForkChoiceUpdated> {
3147 if finalized_block_hash.is_zero() {
3148 return Ok(())
3149 }
3150
3151 match self.find_canonical_header(finalized_block_hash) {
3152 Ok(None) => {
3153 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3154 return Err(OnForkChoiceUpdated::invalid_state())
3156 }
3157 Ok(Some(finalized)) => {
3158 if Some(finalized.num_hash()) !=
3159 self.canonical_in_memory_state.get_finalized_num_hash()
3160 {
3161 let _ = self.persistence.save_finalized_block_number(finalized.number());
3164 self.canonical_in_memory_state.set_finalized(finalized.clone());
3165 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3167 }
3168 }
3169 Err(err) => {
3170 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3171 }
3172 }
3173
3174 Ok(())
3175 }
3176
3177 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3179 if safe_block_hash.is_zero() {
3180 return Ok(())
3181 }
3182
3183 match self.find_canonical_header(safe_block_hash) {
3184 Ok(None) => {
3185 debug!(target: "engine::tree", "Safe block not found in canonical chain");
3186 return Err(OnForkChoiceUpdated::invalid_state())
3188 }
3189 Ok(Some(safe)) => {
3190 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3191 let _ = self.persistence.save_safe_block_number(safe.number());
3194 self.canonical_in_memory_state.set_safe(safe.clone());
3195 self.metrics.tree.safe_block_height.set(safe.number() as f64);
3197 }
3198 }
3199 Err(err) => {
3200 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3201 }
3202 }
3203
3204 Ok(())
3205 }
3206
3207 fn ensure_consistent_forkchoice_state(
3216 &self,
3217 state: ForkchoiceState,
3218 ) -> Result<(), OnForkChoiceUpdated> {
3219 self.update_finalized_block(state.finalized_block_hash)?;
3225
3226 self.update_safe_block(state.safe_block_hash)
3232 }
3233
3234 fn process_payload_attributes(
3249 &self,
3250 attributes: T::PayloadAttributes,
3251 head: &N::BlockHeader,
3252 state: ForkchoiceState,
3253 ) -> OnForkChoiceUpdated {
3254 if let Err(err) =
3255 self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3256 {
3257 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3258 return OnForkChoiceUpdated::invalid_payload_attributes()
3259 }
3260
3261 let cache = if self.config.share_execution_cache_with_payload_builder() {
3267 self.payload_validator.cache_for(state.head_block_hash)
3268 } else {
3269 None
3270 };
3271
3272 let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3273 self.payload_validator.sparse_trie_handle_for(
3274 state.head_block_hash,
3275 head.state_root(),
3276 &self.state,
3277 )
3278 } else {
3279 None
3280 };
3281
3282 let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3285 parent_hash: state.head_block_hash,
3286 attributes,
3287 cache,
3288 trie_handle,
3289 });
3290
3291 OnForkChoiceUpdated::updated_with_pending_payload_id(
3303 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3304 pending_payload_id,
3305 )
3306 }
3307
3308 pub(crate) fn remove_before(
3315 &mut self,
3316 upper_bound: BlockNumHash,
3317 finalized_hash: Option<B256>,
3318 ) -> ProviderResult<()> {
3319 let num = if let Some(hash) = finalized_hash {
3322 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3323 } else {
3324 None
3325 };
3326
3327 self.state.tree_state.remove_until(
3328 upper_bound,
3329 self.persistence_state.last_persisted_block.hash,
3330 num,
3331 );
3332 Ok(())
3333 }
3334
3335 pub fn state_provider_builder(
3340 &self,
3341 hash: B256,
3342 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3343 where
3344 P: BlockReader + StateProviderFactory + StateReader + Clone,
3345 {
3346 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3347 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3348 return Ok(Some(StateProviderBuilder::new(
3350 self.provider.clone(),
3351 historical,
3352 Some(blocks),
3353 )))
3354 }
3355
3356 if let Some(header) = self.provider.header(hash)? {
3358 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3359 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3362 }
3363
3364 debug!(target: "engine::tree", %hash, "no canonical state found for block");
3365 Ok(None)
3366 }
3367}
3368
3369#[derive(Debug)]
3371enum LoopEvent<T, N>
3372where
3373 N: NodePrimitives,
3374 T: PayloadTypes,
3375{
3376 EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
3378 PersistenceComplete {
3380 result: PersistenceResult,
3382 start_time: Instant,
3384 },
3385 Disconnected,
3387}
3388
3389#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3395pub enum BlockStatus {
3396 Valid,
3398 Disconnected {
3400 head: BlockNumHash,
3402 missing_ancestor: BlockNumHash,
3404 },
3405}
3406
3407#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3412pub enum InsertPayloadOk {
3413 AlreadySeen(BlockStatus),
3415 Inserted(BlockStatus),
3417}
3418
3419#[derive(Debug, Clone, Copy)]
3421enum PersistTarget {
3422 Threshold,
3424 Head,
3426}
3427
3428#[derive(Debug, Clone, Copy, Default)]
3430pub struct CacheWaitDurations {
3431 pub execution_cache: Duration,
3433 pub sparse_trie: Duration,
3435}
3436
3437pub trait WaitForCaches {
3442 fn wait_for_caches(&self) -> CacheWaitDurations;
3446}