1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17 MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29 FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32 BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35 StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48 oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64
65use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
66pub use block_buffer::BlockBuffer;
67pub use invalid_headers::InvalidHeaderCache;
68pub use metrics::EngineApiMetrics;
69pub use payload_processor::*;
70pub use payload_validator::{BasicEngineValidator, EngineValidator};
71pub use persistence_state::PersistenceState;
72pub use reth_engine_primitives::TreeConfig;
73pub use reth_execution_cache::{
74 CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider, ExecutionCache,
75 PayloadExecutionCache, SavedCache,
76};
77
78pub mod state;
79
/// Minimum number of blocks ahead of the local tip for which a pipeline (backfill) run is
/// used; set to one epoch's worth of slots ([`EPOCH_SLOTS`]).
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// How many recent blocks of changesets are retained in the [`ChangesetCache`].
// NOTE(review): exact eviction semantics live in `ChangesetCache` — confirm there.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
96
/// Builder for a [`StateProviderBox`] anchored at a historical block hash, optionally
/// layered with an in-memory overlay of executed blocks.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// Factory used to resolve the historical (on-disk) state provider.
    provider_factory: P,
    /// Hash of the block whose state serves as the historical base.
    historical: B256,
    /// Executed blocks layered on top of the historical state, if any.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
107
108impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
109 pub const fn new(
112 provider_factory: P,
113 historical: B256,
114 overlay: Option<Vec<ExecutedBlock<N>>>,
115 ) -> Self {
116 Self { provider_factory, historical, overlay }
117 }
118}
119
120impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
121where
122 P: BlockReader + StateProviderFactory + StateReader + Clone,
123{
124 pub fn build(&self) -> ProviderResult<StateProviderBox> {
126 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
127 if let Some(overlay) = self.overlay.clone() {
128 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
129 }
130 Ok(provider)
131 }
132}
133
/// The state the engine API tree maintains across requests.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks executed blocks and the current canonical head.
    tree_state: TreeState<N>,
    /// Tracks forkchoice state updates received from the consensus layer.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of blocks that cannot be processed yet (e.g. received while backfill
    /// is running or with an unknown parent) — see [`BlockBuffer`].
    buffer: BlockBuffer<N::Block>,
    /// Cache of block hashes/headers known to be invalid.
    invalid_headers: InvalidHeaderCache,
}
149
150impl<N: NodePrimitives> EngineApiTreeState<N> {
151 fn new(
152 block_buffer_limit: u32,
153 max_invalid_header_cache_length: u32,
154 canonical_block: BlockNumHash,
155 engine_kind: EngineApiKind,
156 ) -> Self {
157 Self {
158 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
159 buffer: BlockBuffer::new(block_buffer_limit),
160 tree_state: TreeState::new(canonical_block, engine_kind),
161 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
162 }
163 }
164
165 pub const fn tree_state(&self) -> &TreeState<N> {
167 &self.tree_state
168 }
169
170 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
172 self.invalid_headers.get(hash).is_some()
173 }
174}
175
/// The outcome of a tree operation, pairing the result with an optional follow-up event.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The result of the operation.
    pub outcome: T,
    /// An event that should be handled after the operation.
    pub event: Option<TreeEvent>,
    /// Whether the operation found the block was already known (no new work performed).
    pub already_seen: bool,
}
187
188impl<T> TreeOutcome<T> {
189 pub const fn new(outcome: T) -> Self {
191 Self { outcome, event: None, already_seen: false }
192 }
193
194 pub fn with_event(mut self, event: TreeEvent) -> Self {
196 self.event = Some(event);
197 self
198 }
199
200 pub const fn with_already_seen(mut self, value: bool) -> Self {
202 self.already_seen = value;
203 self
204 }
205}
206
/// Result of attempting to insert a new payload into the tree.
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// The payload status to report back to the consensus layer.
    pub status: PayloadStatus,
    /// True if the payload's block was already known to the tree.
    pub already_seen: bool,
}
217
218impl TryInsertPayloadResult {
219 #[inline]
221 pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
222 TreeOutcome::new(self.status).with_already_seen(self.already_seen)
223 }
224}
225
/// Events that are triggered by tree operations.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action the tree itself should perform.
    TreeAction(TreeAction),
    /// A backfill (pipeline) sync action.
    BackfillAction(BackfillAction),
    /// A request to download blocks from the network.
    Download(DownloadRequest),
}
236
237impl TreeEvent {
238 const fn is_backfill_action(&self) -> bool {
240 matches!(self, Self::BackfillAction(_))
241 }
242}
243
/// Actions the tree performs on itself.
#[derive(Debug)]
pub enum TreeAction {
    /// Makes the sync target head the new canonical head.
    MakeCanonical {
        /// Hash of the sync target head to canonicalize.
        sync_target_head: B256,
    },
}
253
/// The engine API tree handler.
///
/// Processes engine API requests (new payloads, forkchoice updates) against the
/// in-memory tree state, coordinating persistence, backfill sync and payload building.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Database/state provider.
    provider: P,
    /// Consensus rules implementation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validates execution payloads before insertion.
    payload_validator: V,
    /// Tree state: executed blocks, block buffer, invalid headers, FCU tracker.
    state: EngineApiTreeState<N>,
    /// Sender half of the incoming channel — also used to re-queue work to ourselves
    /// (e.g. leftover downloaded blocks in `on_downloaded`).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine messages.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events emitted by the tree.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle to the persistence task.
    persistence: PersistenceHandle<N>,
    /// Tracks the last persisted block and any in-flight persistence action.
    persistence_state: PersistenceState,
    /// Whether backfill (pipeline) sync is currently active.
    backfill_sync_state: BackfillSyncState,
    /// Shared in-memory view of the canonical chain.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder service.
    payload_builder: PayloadBuilderHandle<T>,
    /// Tree configuration.
    config: TreeConfig,
    /// Engine API metrics.
    metrics: EngineApiMetrics,
    /// Which engine API variant is being served (e.g. Ethereum vs OP Stack).
    engine_kind: EngineApiKind,
    /// EVM configuration used for execution.
    evm_config: C,
    /// Cache of recent changesets.
    changeset_cache: ChangesetCache,
    /// Per-block execution timing statistics, keyed by block hash.
    execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
    /// Task runtime handle.
    runtime: reth_tasks::Runtime,
}
311
312impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
313 for EngineApiTreeHandler<N, P, T, V, C>
314where
315 N: NodePrimitives,
316 C: Debug + ConfigureEvm<Primitives = N>,
317{
318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 f.debug_struct("EngineApiTreeHandler")
320 .field("provider", &self.provider)
321 .field("consensus", &self.consensus)
322 .field("payload_validator", &self.payload_validator)
323 .field("state", &self.state)
324 .field("incoming_tx", &self.incoming_tx)
325 .field("persistence", &self.persistence)
326 .field("persistence_state", &self.persistence_state)
327 .field("backfill_sync_state", &self.backfill_sync_state)
328 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
329 .field("payload_builder", &self.payload_builder)
330 .field("config", &self.config)
331 .field("metrics", &self.metrics)
332 .field("engine_kind", &self.engine_kind)
333 .field("evm_config", &self.evm_config)
334 .field("changeset_cache", &self.changeset_cache)
335 .field("execution_timing_stats", &self.execution_timing_stats.len())
336 .field("runtime", &self.runtime)
337 .finish()
338 }
339}
340
341impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
342where
343 N: NodePrimitives,
344 P: DatabaseProviderFactory
345 + BlockReader<Block = N::Block, Header = N::BlockHeader>
346 + StateProviderFactory
347 + StateReader<Receipt = N::Receipt>
348 + HashedPostStateProvider
349 + Clone
350 + 'static,
351 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
352 + StageCheckpointReader
353 + ChangeSetReader
354 + StorageChangeSetReader
355 + StorageSettingsCache,
356 C: ConfigureEvm<Primitives = N> + 'static,
357 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
358 V: EngineValidator<T> + WaitForCaches,
359{
360 #[expect(clippy::too_many_arguments)]
362 pub fn new(
363 provider: P,
364 consensus: Arc<dyn FullConsensus<N>>,
365 payload_validator: V,
366 outgoing: UnboundedSender<EngineApiEvent<N>>,
367 state: EngineApiTreeState<N>,
368 canonical_in_memory_state: CanonicalInMemoryState<N>,
369 persistence: PersistenceHandle<N>,
370 persistence_state: PersistenceState,
371 payload_builder: PayloadBuilderHandle<T>,
372 config: TreeConfig,
373 engine_kind: EngineApiKind,
374 evm_config: C,
375 changeset_cache: ChangesetCache,
376 runtime: reth_tasks::Runtime,
377 ) -> Self {
378 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
379
380 Self {
381 provider,
382 consensus,
383 payload_validator,
384 incoming,
385 outgoing,
386 persistence,
387 persistence_state,
388 backfill_sync_state: BackfillSyncState::Idle,
389 state,
390 canonical_in_memory_state,
391 payload_builder,
392 config,
393 metrics: Default::default(),
394 incoming_tx,
395 engine_kind,
396 evm_config,
397 changeset_cache,
398 execution_timing_stats: HashMap::new(),
399 runtime,
400 }
401 }
402
    /// Creates a new [`EngineApiTreeHandler`] and spawns it on a dedicated OS thread.
    ///
    /// Initializes the persistence state from the best block currently on disk and
    /// returns the channel endpoints for communicating with the spawned task: a sender
    /// for engine messages and a receiver for engine events.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        // NOTE(review): falls back to block 0 / a default header if the provider cannot
        // report the best block or its header — presumably only expected on a fresh DB;
        // confirm this cannot mask a real provider error.
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        // The best on-disk block is, by definition, the last persisted block.
        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            runtime,
        );
        let incoming = task.incoming_tx.clone();
        // Run the handler loop on its own OS thread with raised priority.
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
462
463 fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
465 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
466 PayloadStatusEnum::Valid,
467 Some(state.head_block_hash),
468 )))
469 }
470
471 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
473 self.incoming_tx.clone()
474 }
475
476 const fn persistence_gap(&self) -> u64 {
479 self.state
480 .tree_state
481 .canonical_block_number()
482 .saturating_sub(self.persistence_state.last_persisted_block.number)
483 }
484
485 const fn should_backpressure(&self) -> bool {
490 self.persistence_state.in_progress() &&
491 self.persistence_gap() >= self.config.persistence_backpressure_threshold()
492 }
493
    /// Runs the engine API handler loop.
    ///
    /// Each iteration: polls for a completed persistence task, then blocks waiting for
    /// either an engine message or a persistence-completion signal — applying
    /// backpressure (waiting on persistence only) when the unpersisted gap is too large.
    /// Returns (terminating the engine thread) on fatal errors or channel disconnects.
    pub fn run(mut self) {
        loop {
            // Non-blocking check for an already-finished persistence task.
            match self.try_poll_persistence() {
                Ok(true) => {
                    // A persistence result was consumed; try to schedule the next
                    // persistence action before processing further events.
                    if let Err(err) = self.advance_persistence() {
                        error!(target: "engine::tree", %err, "Advancing persistence failed");
                        return
                    }
                    continue;
                }
                Ok(false) => {}
                Err(err) => {
                    error!(target: "engine::tree", %err, "Polling persistence failed");
                    return
                }
            }

            let event = if self.should_backpressure() {
                // Too many unpersisted blocks: stall engine message processing until the
                // in-flight persistence task completes, and record the stall duration.
                self.metrics.engine.backpressure_active.set(1.0);
                let stall_start = Instant::now();
                let event = self.wait_for_persistence_event();
                self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
                event
            } else {
                self.metrics.engine.backpressure_active.set(0.0);
                self.wait_for_event()
            };

            match event {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // After handling the event, check whether a new persistence action should start.
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
582
583 fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
589 let maybe_persistence = self.persistence_state.rx.take();
590
591 if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
592 match persistence_rx.recv() {
593 Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
594 Err(_) => LoopEvent::Disconnected,
595 }
596 } else {
597 self.wait_for_event()
598 }
599 }
600
    /// Blocks until either an engine message or a persistence-completion result arrives.
    ///
    /// If a persistence task is in flight, waits on both channels simultaneously, biased
    /// towards the persistence channel; otherwise waits on the incoming engine channel
    /// alone.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Temporarily take ownership of the persistence receiver so we can select on it.
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    match result {
                        Ok(result) => LoopEvent::PersistenceComplete {
                            result,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Persistence is still pending: restore the receiver before handing
                    // the engine message back to the caller.
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in flight: block on the engine channel only.
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
641
    /// Invoked when previously requested blocks have been downloaded.
    ///
    /// Processes at most `max_execute_block_batch_size` blocks from the batch and
    /// re-sends the remainder onto the engine channel so other messages can interleave.
    /// Stops early if handling a block triggers a backfill action.
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<SealedBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        // Cap the amount of work done in one go.
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    // Backfill sync takes over; drop the rest of the batch.
                    return Ok(None)
                }
            }
        }

        // Re-queue any blocks beyond the batch size for a later iteration.
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }
676
    /// Handles an incoming `engine_newPayload` request.
    ///
    /// Emits a [`ConsensusEngineEvent::BlockReceived`] event, rejects payloads that
    /// descend from a known-invalid ancestor, then either inserts the payload (when
    /// backfill sync is idle) or buffers it (while backfill runs). If the resulting
    /// status is valid and the block is the current sync target but not yet canonical,
    /// a `MakeCanonical` tree action is attached to the outcome.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        let start = Instant::now();

        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Payloads extending a known-invalid chain are answered without any processing.
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        // While backfill is running the tree cannot execute blocks; buffer for later.
        let mut outcome = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?.into_outcome()
        } else {
            TreeOutcome::new(self.try_buffer_payload(payload)?)
        };

        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // The block is the sync target: request canonicalization unless it already is
            // the canonical head.
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
768
    /// Attempts to insert the payload into the tree, mapping the insertion result to a
    /// [`TryInsertPayloadResult`]:
    ///
    /// - `Valid` (inserted or already seen) → `VALID` with the block hash as latest valid
    ///   hash; a fresh insertion additionally tries to connect buffered child blocks.
    /// - `Disconnected` (parent unknown) → `SYNCING`.
    /// - Insertion errors are translated by the dedicated error handlers; only fatal
    ///   errors propagate.
    #[instrument(level = "debug", target = "engine::tree", skip_all)]
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let (status, already_seen) = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        // Newly inserted valid block: its buffered children may now connect.
                        self.try_connect_buffered_blocks(num_hash)?;
                        (PayloadStatusEnum::Valid, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        (PayloadStatusEnum::Valid, true)
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, true)
                    }
                };

                Ok(TryInsertPayloadResult {
                    status: PayloadStatus::new(status, latest_valid_hash),
                    already_seen,
                })
            }
            Err(error) => {
                // Non-fatal errors are converted into a payload status by the handlers.
                let status = match error {
                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
                    InsertPayloadError::Payload(error) => {
                        self.on_new_payload_error(error, num_hash, parent_hash)?
                    }
                };

                Ok(TryInsertPayloadResult { status, already_seen: false })
            }
        }
    }
818
819 fn try_buffer_payload(
828 &mut self,
829 payload: T::ExecutionData,
830 ) -> Result<PayloadStatus, InsertBlockFatalError> {
831 let parent_hash = payload.parent_hash();
832 let num_hash = payload.num_hash();
833
834 match self.payload_validator.convert_payload_to_block(payload) {
835 Ok(block) => {
837 if let Err(error) = self.buffer_block(block) {
838 Ok(self.on_insert_block_error(error)?)
839 } else {
840 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
841 }
842 }
843 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
844 }
845 }
846
    /// Determines the chain update (commit or reorg) required to make `new_head` the
    /// canonical head, using only in-memory tree state.
    ///
    /// Returns `Ok(None)` when the new head — or any block needed to connect it to the
    /// canonical chain — is not tracked in memory, leaving the caller to resolve it by
    /// other means.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // The new head must already be an executed block that we track.
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        // NOTE(review): assumes the new head is above genesis; `number - 1` underflows
        // for block 0 — confirm this cannot be hit in practice.
        let mut current_number = new_head_number - 1;

        // Walk the new chain down until it reaches the height of the canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                return Ok(None)
            }
        }

        // If we landed exactly on the canonical head, this is a simple extension (commit).
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // Otherwise this is a reorg: collect the old canonical blocks being replaced.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // Bring the old-chain cursor down to the same height as the new-chain cursor.
        while current_canonical_number > current_number {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);
            current_canonical_number -= 1;
        }

        // Both cursors are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains down in lockstep until they meet at the fork point.
        while old_hash != current_hash {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Chains were collected tip-first; present them oldest-first.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
933
    /// Moves the tracked canonical head to the given canonical ancestor header, handling
    /// both the unwind case (new head below the current one) and the advance/same-height
    /// case.
    fn update_latest_block_to_canonical_ancestor(
        &mut self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
        let current_head_number = self.state.tree_state.canonical_block_number();
        let new_head_number = canonical_header.number();
        let new_head_hash = canonical_header.hash();

        // Update the tree's notion of the canonical head first.
        self.state.tree_state.set_canonical_head(canonical_header.num_hash());

        if new_head_number < current_head_number {
            // Head moved backwards: the in-memory state must be unwound.
            debug!(
                target: "engine::tree",
                current_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "FCU unwind detected: reverting to canonical ancestor"
            );

            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
        } else {
            debug!(
                target: "engine::tree",
                previous_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "Advancing latest block to canonical ancestor"
            );
            self.handle_chain_advance_or_same_height(canonical_header)
        }
    }
979
980 fn handle_canonical_chain_unwind(
983 &self,
984 current_head_number: u64,
985 canonical_header: &SealedHeader<N::BlockHeader>,
986 ) -> ProviderResult<()> {
987 let new_head_number = canonical_header.number();
988 debug!(
989 target: "engine::tree",
990 from = current_head_number,
991 to = new_head_number,
992 "Handling unwind: collecting blocks to remove from in-memory state"
993 );
994
995 let old_blocks =
997 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
998
999 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1001 }
1002
1003 fn collect_blocks_for_canonical_unwind(
1005 &self,
1006 new_head_number: u64,
1007 current_head_number: u64,
1008 ) -> Vec<ExecutedBlock<N>> {
1009 let mut old_blocks =
1010 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1011
1012 for block_num in (new_head_number + 1)..=current_head_number {
1013 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1014 let executed_block = block_state.block_ref().clone();
1015 old_blocks.push(executed_block);
1016 debug!(
1017 target: "engine::tree",
1018 block_number = block_num,
1019 "Collected block for removal from in-memory state"
1020 );
1021 }
1022 }
1023
1024 if old_blocks.is_empty() {
1025 debug!(
1026 target: "engine::tree",
1027 "No blocks found in memory to remove, will clear and reset state"
1028 );
1029 }
1030
1031 old_blocks
1032 }
1033
1034 fn apply_canonical_ancestor_via_reorg(
1036 &self,
1037 canonical_header: &SealedHeader<N::BlockHeader>,
1038 old_blocks: Vec<ExecutedBlock<N>>,
1039 ) -> ProviderResult<()> {
1040 let new_head_hash = canonical_header.hash();
1041 let new_head_number = canonical_header.number();
1042
1043 let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1045 self.canonical_in_memory_state
1047 .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1048
1049 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1052
1053 debug!(
1054 target: "engine::tree",
1055 block_number = new_head_number,
1056 block_hash = ?new_head_hash,
1057 "Successfully loaded canonical ancestor into memory via reorg"
1058 );
1059
1060 Ok(())
1061 }
1062
1063 fn handle_chain_advance_or_same_height(
1065 &self,
1066 canonical_header: &SealedHeader<N::BlockHeader>,
1067 ) -> ProviderResult<()> {
1068 self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1070
1071 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1073
1074 Ok(())
1075 }
1076
1077 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1079 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1081 return Ok(());
1082 }
1083
1084 let executed_block = self.canonical_block_by_hash(block_hash)?;
1086 self.canonical_in_memory_state
1087 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1088
1089 debug!(
1090 target: "engine::tree",
1091 block_number,
1092 block_hash = ?block_hash,
1093 "Added canonical block to in-memory state"
1094 );
1095
1096 Ok(())
1097 }
1098
    /// Handles an `engine_forkchoiceUpdated` request.
    ///
    /// Resolution order:
    /// 1. validate the FCU state (zero head hash, invalid ancestors, backfill running)
    /// 2. head is already the tree's canonical head
    /// 3. head resolves to a stored canonical header or an in-memory chain update
    /// 4. head unknown → request a block download and report SYNCING
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        self.record_forkchoice_metrics();

        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
            return Ok(result);
        }

        if let Some(result) = self.apply_chain_update(state, &attrs)? {
            return Ok(result);
        }

        self.handle_missing_block(state)
    }
1137
    /// Records that a forkchoice update was received by notifying the canonical
    /// in-memory state.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1142
1143 fn validate_forkchoice_state(
1148 &mut self,
1149 state: ForkchoiceState,
1150 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1151 if state.head_block_hash.is_zero() {
1152 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1153 }
1154
1155 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1158 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1159 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1160 }
1161
1162 if !self.backfill_sync_state.is_idle() {
1163 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1166 return Ok(Some(OnForkChoiceUpdated::syncing()));
1167 }
1168
1169 Ok(None)
1170 }
1171
1172 fn handle_canonical_head(
1178 &self,
1179 state: ForkchoiceState,
1180 attrs: &Option<T::PayloadAttributes>, ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1182 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1197 return Ok(None);
1198 }
1199
1200 trace!(target: "engine::tree", "fcu head hash is already canonical");
1201
1202 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1204 return Ok(Some(TreeOutcome::new(outcome)));
1206 }
1207
1208 if let Some(attr) = attrs {
1210 let tip = self
1211 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1212 .ok_or_else(|| {
1213 ProviderError::HeaderNotFound(state.head_block_hash.into())
1216 })?;
1217 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1219 return Ok(Some(TreeOutcome::new(updated)));
1220 }
1221
1222 Ok(Some(Self::valid_outcome(state)))
1224 }
1225
    /// Attempts to resolve the FCU head to either an already-canonical header or an
    /// in-memory chain update (commit/reorg).
    ///
    /// Returns `Ok(None)` when the head could not be resolved, so the caller falls back
    /// to downloading the missing block.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        // The head may already be part of the stored/in-memory canonical chain.
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // OP Stack (and opted-in configs) still process payload attributes on a
            // canonical head, which may also require unwinding the tracked head first.
            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    let updated =
                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        // Otherwise try to connect the head via the in-memory tree state.
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        Ok(None)
    }
1304
1305 fn handle_missing_block(
1310 &self,
1311 state: ForkchoiceState,
1312 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1313 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1320 !state.safe_block_hash.is_zero() &&
1322 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1323 {
1324 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1325 state.safe_block_hash
1326 } else {
1327 state.head_block_hash
1328 };
1329
1330 let target = self.lowest_buffered_ancestor_or(target);
1331 trace!(target: "engine::tree", %target, "downloading missing block");
1332
1333 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1334 PayloadStatusEnum::Syncing,
1335 )))
1336 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1337 }
1338
1339 fn remove_blocks(&mut self, new_tip_num: u64) {
1342 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1343 if new_tip_num < self.persistence_state.last_persisted_block.number {
1344 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1345 let (tx, rx) = crossbeam_channel::bounded(1);
1346 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1347 self.persistence_state.start_remove(new_tip_num, rx);
1348 }
1349 }
1350
1351 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1354 if blocks_to_persist.is_empty() {
1355 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1356 return
1357 }
1358
1359 let highest_num_hash = blocks_to_persist
1361 .iter()
1362 .max_by_key(|block| block.recovered_block().number())
1363 .map(|b| b.recovered_block().num_hash())
1364 .expect("Checked non-empty persisting blocks");
1365
1366 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1367 let (tx, rx) = crossbeam_channel::bounded(1);
1368 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1369
1370 self.persistence_state.start_save(highest_num_hash, rx);
1371 }
1372
1373 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1378 if !self.persistence_state.in_progress() {
1379 if let Some(new_tip_num) = self.find_disk_reorg()? {
1380 self.remove_blocks(new_tip_num)
1381 } else if self.should_persist() {
1382 let blocks_to_persist =
1383 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1384 self.persist_blocks(blocks_to_persist);
1385 }
1386 }
1387
1388 Ok(())
1389 }
1390
1391 fn finish_termination(
1396 &mut self,
1397 pending_termination: oneshot::Sender<()>,
1398 ) -> Result<(), AdvancePersistenceError> {
1399 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1400 let result = self.persist_until_complete();
1401 let _ = pending_termination.send(());
1402 result
1403 }
1404
    /// Blocks until every canonical in-memory block up to the current head has been
    /// persisted.
    ///
    /// Each loop iteration first drains any persistence action already in flight
    /// (blocking on its channel), then issues a save job for everything still
    /// unpersisted up to the head. Returns once nothing is left to save. Intended
    /// for the shutdown/termination path.
    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
        loop {
            // Wait for any in-flight save/remove to finish before checking what
            // remains; a disconnected channel means the persistence task died.
            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
                self.on_persistence_complete(result, start_time)?;
            }

            // Unlike the steady-state path, persist all the way to the head.
            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;

            if blocks_to_persist.is_empty() {
                debug!(target: "engine::tree", "persistence complete, signaling termination");
                return Ok(())
            }

            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
            self.persist_blocks(blocks_to_persist);
        }
    }
1426
    /// Polls the in-flight persistence action without blocking.
    ///
    /// Returns `Ok(true)` if a pending action completed and was processed,
    /// `Ok(false)` if there is no pending action or it is still running. A
    /// disconnected channel (persistence task died) is a fatal error.
    fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
            // Nothing in flight.
            return Ok(false);
        };

        match rx.try_recv() {
            Ok(result) => {
                self.on_persistence_complete(result, start_time)?;
                Ok(true)
            }
            Err(crossbeam_channel::TryRecvError::Empty) => {
                // Still running: put the receiver back so it can be polled again.
                self.persistence_state.rx = Some((rx, start_time, action));
                Ok(false)
            }
            Err(crossbeam_channel::TryRecvError::Disconnected) => {
                Err(AdvancePersistenceError::ChannelClosed)
            }
        }
    }
1450
    /// Finalizes a completed persistence action (save or remove).
    ///
    /// Records metrics, updates `persistence_state` with the new last-persisted
    /// block, evicts stale entries from the changeset cache, prunes now-persisted
    /// blocks from the in-memory tree, and kicks off background preparation of the
    /// canonical state overlay.
    fn on_persistence_complete(
        &mut self,
        result: PersistenceResult,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        let commit_duration = result.commit_duration;
        // An action that touched no blocks leaves all tracked state unchanged.
        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = result.last_block
        else {
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Evict cached changesets no longer needed: keep a retention window behind
        // the last persisted block, and — when a finalized block is known — never
        // evict beyond it (the `min` takes the lower of the two bounds).
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                finalized.number.min(min_threshold)
            } else {
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // The cached overlay was built against pre-persistence state and is stale now.
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Rebuild the canonical overlay off the hot path so the next payload does
        // not pay the construction cost.
        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                let _ = overlay.get();
            });
        }

        self.purge_timing_stats(last_persisted_block_number, commit_duration);

        Ok(())
    }
1513
    /// Top-level dispatcher for all messages reaching the engine tree.
    ///
    /// Handles orchestrator events (backfill lifecycle, terminate), engine API
    /// requests (FCU, newPayload variants, direct executed-block insertion), and
    /// batches of downloaded blocks.
    ///
    /// Returns `ControlFlow::Break(())` when the tree should shut down (terminate
    /// request received), otherwise `ControlFlow::Continue(())`.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Flush remaining blocks; a failure here is logged but does
                    // not prevent shutdown.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    // A block that was already executed elsewhere (e.g. built
                    // locally) is inserted directly into the tree.
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        // Ignore blocks at or below the canonical head; they can't
                        // extend the chain.
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // A child of the canonical head becomes the new pending block.
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.payload_validator.on_inserted_executed_block(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // Track the applied FCU and its resulting status
                                    // before processing any follow-up tree event.
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // Handle any event the FCU produced (downloads,
                                    // canonicalization, backfill).
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // The CL may have dropped the request; that is not fatal.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Take the follow-up event out before the output is
                                // consumed by the response send below.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // Process the event even if the response delivery failed.
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            // Reth-internal newPayload variant with extra
                            // synchronization knobs and timing reporting.
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                                enqueued_at,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Time spent queued before this handler picked it up.
                                let backpressure_wait = enqueued_at.elapsed();

                                // Optionally block until any in-flight persistence
                                // action completes; the blocking recv happens on a
                                // spawned thread and is awaited synchronously here.
                                let explicit_persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        let _ = self.on_persistence_complete(result, start_time);
                                        wait_duration
                                    } else {
                                        Duration::ZERO
                                    }
                                } else {
                                    Duration::ZERO
                                };

                                // Optionally wait for validator caches to be warm.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Total latency excluding the explicit persistence wait.
                                let latency = enqueued_at.elapsed() - explicit_persistence_wait;

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1745
    /// Brings the tree back in sync with the database after a backfill (pipeline)
    /// run finished.
    ///
    /// Resets or trims the in-memory tree to the backfill height, refreshes
    /// metrics and the canonical head, and then decides the next action: start
    /// another backfill run, download the remaining gap to the FCU target, or try
    /// to connect already-buffered blocks.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // An unwind carries the bad block that triggered it; record it as invalid
        // and treat the unwind target as the new height.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            self.state.invalid_headers.insert(**bad_block);

            Some(*target)
        } else {
            ctrl.block_number()
        };

        // No height means the run made no progress — nothing to reconcile.
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        // After an unwind the in-memory tree is entirely stale; otherwise only
        // blocks at or below the backfill height can be dropped.
        if ctrl.is_unwind() {
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        self.state.buffer.remove_old_blocks(backfill_height);
        self.purge_timing_stats(backfill_height, None);
        self.canonical_in_memory_state.clear_state();

        // The backfill tip becomes the new canonical (and persisted) head.
        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // Without a sync target (or with a zero finalized hash) there is nothing
        // further to drive towards.
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            return Ok(())
        }
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // Still far from the finalized target? Kick off another backfill run.
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Close enough for live sync: download the gap between the local tip and
        // the oldest buffered ancestor of the FCU head.
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // Head not buffered at all — request it directly.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // Everything needed may already be buffered — try to connect it.
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1905
1906 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1910 if let Some(chain_update) = self.on_new_head(target)? {
1911 self.on_canonical_chain_update(chain_update);
1912 }
1913
1914 Ok(())
1915 }
1916
1917 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1919 if let Some(event) = event {
1920 self.on_tree_event(event)?;
1921 }
1922
1923 Ok(())
1924 }
1925
1926 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1930 match event {
1931 TreeEvent::TreeAction(action) => match action {
1932 TreeAction::MakeCanonical { sync_target_head } => {
1933 self.make_canonical(sync_target_head)?;
1934 }
1935 },
1936 TreeEvent::BackfillAction(action) => {
1937 self.emit_event(EngineApiEvent::BackfillAction(action));
1938 }
1939 TreeEvent::Download(action) => {
1940 self.emit_event(EngineApiEvent::Download(action));
1941 }
1942 }
1943
1944 Ok(())
1945 }
1946
1947 fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
1954 let threshold = self.config.slow_block_threshold();
1955 let check_slow = commit_duration.is_some() && threshold.is_some();
1956
1957 let keys_to_remove: Vec<B256> = self
1959 .execution_timing_stats
1960 .iter()
1961 .filter(|(_, stats)| stats.block_number <= below_number)
1962 .map(|(k, _)| *k)
1963 .collect();
1964
1965 for key in keys_to_remove {
1966 let stats = self.execution_timing_stats.remove(&key).expect("key just found");
1967 if check_slow {
1968 let commit_dur = commit_duration.expect("checked above");
1969 let total_duration =
1971 stats.execution_duration + stats.state_hash_duration + commit_dur;
1972
1973 if total_duration > threshold.expect("checked above") {
1974 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
1975 stats,
1976 commit_duration: Some(commit_dur),
1977 total_duration,
1978 }));
1979 }
1980 }
1981 }
1982 }
1983
1984 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1986 let event = event.into();
1987
1988 if event.is_backfill_action() {
1989 debug_assert_eq!(
1990 self.backfill_sync_state,
1991 BackfillSyncState::Idle,
1992 "backfill action should only be emitted when backfill is idle"
1993 );
1994
1995 if self.persistence_state.in_progress() {
1996 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1999 return
2000 }
2001
2002 self.backfill_sync_state = BackfillSyncState::Pending;
2003 self.metrics.engine.pipeline_runs.increment(1);
2004 debug!(target: "engine::tree", "emitting backfill action event");
2005 }
2006
2007 let _ = self.outgoing.send(event).inspect_err(
2008 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2009 );
2010 }
2011
2012 pub const fn should_persist(&self) -> bool {
2016 if !self.backfill_sync_state.is_idle() {
2017 return false
2019 }
2020
2021 let min_block = self.persistence_state.last_persisted_block.number;
2022 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2023 self.config.persistence_threshold()
2024 }
2025
    /// Collects the canonical in-memory blocks that should be persisted next,
    /// ordered from lowest to highest block number.
    ///
    /// Walks the canonical chain backwards from the current head until it reaches
    /// the last persisted block, keeping every block at or below the target
    /// number: the head itself for [`PersistTarget::Head`], or head minus the
    /// in-memory buffer target for [`PersistTarget::Threshold`].
    fn get_canonical_blocks_to_persist(
        &self,
        target: PersistTarget,
    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
        // Must not race with an in-flight persistence action, which would move
        // `last_persisted_block` underneath this walk.
        debug_assert!(!self.persistence_state.in_progress());

        let mut blocks_to_persist = Vec::new();
        let mut current_hash = self.state.tree_state.canonical_block_hash();
        let last_persisted_number = self.persistence_state.last_persisted_block.number;
        let canonical_head_number = self.state.tree_state.canonical_block_number();

        let target_number = match target {
            PersistTarget::Head => canonical_head_number,
            PersistTarget::Threshold => {
                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
            }
        };

        debug!(
            target: "engine::tree",
            ?current_hash,
            ?last_persisted_number,
            ?canonical_head_number,
            ?target_number,
            "Returning canonical blocks to persist"
        );
        // Walk parent links from the head down; everything already persisted is
        // skipped by the break below.
        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
            if block.recovered_block().number() <= last_persisted_number {
                break;
            }

            // Blocks above `target_number` stay in memory for now.
            if block.recovered_block().number() <= target_number {
                blocks_to_persist.push(block.clone());
            }

            current_hash = block.recovered_block().parent_hash();
        }

        // The walk produced newest-first order; persistence expects oldest-first.
        blocks_to_persist.reverse();

        Ok(blocks_to_persist)
    }
2073
    /// Housekeeping after the persisted (on-disk) chain advanced.
    ///
    /// If the database tip has diverged from the in-memory canonical chain (an
    /// on-disk reorg), schedules removal of the offending blocks instead. Otherwise
    /// prunes all now-persisted blocks from the in-memory tree and the canonical
    /// in-memory state, keeping anything at or above the finalized block as needed.
    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
        // A disk reorg takes precedence: the persisted chain must be fixed before
        // pruning in-memory state against it.
        if let Some(remove_above) = self.find_disk_reorg()? {
            self.remove_blocks(remove_above);
            return Ok(())
        }

        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
            number: self.persistence_state.last_persisted_block.number,
            hash: self.persistence_state.last_persisted_block.hash,
        });
        Ok(())
    }
2097
    /// Returns the [`ExecutedBlock`] for a canonical block hash, reconstructing it
    /// from the database when it is no longer held in memory.
    ///
    /// The reconstruction fetches the sealed block with senders, the stored
    /// execution output for its height, and recomputes the hashed post state and
    /// trie updates from the changeset cache.
    ///
    /// # Errors
    /// Returns [`ProviderError::HeaderNotFound`] when the block is unknown and
    /// [`ProviderError::StateForNumberNotFound`] when its execution output is
    /// missing.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // Fast path: the block is still tracked in the in-memory tree.
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(block.clone())
        }

        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        // Trie updates are not stored; recompute them from the cached changesets.
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // NOTE(review): `pop()` presumably extracts the single block's receipts and
        // requests from a per-block list — confirm the `get_state` output shape.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        ))
    }
2157
2158 fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2162 if self.state.tree_state.contains_hash(&hash) {
2163 Ok(true)
2164 } else {
2165 self.provider.is_known(hash)
2166 }
2167 }
2168
2169 fn sealed_header_by_hash(
2171 &self,
2172 hash: B256,
2173 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2174 let header = self.state.tree_state.sealed_header_by_hash(&hash);
2176
2177 if header.is_some() {
2178 Ok(header)
2179 } else {
2180 self.provider.sealed_header_by_hash(hash)
2181 }
2182 }
2183
2184 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2191 self.state
2192 .buffer
2193 .lowest_ancestor(&hash)
2194 .map(|block| block.parent_hash())
2195 .unwrap_or_else(|| hash)
2196 }
2197
    /// Finds the latest valid ancestor hash for a payload whose parent chain
    /// contains invalid blocks.
    ///
    /// If the parent itself is known (and therefore valid from our perspective) it
    /// is returned directly. Otherwise the invalid-headers cache is walked upwards
    /// parent-by-parent until a block is found that is no longer marked invalid
    /// *and* is actually known locally. Returns `None` when no such ancestor can
    /// be determined.
    fn latest_valid_hash_for_invalid_payload(
        &mut self,
        parent_hash: B256,
    ) -> ProviderResult<Option<B256>> {
        // Known parent means the parent is the latest valid block.
        if self.has_block_by_hash(parent_hash)? {
            return Ok(Some(parent_hash))
        }

        // Walk up the chain of invalid headers until we exit the invalid range.
        let mut current_hash = parent_hash;
        let mut current_block = self.state.invalid_headers.get(&current_hash);
        while let Some(block_with_parent) = current_block {
            current_hash = block_with_parent.parent;
            current_block = self.state.invalid_headers.get(&current_hash);

            // First ancestor that is not marked invalid: only valid if we
            // actually have it; otherwise keep the loop's exit condition.
            if current_block.is_none() && self.has_block_by_hash(current_hash)? {
                return Ok(Some(current_hash))
            }
        }
        Ok(None)
    }
2233
    /// Builds the `INVALID` payload status for a block whose ancestry contains an
    /// invalid block.
    ///
    /// `latestValidHash` selection follows the engine API rules: the zero hash
    /// when the parent is a pre-merge (non-zero difficulty) block, the parent
    /// itself when it is a known post-merge block, or otherwise the most recent
    /// valid ancestor found via the invalid-headers cache.
    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
            // PoW parent: the engine API mandates a zero latestValidHash.
            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
            Some(_) => Some(parent_hash),
            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
        };

        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
        })
        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
    }
2252
2253 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2257 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2258 return target.head_block_hash == block_hash
2259 }
2260 false
2261 }
2262
2263 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2267 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2268 return target.contains(block_hash)
2269 }
2270 false
2271 }
2272
2273 fn check_invalid_ancestor_with_head(
2279 &mut self,
2280 check: B256,
2281 head: &SealedBlock<N::Block>,
2282 ) -> ProviderResult<Option<PayloadStatus>> {
2283 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2285
2286 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2287 }
2288
2289 fn on_invalid_new_payload(
2291 &mut self,
2292 head: SealedBlock<N::Block>,
2293 invalid: BlockWithParent,
2294 ) -> ProviderResult<PayloadStatus> {
2295 let status = self.prepare_invalid_response(invalid.parent)?;
2297
2298 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2300 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2301
2302 Ok(status)
2303 }
2304
2305 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2319 let parent_hash = payload.parent_hash();
2320 let block_hash = payload.block_hash();
2321
2322 if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2324 return Some(entry);
2325 }
2326
2327 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2328 if lowest_buffered_ancestor == block_hash {
2329 lowest_buffered_ancestor = parent_hash;
2330 }
2331
2332 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2334 }
2335
2336 fn handle_invalid_ancestor_payload(
2345 &mut self,
2346 payload: T::ExecutionData,
2347 invalid: BlockWithParent,
2348 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2349 let parent_hash = payload.parent_hash();
2350 let num_hash = payload.num_hash();
2351
2352 let block = match self.payload_validator.convert_payload_to_block(payload) {
2358 Ok(block) => block,
2359 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2360 };
2361
2362 Ok(self.on_invalid_new_payload(block, invalid)?)
2363 }
2364
2365 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2368 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2370
2371 match self.prepare_invalid_response(header.parent) {
2373 Ok(status) => Ok(Some(status)),
2374 Err(err) => {
2375 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2376 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2378 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2379 })))
2380 }
2381 }
2382 }
2383
2384 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2387 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2388 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2389 return Err(e)
2390 }
2391
2392 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2393 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2394 return Err(e)
2395 }
2396
2397 Ok(())
2398 }
2399
    /// Attempts to insert all buffered descendants of `parent` into the tree.
    ///
    /// Each connected block that is part of the current sync target and inserted
    /// as valid immediately becomes the canonical head. Per-block insertion errors
    /// are handled (and may mark the block invalid) without aborting the batch;
    /// only fatal errors are propagated.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        // Drain the parent's whole buffered subtree in connectable order.
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    // A freshly valid sync-target block becomes canonical right away.
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // Non-fatal insertion errors are recorded; only a fatal error
                    // from the error handler stops the batch.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2444
2445 fn buffer_block(
2447 &mut self,
2448 block: SealedBlock<N::Block>,
2449 ) -> Result<(), InsertBlockError<N::Block>> {
2450 if let Err(err) = self.validate_block(&block) {
2451 return Err(InsertBlockError::consensus_error(err, block))
2452 }
2453 self.state.buffer.insert_block(block);
2454 Ok(())
2455 }
2456
2457 #[inline]
2462 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2463 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2464 }
2465
2466 #[inline]
2469 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2470 if block > local_tip {
2471 Some(block - local_tip)
2472 } else {
2473 None
2474 }
2475 }
2476
    /// Decides whether a backfill (pipeline) run should be started and, if so,
    /// which block hash to target.
    ///
    /// The distance check prefers, in order: the just-downloaded block when it is
    /// the sync target's finalized block, the buffered finalized block, or the
    /// caller-supplied `target_block_number`. When the threshold is exceeded and
    /// the finalized block is not yet in the database, its hash is returned as the
    /// backfill target — or the head hash as an optimistic target when the
    /// forkchoice finalized hash is zero. Returns `None` when no backfill is
    /// warranted.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // The downloaded block is exactly the finalized target — measure
                // against its (exact) number.
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    // The finalized block is buffered — its number is the most
                    // accurate distance reference available.
                    Some(buffered_finalized) => {
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    // Fall back to the caller-provided estimate.
                    None => {
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // Finalized block not on disk yet: that is the backfill target.
                    if !state.finalized_block_hash.is_zero() {
                        return Some(state.finalized_block_hash)
                    }

                    // Zero finalized hash (e.g. pre-genesis finality): fall back
                    // to the head as an optimistic target.
                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // Finalized block already persisted — no backfill needed.
                }
            }
        }

        None
    }
2557
    /// Detects whether the persisted chain has diverged from the in-memory
    /// canonical chain.
    ///
    /// Walks both chains back to a common height and then to a common ancestor.
    /// Returns `Ok(None)` when the persisted tip is an ancestor of (or equal to)
    /// the canonical head, otherwise `Ok(Some(n))` where `n` is the height of the
    /// common ancestor — i.e. every persisted block above `n` must be removed.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Resolves a block's parent (number, hash) via header lookup.
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Phase 1: bring the canonical cursor down to the persisted height.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // Same block at the persisted height — the persisted chain is an
        // ancestor of the canonical head, no reorg.
        if canonical == persisted {
            return Ok(None);
        }

        // Phase 2: the persisted tip may be above the canonical head; bring it
        // down to the same height.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Phase 3: step both cursors in lockstep until the common ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2603
    /// Applies a canonical chain update (extension or reorg) to the tree state and the shared
    /// in-memory canonical state, then notifies listeners.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // update the tracked canonical head
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        // On a reorg, record reorg metrics and keep both chain segments available in the tree
        // state so either fork can still be queried.
        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            self.reinsert_reorged_blocks(new.clone());
            // also reinsert the reorged-away blocks so they remain accessible as a fork
            self.reinsert_reorged_blocks(old.clone());
        }

        // update the shared in-memory state with the new chain and head
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // notify all active canonical-state listeners
        self.canonical_in_memory_state.notify_canon_state(notification);

        // emit the commit event, including how long the update took
        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2644
2645 fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2647 if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2648 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2649 first_reorged_block <= finalized.number
2650 {
2651 self.metrics.tree.reorgs.finalized.increment(1);
2652 } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2653 first_reorged_block <= safe.number
2654 {
2655 self.metrics.tree.reorgs.safe.increment(1);
2656 } else {
2657 self.metrics.tree.reorgs.head.increment(1);
2658 }
2659 } else {
2660 debug_unreachable!("Reorged chain doesn't have any blocks");
2661 }
2662 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2663 }
2664
2665 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2667 for block in new_chain {
2668 if self
2669 .state
2670 .tree_state
2671 .executed_block_by_hash(block.recovered_block().hash())
2672 .is_none()
2673 {
2674 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2675 self.state.tree_state.insert_executed(block);
2676 }
2677 }
2678 }
2679
2680 fn on_disconnected_downloaded_block(
2685 &self,
2686 downloaded_block: BlockNumHash,
2687 missing_parent: BlockNumHash,
2688 head: BlockNumHash,
2689 ) -> Option<TreeEvent> {
2690 if let Some(target) =
2692 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2693 {
2694 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2695 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2696 }
2697
2698 let request = if let Some(distance) =
2708 self.distance_from_local_tip(head.number, missing_parent.number)
2709 {
2710 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2711 DownloadRequest::BlockRange(missing_parent.hash, distance)
2712 } else {
2713 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2714 DownloadRequest::single_block(missing_parent.hash)
2717 };
2718
2719 Some(TreeEvent::Download(request))
2720 }
2721
    /// Handles a downloaded block that was just inserted as valid.
    ///
    /// When the block belongs to the current forkchoice sync target, this advances the canonical
    /// chain towards the target head and returns any follow-up action (a make-canonical tree
    /// action or a further download). Otherwise it only tries to connect buffered descendants.
    fn on_valid_downloaded_block(
        &mut self,
        block_num_hash: BlockNumHash,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
            sync_target.contains(block_num_hash.hash)
        {
            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");

            // The block is the sync target head itself: defer canonicalization to a tree action.
            if sync_target.head_block_hash == block_num_hash.hash {
                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_num_hash.hash,
                })))
            }

            // The block is part of the target (safe/finalized): canonicalize it and connect any
            // buffered children on top of it.
            self.make_canonical(block_num_hash.hash)?;
            self.try_connect_buffered_blocks(block_num_hash)?;

            // Head not reached yet: request the next missing block towards the target head.
            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
            }

            return Ok(None)
        }
        // Not part of the sync target: just try to connect buffered descendants.
        trace!(target: "engine::tree", "appended downloaded block");
        self.try_connect_buffered_blocks(block_num_hash)?;
        Ok(None)
    }
2766
    /// Attempts to insert a block that was downloaded from the network.
    ///
    /// Returns a follow-up [`TreeEvent`] (further downloads, backfill, or make-canonical) when
    /// more work is required, or `None` when the block was fully handled or skipped. Only fatal
    /// insertion errors are propagated; recoverable validation errors are recorded internally.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        // Drop the block if it descends from a known-invalid ancestor.
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // Ignore downloaded blocks while a backfill sync is in progress.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        // try to append the block
        match self.insert_block(block) {
            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                // block is not connected to the canonical head: figure out what to fetch next
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // Only block-level insertion errors are handled here; fatal errors bubble up.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2816
    /// Validates and executes an engine payload via the shared insertion path
    /// ([`Self::insert_block_or_payload`]).
    ///
    /// The payload is only converted into a [`SealedBlock`] on the early-exit paths that need the
    /// block back (buffering or error reporting).
    fn insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
        self.insert_block_or_payload(
            payload.block_with_parent(),
            payload,
            |validator, payload, ctx| validator.validate_payload(payload, ctx),
            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
        )
    }
2836
    /// Validates and executes an already-sealed block via the shared insertion path
    /// ([`Self::insert_block_or_payload`]).
    ///
    /// Unlike [`Self::insert_payload`], no conversion step is needed: the input is already a
    /// [`SealedBlock`] and is returned as-is on early-exit paths.
    fn insert_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
        self.insert_block_or_payload(
            block.block_with_parent(),
            block,
            |validator, block, ctx| validator.validate_block(block, ctx),
            |_, block| Ok(block),
        )
    }
2848
    /// Shared insertion path for engine payloads and downloaded blocks.
    ///
    /// `execute` validates and executes the input inside a [`TreeCtx`]; `convert_to_block`
    /// recovers a [`SealedBlock`] from the input on the paths that must hand the block back
    /// (already-seen, missing parent, or provider error).
    ///
    /// Returns [`InsertPayloadOk::AlreadySeen`] for known blocks,
    /// [`BlockStatus::Disconnected`] when the parent state is unavailable (the block is
    /// buffered), and [`BlockStatus::Valid`] after successful execution.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(
            &mut V,
            Input,
            TreeCtx<'_, N>,
        )
            -> Result<(ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>), Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Already tracked in the in-memory tree state: nothing to do, but still run the
        // conversion so payload-level conversion errors surface consistently.
        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // The block height is at or below the persisted tip, so it may already be on disk.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure we can build a state provider for the parent; otherwise buffer the block as
        // disconnected and tell the caller which ancestor is missing.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // Report the lowest buffered ancestor's parent as the missing link so the caller
                // can request the correct download range.
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // A block at or below the canonical head extends a fork rather than the canonical chain.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // Validate and execute the block.
        let (executed, timing_stats) = execute(&mut self.payload_validator, input, ctx)?;

        if let Some(stats) = timing_stats {
            // Emit a slow-block event when execution + state hashing exceed the configured
            // threshold.
            if let Some(threshold) = self.config.slow_block_threshold() {
                let total_duration = stats.execution_duration + stats.state_hash_duration;
                if total_duration > threshold {
                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
                        stats: stats.clone(),
                        commit_duration: None,
                        total_duration,
                    }));
                }
            }
            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
        }

        // The new block directly extends the canonical head: track it as the pending block.
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // Emit the appropriate event depending on whether this extends a fork or the canon chain.
        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2989
2990 fn on_insert_block_error(
2996 &mut self,
2997 error: InsertBlockError<N::Block>,
2998 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2999 let (block, error) = error.split();
3000
3001 let validation_err = error.ensure_validation_error()?;
3004
3005 warn!(
3009 target: "engine::tree",
3010 invalid_hash=%block.hash(),
3011 invalid_number=block.number(),
3012 %validation_err,
3013 "Invalid block error on new payload",
3014 );
3015 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3016
3017 self.state.invalid_headers.insert(block.block_with_parent());
3019 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
3020 Box::new(block),
3021 )));
3022
3023 Ok(PayloadStatus::new(
3024 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3025 latest_valid_hash,
3026 ))
3027 }
3028
3029 fn on_new_payload_error(
3031 &mut self,
3032 error: NewPayloadError,
3033 payload_num_hash: NumHash,
3034 parent_hash: B256,
3035 ) -> ProviderResult<PayloadStatus> {
3036 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3037 let latest_valid_hash =
3040 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3041 None
3045 } else {
3046 self.latest_valid_hash_for_invalid_payload(parent_hash)?
3047 };
3048
3049 let status = PayloadStatusEnum::from(error);
3050 Ok(PayloadStatus::new(status, latest_valid_hash))
3051 }
3052
3053 pub fn find_canonical_header(
3055 &self,
3056 hash: B256,
3057 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3058 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3059
3060 if canonical.is_none() {
3061 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3062 }
3063
3064 Ok(canonical)
3065 }
3066
3067 fn update_finalized_block(
3069 &self,
3070 finalized_block_hash: B256,
3071 ) -> Result<(), OnForkChoiceUpdated> {
3072 if finalized_block_hash.is_zero() {
3073 return Ok(())
3074 }
3075
3076 match self.find_canonical_header(finalized_block_hash) {
3077 Ok(None) => {
3078 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3079 return Err(OnForkChoiceUpdated::invalid_state())
3081 }
3082 Ok(Some(finalized)) => {
3083 if Some(finalized.num_hash()) !=
3084 self.canonical_in_memory_state.get_finalized_num_hash()
3085 {
3086 let _ = self.persistence.save_finalized_block_number(finalized.number());
3089 self.canonical_in_memory_state.set_finalized(finalized.clone());
3090 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3092 }
3093 }
3094 Err(err) => {
3095 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3096 }
3097 }
3098
3099 Ok(())
3100 }
3101
3102 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3104 if safe_block_hash.is_zero() {
3105 return Ok(())
3106 }
3107
3108 match self.find_canonical_header(safe_block_hash) {
3109 Ok(None) => {
3110 debug!(target: "engine::tree", "Safe block not found in canonical chain");
3111 return Err(OnForkChoiceUpdated::invalid_state())
3113 }
3114 Ok(Some(safe)) => {
3115 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3116 let _ = self.persistence.save_safe_block_number(safe.number());
3119 self.canonical_in_memory_state.set_safe(safe.clone());
3120 self.metrics.tree.safe_block_height.set(safe.number() as f64);
3122 }
3123 }
3124 Err(err) => {
3125 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3126 }
3127 }
3128
3129 Ok(())
3130 }
3131
3132 fn ensure_consistent_forkchoice_state(
3141 &self,
3142 state: ForkchoiceState,
3143 ) -> Result<(), OnForkChoiceUpdated> {
3144 self.update_finalized_block(state.finalized_block_hash)?;
3150
3151 self.update_safe_block(state.safe_block_hash)
3157 }
3158
3159 fn process_payload_attributes(
3174 &self,
3175 attributes: T::PayloadAttributes,
3176 head: &N::BlockHeader,
3177 state: ForkchoiceState,
3178 ) -> OnForkChoiceUpdated {
3179 if let Err(err) =
3180 self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3181 {
3182 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3183 return OnForkChoiceUpdated::invalid_payload_attributes()
3184 }
3185
3186 let cache = if self.config.share_execution_cache_with_payload_builder() {
3192 self.payload_validator.cache_for(state.head_block_hash)
3193 } else {
3194 None
3195 };
3196
3197 let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3198 self.payload_validator.sparse_trie_handle_for(
3199 state.head_block_hash,
3200 head.state_root(),
3201 &self.state,
3202 )
3203 } else {
3204 None
3205 };
3206
3207 let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3210 parent_hash: state.head_block_hash,
3211 attributes,
3212 cache,
3213 trie_handle,
3214 });
3215
3216 OnForkChoiceUpdated::updated_with_pending_payload_id(
3228 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3229 pending_payload_id,
3230 )
3231 }
3232
3233 pub(crate) fn remove_before(
3240 &mut self,
3241 upper_bound: BlockNumHash,
3242 finalized_hash: Option<B256>,
3243 ) -> ProviderResult<()> {
3244 let num = if let Some(hash) = finalized_hash {
3247 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3248 } else {
3249 None
3250 };
3251
3252 self.state.tree_state.remove_until(
3253 upper_bound,
3254 self.persistence_state.last_persisted_block.hash,
3255 num,
3256 );
3257 Ok(())
3258 }
3259
3260 pub fn state_provider_builder(
3265 &self,
3266 hash: B256,
3267 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3268 where
3269 P: BlockReader + StateProviderFactory + StateReader + Clone,
3270 {
3271 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3272 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3273 return Ok(Some(StateProviderBuilder::new(
3275 self.provider.clone(),
3276 historical,
3277 Some(blocks),
3278 )))
3279 }
3280
3281 if let Some(header) = self.provider.header(hash)? {
3283 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3284 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3287 }
3288
3289 debug!(target: "engine::tree", %hash, "no canonical state found for block");
3290 Ok(None)
3291 }
3292}
3293
/// Events consumed by the engine's internal processing loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// A message received from the consensus engine API.
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A background persistence task completed.
    PersistenceComplete {
        /// Outcome of the persistence task.
        result: PersistenceResult,
        /// When the persistence task started.
        // NOTE(review): presumably used to record persistence duration — confirm at the
        // handling site (not visible in this chunk).
        start_time: Instant,
    },
    /// The incoming message channel was disconnected.
    Disconnected,
}
3313
/// Status of a block after an insertion attempt into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block was validated and executed successfully.
    Valid,
    /// The block could not be connected to a known ancestor and was buffered instead.
    Disconnected {
        /// The canonical head at the time of insertion.
        head: BlockNumHash,
        /// The closest missing ancestor that must be fetched to connect the block.
        missing_ancestor: BlockNumHash,
    },
}
3331
/// Successful outcome of inserting a payload or block into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The block was already known; carries its (possibly disconnected) status.
    AlreadySeen(BlockStatus),
    /// The block was newly inserted.
    Inserted(BlockStatus),
}
3343
/// How far a persistence run should persist blocks.
// NOTE(review): semantics inferred from variant names — confirm at the usage sites, which are
// not visible in this chunk.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to the configured persistence threshold.
    Threshold,
    /// Persist everything up to the current head.
    Head,
}
3352
/// Durations spent waiting on shared caches (see [`WaitForCaches`]).
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the shared execution cache.
    pub execution_cache: Duration,
    /// Time spent waiting for the shared sparse trie.
    pub sparse_trie: Duration,
}
3361
/// Waits for shared caches to become available and reports how long each wait took.
// NOTE(review): blocking behavior inferred from the name — confirm against the trait's
// implementors (not visible in this chunk).
pub trait WaitForCaches {
    /// Waits for the caches and returns the time spent waiting on each one.
    fn wait_for_caches(&self) -> CacheWaitDurations;
}