1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
17 NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{
28 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
29};
30use reth_primitives_traits::{
31 FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
32};
33use reth_provider::{
34 BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
35 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
36 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
37 StorageSettingsCache, TransactionVariant,
38};
39use reth_revm::database::StateProviderDatabase;
40use reth_stages_api::ControlFlow;
41use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
42use reth_trie_db::ChangesetCache;
43use revm::interpreter::debug_unreachable;
44use state::TreeState;
45use std::{fmt::Debug, ops, sync::Arc, time::Duration};
46
47use crossbeam_channel::{Receiver, Sender};
48use tokio::sync::{
49 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
50 oneshot,
51};
52use tracing::*;
53
54mod block_buffer;
55mod cached_state;
56pub mod error;
57pub mod instrumented_state;
58mod invalid_headers;
59mod metrics;
60pub mod payload_processor;
61pub mod payload_validator;
62mod persistence_state;
63pub mod precompile_cache;
64#[cfg(test)]
65mod tests;
66mod trie_updates;
67
68use crate::tree::error::AdvancePersistenceError;
69pub use block_buffer::BlockBuffer;
70pub use cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache};
71pub use invalid_headers::InvalidHeaderCache;
72pub use metrics::EngineApiMetrics;
73pub use payload_processor::*;
74pub use payload_validator::{BasicEngineValidator, EngineValidator};
75pub use persistence_state::PersistenceState;
76pub use reth_engine_primitives::TreeConfig;
77
78pub mod state;
79
/// Minimum distance (in blocks) from the tip at which sync is handed over to the
/// backfill pipeline instead of downloading blocks individually (one epoch of slots).
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
90
/// Retention window, in blocks, for the [`ChangesetCache`] (consumer of this constant is
/// not visible in this chunk — NOTE(review): confirm against the cache implementation).
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
96
/// Builder for a [`StateProviderBox`] anchored at a historical block, optionally layered
/// with in-memory executed blocks.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// Factory used to create the underlying historical state provider.
    provider_factory: P,
    /// Hash of the historical block whose state forms the base of the provider.
    historical: B256,
    /// Optional executed blocks applied as an in-memory overlay on top of the
    /// historical state.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
107
108impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
109 pub const fn new(
112 provider_factory: P,
113 historical: B256,
114 overlay: Option<Vec<ExecutedBlock<N>>>,
115 ) -> Self {
116 Self { provider_factory, historical, overlay }
117 }
118}
119
120impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
121where
122 P: BlockReader + StateProviderFactory + StateReader + Clone,
123{
124 pub fn build(&self) -> ProviderResult<StateProviderBox> {
126 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
127 if let Some(overlay) = self.overlay.clone() {
128 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
129 }
130 Ok(provider)
131 }
132}
133
/// Mutable state used by the engine API tree handler: the block tree, forkchoice
/// tracking, buffered blocks and known-invalid headers.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// In-memory tree of executed blocks.
    tree_state: TreeState<N>,
    /// Tracks received forkchoice state updates.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer for blocks that cannot be processed yet (e.g. while backfill sync is
    /// active).
    buffer: BlockBuffer<N::Block>,
    /// Cache of headers known to be invalid.
    invalid_headers: InvalidHeaderCache,
}
149
150impl<N: NodePrimitives> EngineApiTreeState<N> {
151 fn new(
152 block_buffer_limit: u32,
153 max_invalid_header_cache_length: u32,
154 canonical_block: BlockNumHash,
155 engine_kind: EngineApiKind,
156 ) -> Self {
157 Self {
158 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
159 buffer: BlockBuffer::new(block_buffer_limit),
160 tree_state: TreeState::new(canonical_block, engine_kind),
161 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
162 }
163 }
164
165 pub const fn tree_state(&self) -> &TreeState<N> {
167 &self.tree_state
168 }
169
170 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
172 self.invalid_headers.get(hash).is_some()
173 }
174}
175
/// The result of a tree operation, optionally paired with a follow-up [`TreeEvent`]
/// for the caller to handle.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The operation's primary result.
    pub outcome: T,
    /// Optional event produced alongside the outcome.
    pub event: Option<TreeEvent>,
}
184
185impl<T> TreeOutcome<T> {
186 pub const fn new(outcome: T) -> Self {
188 Self { outcome, event: None }
189 }
190
191 pub fn with_event(mut self, event: TreeEvent) -> Self {
193 self.event = Some(event);
194 self
195 }
196}
197
/// Events the tree can emit that require handling by the engine loop.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action to perform on the tree itself.
    TreeAction(TreeAction),
    /// A backfill sync action (e.g. start backfill).
    BackfillAction(BackfillAction),
    /// A request to download one or more blocks.
    Download(DownloadRequest),
}
208
209impl TreeEvent {
210 const fn is_backfill_action(&self) -> bool {
212 matches!(self, Self::BackfillAction(_))
213 }
214}
215
/// Actions targeting the tree itself.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the block with the given hash the canonical head.
    MakeCanonical {
        /// Hash of the sync-target head that should become canonical.
        sync_target_head: B256,
    },
}
225
/// The engine API tree handler: processes engine messages (new payloads, forkchoice
/// updates, downloaded blocks) against the in-memory block tree and coordinates
/// persistence and backfill sync.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider for database/state access.
    provider: P,
    /// Consensus rules used for block validation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator for engine payloads.
    payload_validator: V,
    /// Mutable tree state (blocks, buffers, forkchoice tracking, invalid headers).
    state: EngineApiTreeState<N>,
    /// Sender half of the incoming engine-message channel (cloned out via `sender()`).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Receiver for incoming engine messages; drained by the `run` loop.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Channel for events emitted to the rest of the node.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle to the persistence task.
    persistence: PersistenceHandle<N>,
    /// Tracks the last persisted block and any in-flight persistence action.
    persistence_state: PersistenceState,
    /// Current backfill sync status; payloads are buffered while not idle.
    backfill_sync_state: BackfillSyncState,
    /// Canonical chain state kept in memory.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder service.
    payload_builder: PayloadBuilderHandle<T>,
    /// Tree configuration.
    config: TreeConfig,
    /// Engine API metrics.
    metrics: EngineApiMetrics,
    /// The engine API variant (used e.g. for opstack-specific behavior).
    engine_kind: EngineApiKind,
    /// EVM configuration used for execution.
    evm_config: C,
    /// Cache of account/storage changesets.
    changeset_cache: ChangesetCache,
    /// Whether hashed state is used (NOTE(review): exact effect not visible in this
    /// chunk — confirm against the validator/provider usage).
    use_hashed_state: bool,
    /// Task runtime handle.
    runtime: reth_tasks::Runtime,
}
282
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    /// Manual `Debug` impl; the `incoming` and `outgoing` channel halves are
    /// intentionally omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("engine_kind", &self.engine_kind)
            .field("evm_config", &self.evm_config)
            .field("changeset_cache", &self.changeset_cache)
            .field("use_hashed_state", &self.use_hashed_state)
            .field("runtime", &self.runtime)
            .finish()
    }
}
311
312impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
313where
314 N: NodePrimitives,
315 P: DatabaseProviderFactory
316 + BlockReader<Block = N::Block, Header = N::BlockHeader>
317 + StateProviderFactory
318 + StateReader<Receipt = N::Receipt>
319 + HashedPostStateProvider
320 + Clone
321 + 'static,
322 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
323 + StageCheckpointReader
324 + ChangeSetReader
325 + StorageChangeSetReader
326 + StorageSettingsCache,
327 C: ConfigureEvm<Primitives = N> + 'static,
328 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
329 V: EngineValidator<T> + WaitForCaches,
330{
331 #[expect(clippy::too_many_arguments)]
333 pub fn new(
334 provider: P,
335 consensus: Arc<dyn FullConsensus<N>>,
336 payload_validator: V,
337 outgoing: UnboundedSender<EngineApiEvent<N>>,
338 state: EngineApiTreeState<N>,
339 canonical_in_memory_state: CanonicalInMemoryState<N>,
340 persistence: PersistenceHandle<N>,
341 persistence_state: PersistenceState,
342 payload_builder: PayloadBuilderHandle<T>,
343 config: TreeConfig,
344 engine_kind: EngineApiKind,
345 evm_config: C,
346 changeset_cache: ChangesetCache,
347 use_hashed_state: bool,
348 runtime: reth_tasks::Runtime,
349 ) -> Self {
350 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
351
352 Self {
353 provider,
354 consensus,
355 payload_validator,
356 incoming,
357 outgoing,
358 persistence,
359 persistence_state,
360 backfill_sync_state: BackfillSyncState::Idle,
361 state,
362 canonical_in_memory_state,
363 payload_builder,
364 config,
365 metrics: Default::default(),
366 incoming_tx,
367 engine_kind,
368 evm_config,
369 changeset_cache,
370 use_hashed_state,
371 runtime,
372 }
373 }
374
    /// Creates the handler, spawns it on a dedicated OS thread with increased priority,
    /// and returns the channel endpoints for communicating with it.
    ///
    /// The initial canonical head and last-persisted block are derived from the
    /// provider's best block number; if no header can be loaded, the default header is
    /// used as a fallback.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        use_hashed_state: bool,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        let best_block_number = provider.best_block_number().unwrap_or(0);
        // Falls back to a default header if the best header cannot be loaded.
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            use_hashed_state,
            runtime,
        );
        // Keep a sender clone before the task is moved onto its own thread.
        let incoming = task.incoming_tx.clone();
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
436
437 fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
439 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
440 PayloadStatusEnum::Valid,
441 Some(state.head_block_hash),
442 )))
443 }
444
445 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
447 self.incoming_tx.clone()
448 }
449
    /// Runs the engine event loop, blocking the current thread.
    ///
    /// Alternates between waiting for the next event (engine message or persistence
    /// completion) and advancing persistence. Returns on graceful termination, fatal
    /// errors, or channel disconnection.
    pub fn run(mut self) {
        loop {
            match self.wait_for_event() {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        // Graceful shutdown was requested.
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // After each event, check whether new persistence work should be started.
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
489
    /// Blocks until the next event for the main loop: either an incoming engine message
    /// or the completion of an in-flight persistence action.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Temporarily take the persistence receiver (if any) so we can select on it.
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            // Biased select: the persistence channel is listed first, so its completion
            // is preferred when both channels are ready.
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    match result {
                        Ok(value) => LoopEvent::PersistenceComplete {
                            result: value,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Persistence is still pending: restore the receiver before
                    // handing the engine message back to the loop.
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in flight: simply block on the engine channel.
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
530
531 fn on_downloaded(
537 &mut self,
538 mut blocks: Vec<SealedBlock<N::Block>>,
539 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
540 if blocks.is_empty() {
541 return Ok(None)
543 }
544
545 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
546 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
547 for block in blocks.drain(..batch) {
548 if let Some(event) = self.on_downloaded_block(block)? {
549 let needs_backfill = event.is_backfill_action();
550 self.on_tree_event(event)?;
551 if needs_backfill {
552 return Ok(None)
554 }
555 }
556 }
557
558 if !blocks.is_empty() {
560 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
561 }
562
563 Ok(None)
564 }
565
566 #[instrument(
581 level = "debug",
582 target = "engine::tree",
583 skip_all,
584 fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
585 )]
586 fn on_new_payload(
587 &mut self,
588 payload: T::ExecutionData,
589 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
590 trace!(target: "engine::tree", "invoked new payload");
591
592 let start = Instant::now();
594
595 let num_hash = payload.num_hash();
622 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
623 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
624
625 let block_hash = num_hash.hash;
626
627 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
629 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
630 return Ok(TreeOutcome::new(status));
631 }
632
633 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
635
636 let status = if self.backfill_sync_state.is_idle() {
637 self.try_insert_payload(payload)?
638 } else {
639 self.try_buffer_payload(payload)?
640 };
641
642 let mut outcome = TreeOutcome::new(status);
643 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
645 if self.state.tree_state.canonical_block_hash() != block_hash {
647 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
648 sync_target_head: block_hash,
649 }));
650 }
651 }
652
653 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
655
656 Ok(outcome)
657 }
658
    /// Attempts to insert the payload into the tree and maps the insertion result into
    /// an engine [`PayloadStatus`].
    ///
    /// - A newly valid insertion also tries to connect buffered descendant blocks and
    ///   reports the block hash as the latest valid hash.
    /// - Disconnected blocks (missing ancestors) report `SYNCING` without a latest
    ///   valid hash.
    /// - Non-fatal insertion errors are converted into the appropriate status; only
    ///   fatal errors are propagated.
    #[instrument(level = "debug", target = "engine::tree", skip_all)]
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let status = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        // Newly valid block: try to attach any buffered descendants.
                        self.try_connect_buffered_blocks(num_hash)?;
                        PayloadStatusEnum::Valid
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        PayloadStatusEnum::Valid
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        PayloadStatusEnum::Syncing
                    }
                };

                Ok(PayloadStatus::new(status, latest_valid_hash))
            }
            Err(error) => match error {
                InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
                InsertPayloadError::Payload(error) => {
                    Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
                }
            },
        }
    }
704
705 fn try_buffer_payload(
714 &mut self,
715 payload: T::ExecutionData,
716 ) -> Result<PayloadStatus, InsertBlockFatalError> {
717 let parent_hash = payload.parent_hash();
718 let num_hash = payload.num_hash();
719
720 match self.payload_validator.convert_payload_to_block(payload) {
721 Ok(block) => {
723 if let Err(error) = self.buffer_block(block) {
724 Ok(self.on_insert_block_error(error)?)
725 } else {
726 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
727 }
728 }
729 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
730 }
731 }
732
    /// Determines the chain update (commit or reorg) needed to make `new_head` the
    /// canonical head, by walking parent links through the in-memory tree state.
    ///
    /// Returns `Ok(None)` when `new_head`, or any block required to connect it to the
    /// current canonical chain, is not available in memory.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk the new chain down until it reaches the height of the current canonical
        // head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                return Ok(None)
            }
        }

        // If the walk landed exactly on the current canonical head, this is a simple
        // extension: no blocks need to be removed.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // Otherwise a reorg is needed: collect the old canonical blocks to remove.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // Bring the old chain down to the same height as the walked new chain.
        while current_canonical_number > current_number {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
                current_canonical_number -= 1;
            } else {
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }
        }

        // Both walks are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains down in lockstep until they meet at the common ancestor.
        while old_hash != current_hash {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Chains were collected tip-first; reverse into ascending block order.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
829
    /// Moves the latest (canonical head) pointers to the given canonical ancestor
    /// header, dispatching to the unwind path when the new head is lower than the
    /// current one, and to the advance/same-height path otherwise.
    fn update_latest_block_to_canonical_ancestor(
        &mut self,
        canonical_header: &SealedHeader<N::BlockHeader>,
    ) -> ProviderResult<()> {
        debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
        let current_head_number = self.state.tree_state.canonical_block_number();
        let new_head_number = canonical_header.number();
        let new_head_hash = canonical_header.hash();

        // The tree state head is updated unconditionally before handling either case.
        self.state.tree_state.set_canonical_head(canonical_header.num_hash());

        if new_head_number < current_head_number {
            debug!(
                target: "engine::tree",
                current_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "FCU unwind detected: reverting to canonical ancestor"
            );

            self.handle_canonical_chain_unwind(current_head_number, canonical_header)
        } else {
            debug!(
                target: "engine::tree",
                previous_head = current_head_number,
                new_head = new_head_number,
                new_head_hash = ?new_head_hash,
                "Advancing latest block to canonical ancestor"
            );
            self.handle_chain_advance_or_same_height(canonical_header)
        }
    }
875
876 fn handle_canonical_chain_unwind(
879 &self,
880 current_head_number: u64,
881 canonical_header: &SealedHeader<N::BlockHeader>,
882 ) -> ProviderResult<()> {
883 let new_head_number = canonical_header.number();
884 debug!(
885 target: "engine::tree",
886 from = current_head_number,
887 to = new_head_number,
888 "Handling unwind: collecting blocks to remove from in-memory state"
889 );
890
891 let old_blocks =
893 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
894
895 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
897 }
898
899 fn collect_blocks_for_canonical_unwind(
901 &self,
902 new_head_number: u64,
903 current_head_number: u64,
904 ) -> Vec<ExecutedBlock<N>> {
905 let mut old_blocks =
906 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
907
908 for block_num in (new_head_number + 1)..=current_head_number {
909 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
910 let executed_block = block_state.block_ref().clone();
911 old_blocks.push(executed_block);
912 debug!(
913 target: "engine::tree",
914 block_number = block_num,
915 "Collected block for removal from in-memory state"
916 );
917 }
918 }
919
920 if old_blocks.is_empty() {
921 debug!(
922 target: "engine::tree",
923 "No blocks found in memory to remove, will clear and reset state"
924 );
925 }
926
927 old_blocks
928 }
929
930 fn apply_canonical_ancestor_via_reorg(
932 &self,
933 canonical_header: &SealedHeader<N::BlockHeader>,
934 old_blocks: Vec<ExecutedBlock<N>>,
935 ) -> ProviderResult<()> {
936 let new_head_hash = canonical_header.hash();
937 let new_head_number = canonical_header.number();
938
939 match self.canonical_block_by_hash(new_head_hash)? {
941 Some(executed_block) => {
942 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
944 new: vec![executed_block],
945 old: old_blocks,
946 });
947
948 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
951
952 debug!(
953 target: "engine::tree",
954 block_number = new_head_number,
955 block_hash = ?new_head_hash,
956 "Successfully loaded canonical ancestor into memory via reorg"
957 );
958 }
959 None => {
960 warn!(
962 target: "engine::tree",
963 block_hash = ?new_head_hash,
964 "Could not find canonical ancestor block, updating header only"
965 );
966 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
967 }
968 }
969
970 Ok(())
971 }
972
973 fn handle_chain_advance_or_same_height(
975 &self,
976 canonical_header: &SealedHeader<N::BlockHeader>,
977 ) -> ProviderResult<()> {
978 self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
980
981 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
983
984 Ok(())
985 }
986
987 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
989 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
991 return Ok(());
992 }
993
994 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
996 self.canonical_in_memory_state
997 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
998
999 debug!(
1000 target: "engine::tree",
1001 block_number,
1002 block_hash = ?block_hash,
1003 "Added canonical block to in-memory state"
1004 );
1005 }
1006
1007 Ok(())
1008 }
1009
    /// Handles a `forkchoiceUpdated` request.
    ///
    /// Processing is a sequence of early-return stages: pre-flight validation,
    /// head-is-already-canonical handling, applying a chain update (commit/reorg),
    /// and finally falling back to downloading the missing block.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        self.record_forkchoice_metrics();

        // Stage 1: cheap validity checks (zero head, invalid ancestor, backfill busy).
        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        // Stage 2: the head is already the canonical head.
        if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
            return Ok(result);
        }

        // Stage 3: the head is known — apply a commit or reorg chain update.
        if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
            return Ok(result);
        }

        // Stage 4: the head is unknown — request a download.
        self.handle_missing_block(state)
    }
1049
    /// Notifies the in-memory canonical state that a forkchoice update was received
    /// (used for tracking/metrics).
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1054
1055 fn validate_forkchoice_state(
1060 &mut self,
1061 state: ForkchoiceState,
1062 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1063 if state.head_block_hash.is_zero() {
1064 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1065 }
1066
1067 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1070 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1071 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1072 }
1073
1074 if !self.backfill_sync_state.is_idle() {
1075 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1078 return Ok(Some(OnForkChoiceUpdated::syncing()));
1079 }
1080
1081 Ok(None)
1082 }
1083
1084 fn handle_canonical_head(
1090 &self,
1091 state: ForkchoiceState,
1092 attrs: &Option<T::PayloadAttributes>, version: EngineApiMessageVersion,
1094 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1095 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1110 return Ok(None);
1111 }
1112
1113 trace!(target: "engine::tree", "fcu head hash is already canonical");
1114
1115 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1117 return Ok(Some(TreeOutcome::new(outcome)));
1119 }
1120
1121 if let Some(attr) = attrs {
1123 let tip = self
1124 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1125 .ok_or_else(|| {
1126 ProviderError::HeaderNotFound(state.head_block_hash.into())
1129 })?;
1130 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1132 return Ok(Some(TreeOutcome::new(updated)));
1133 }
1134
1135 Ok(Some(Self::valid_outcome(state)))
1137 }
1138
    /// Attempts to satisfy the FCU by applying a chain update.
    ///
    /// Two cases are handled:
    /// 1. The head is an already-canonical (historical) header — optionally unwinds
    ///    the in-memory state to it and/or processes payload attributes (opstack or
    ///    when configured to always process attributes on a canonical head).
    /// 2. The head is a known in-memory block — commits/reorgs the chain to it.
    ///
    /// Returns `Ok(None)` when neither case applies (head unknown).
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                if self.config.unwind_canonical_header() {
                    // Move the in-memory head back (or forward) to this canonical header.
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    let updated = self.process_payload_attributes(
                        attr.clone(),
                        &canonical_header,
                        state,
                        version,
                    );
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        Ok(None)
    }
1222
1223 fn handle_missing_block(
1228 &self,
1229 state: ForkchoiceState,
1230 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1231 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1238 !state.safe_block_hash.is_zero() &&
1240 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1241 {
1242 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1243 state.safe_block_hash
1244 } else {
1245 state.head_block_hash
1246 };
1247
1248 let target = self.lowest_buffered_ancestor_or(target);
1249 trace!(target: "engine::tree", %target, "downloading missing block");
1250
1251 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1252 PayloadStatusEnum::Syncing,
1253 )))
1254 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1255 }
1256
1257 fn remove_blocks(&mut self, new_tip_num: u64) {
1260 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1261 if new_tip_num < self.persistence_state.last_persisted_block.number {
1262 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1263 let (tx, rx) = crossbeam_channel::bounded(1);
1264 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1265 self.persistence_state.start_remove(new_tip_num, rx);
1266 }
1267 }
1268
1269 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1272 if blocks_to_persist.is_empty() {
1273 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1274 return
1275 }
1276
1277 let highest_num_hash = blocks_to_persist
1279 .iter()
1280 .max_by_key(|block| block.recovered_block().number())
1281 .map(|b| b.recovered_block().num_hash())
1282 .expect("Checked non-empty persisting blocks");
1283
1284 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1285 let (tx, rx) = crossbeam_channel::bounded(1);
1286 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1287
1288 self.persistence_state.start_save(highest_num_hash, rx);
1289 }
1290
1291 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1296 if !self.persistence_state.in_progress() {
1297 if let Some(new_tip_num) = self.find_disk_reorg()? {
1298 self.remove_blocks(new_tip_num)
1299 } else if self.should_persist() {
1300 let blocks_to_persist =
1301 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1302 self.persist_blocks(blocks_to_persist);
1303 }
1304 }
1305
1306 Ok(())
1307 }
1308
1309 fn finish_termination(
1314 &mut self,
1315 pending_termination: oneshot::Sender<()>,
1316 ) -> Result<(), AdvancePersistenceError> {
1317 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1318 let result = self.persist_until_complete();
1319 let _ = pending_termination.send(());
1320 result
1321 }
1322
    /// Blocks until every canonical block up to the head has been persisted.
    ///
    /// Used during shutdown: repeatedly waits for any in-flight persistence
    /// job, then submits the remaining canonical blocks, until none are left.
    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
        loop {
            // Synchronously drain any in-flight persistence job first.
            if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
                self.on_persistence_complete(result, start_time)?;
            }

            // Persist all the way to the canonical head, not just to the
            // usual in-memory buffer threshold.
            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;

            if blocks_to_persist.is_empty() {
                debug!(target: "engine::tree", "persistence complete, signaling termination");
                return Ok(())
            }

            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
            self.persist_blocks(blocks_to_persist);
        }
    }
1343
    /// Test-only helper: polls the persistence result channel without
    /// blocking.
    ///
    /// Returns `Ok(true)` when a pending result was received and processed,
    /// `Ok(false)` when no job is pending or the result is not ready yet, and
    /// an error when the persistence task hung up.
    #[cfg(test)]
    pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
            return Ok(false);
        };

        match rx.try_recv() {
            Ok(result) => {
                self.on_persistence_complete(result, start_time)?;
                Ok(true)
            }
            Err(crossbeam_channel::TryRecvError::Empty) => {
                // Not ready yet: put the receiver back so a later poll can
                // retry.
                self.persistence_state.rx = Some((rx, start_time, action));
                Ok(false)
            }
            Err(crossbeam_channel::TryRecvError::Disconnected) => {
                Err(AdvancePersistenceError::ChannelClosed)
            }
        }
    }
1368
    /// Handles completion of a persistence job (save or remove).
    ///
    /// Records metrics, marks the persistence state finished at the reported
    /// block, evicts stale changeset-cache entries, prunes in-memory state
    /// that is now on disk, and pre-warms the canonical overlay.
    fn on_persistence_complete(
        &mut self,
        last_persisted_hash_num: Option<BlockNumHash>,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        // `None` means the task ran but had nothing to persist; there is no
        // state to update in that case.
        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = last_persisted_hash_num
        else {
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Evict changesets older than the retention window, but never past
        // the finalized block — the lower of the two bounds wins.
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                finalized.number.min(min_threshold)
            } else {
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // The cached overlay sits on top of the persisted state, which just
        // changed, so it must be rebuilt.
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Rebuild the canonical overlay off the hot path so the next payload
        // doesn't pay for it.
        if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                let _ = overlay.get();
            });
        }

        Ok(())
    }
1428
    /// Central dispatch for messages from the engine API and the sync
    /// orchestrator.
    ///
    /// Returns [`ops::ControlFlow::Break`] only after processing a terminate
    /// request; every other message continues the event loop.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            // Orchestrator lifecycle events.
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Flush outstanding blocks and ack via `tx`; a failure is
                    // logged but never prevents shutdown.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    // A block that was already executed (e.g. a locally built
                    // payload) is inserted into tree state directly.
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        // Stale: at or below the canonical head.
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // A direct child of the canonical head becomes the
                        // pending block.
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.payload_validator.on_inserted_executed_block(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated {
                                state,
                                payload_attrs,
                                tx,
                                version,
                            } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output =
                                    self.on_forkchoice_updated(state, payload_attrs, version);

                                if let Ok(res) = &mut output {
                                    // Record the latest FCU state/status
                                    // before handling any follow-up event.
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // A send failure means the caller dropped the
                                // receiver (request cancelled) — only record
                                // it.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Detach the follow-up event before sending
                                // the response so it is handled even if the
                                // receiver is gone.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload { payload, tx } => {
                                debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload");

                                // Wait for any in-flight persistence job on a
                                // helper thread so the cache wait below can
                                // proceed concurrently.
                                let pending_persistence = self.persistence_state.rx.take();
                                let persistence_rx = if let Some((rx, start_time, _action)) =
                                    pending_persistence
                                {
                                    let (persistence_tx, persistence_rx) =
                                        std::sync::mpsc::channel();
                                    self.runtime.spawn_blocking_named("wait-persist", move || {
                                        let start = Instant::now();
                                        let result =
                                            rx.recv().expect("persistence state channel closed");
                                        let _ = persistence_tx.send((
                                            result,
                                            start_time,
                                            start.elapsed(),
                                        ));
                                    });
                                    Some(persistence_rx)
                                } else {
                                    None
                                };

                                let cache_wait = self.payload_validator.wait_for_caches();

                                let persistence_wait = if let Some(persistence_rx) = persistence_rx
                                {
                                    let (result, start_time, wait_duration) = persistence_rx
                                        .recv()
                                        .expect("persistence result channel closed");
                                    // Best effort: a failure here is picked up
                                    // by later persistence advances.
                                    let _ = self.on_persistence_complete(result, start_time);
                                    Some(wait_duration)
                                } else {
                                    None
                                };

                                debug!(
                                    target: "engine::tree",
                                    ?persistence_wait,
                                    execution_cache_wait = ?cache_wait.execution_cache,
                                    sparse_trie_wait = ?cache_wait.sparse_trie,
                                    "Persistence finished and caches updated for reth_newPayload"
                                );

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // Reply with the outcome plus a detailed
                                // timing breakdown of the waits above.
                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait,
                                    execution_cache_wait: cache_wait.execution_cache,
                                    sparse_trie_wait: cache_wait.sparse_trie,
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1663
    /// Handles the orchestrator's notification that a backfill run finished.
    ///
    /// Re-anchors in-memory state on the height backfill reached (or unwound
    /// to), updates metrics and the canonical head from disk, and then
    /// decides what comes next: another backfill run, downloading remaining
    /// blocks toward the FCU target, or connecting already-buffered blocks.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // On unwind, remember the bad block and use the unwind target as the
        // new height; otherwise use the height backfill reached.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            self.state.invalid_headers.insert(**bad_block);

            Some(*target)
        } else {
            ctrl.block_number()
        };

        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // Resolve the height to an on-disk hash; without it there is nothing
        // to anchor the in-memory state on.
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // An unwind invalidates the in-memory chain wholesale.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        self.state.buffer.remove_old_blocks(backfill_height);
        self.canonical_in_memory_state.clear_state();

        // Adopt the on-disk header at the backfill height as the new
        // canonical head.
        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // Without a forkchoice sync target there is nothing left to drive.
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            return Ok(())
        }
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // Still far behind the finalized block: kick off another backfill
        // run.
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Close the remaining gap to the FCU head via live downloads.
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // Everything needed may already be buffered: try to connect it.
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1822
1823 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1827 if let Some(chain_update) = self.on_new_head(target)? {
1828 self.on_canonical_chain_update(chain_update);
1829 }
1830
1831 Ok(())
1832 }
1833
1834 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1836 if let Some(event) = event {
1837 self.on_tree_event(event)?;
1838 }
1839
1840 Ok(())
1841 }
1842
1843 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1847 match event {
1848 TreeEvent::TreeAction(action) => match action {
1849 TreeAction::MakeCanonical { sync_target_head } => {
1850 self.make_canonical(sync_target_head)?;
1851 }
1852 },
1853 TreeEvent::BackfillAction(action) => {
1854 self.emit_event(EngineApiEvent::BackfillAction(action));
1855 }
1856 TreeEvent::Download(action) => {
1857 self.emit_event(EngineApiEvent::Download(action));
1858 }
1859 }
1860
1861 Ok(())
1862 }
1863
1864 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1866 let event = event.into();
1867
1868 if event.is_backfill_action() {
1869 debug_assert_eq!(
1870 self.backfill_sync_state,
1871 BackfillSyncState::Idle,
1872 "backfill action should only be emitted when backfill is idle"
1873 );
1874
1875 if self.persistence_state.in_progress() {
1876 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1879 return
1880 }
1881
1882 self.backfill_sync_state = BackfillSyncState::Pending;
1883 self.metrics.engine.pipeline_runs.increment(1);
1884 debug!(target: "engine::tree", "emitting backfill action event");
1885 }
1886
1887 let _ = self.outgoing.send(event).inspect_err(
1888 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1889 );
1890 }
1891
1892 pub const fn should_persist(&self) -> bool {
1896 if !self.backfill_sync_state.is_idle() {
1897 return false
1899 }
1900
1901 let min_block = self.persistence_state.last_persisted_block.number;
1902 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1903 self.config.persistence_threshold()
1904 }
1905
    /// Collects the canonical in-memory blocks that should be persisted next.
    ///
    /// Walks parent links back from the canonical head and returns, in
    /// ascending order, every block above the last persisted number and at or
    /// below the target (the head itself, or head minus the in-memory buffer
    /// size).
    fn get_canonical_blocks_to_persist(
        &self,
        target: PersistTarget,
    ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
        // Must not race with an in-flight persistence job.
        debug_assert!(!self.persistence_state.in_progress());

        let mut blocks_to_persist = Vec::new();
        let mut current_hash = self.state.tree_state.canonical_block_hash();
        let last_persisted_number = self.persistence_state.last_persisted_block.number;
        let canonical_head_number = self.state.tree_state.canonical_block_number();

        let target_number = match target {
            PersistTarget::Head => canonical_head_number,
            PersistTarget::Threshold => {
                // Keep a buffer of the most recent blocks in memory.
                canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
            }
        };

        debug!(
            target: "engine::tree",
            ?current_hash,
            ?last_persisted_number,
            ?canonical_head_number,
            ?target_number,
            "Returning canonical blocks to persist"
        );
        // Walk down from the head until we reach already-persisted territory;
        // only blocks at or below the target number are collected.
        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
            if block.recovered_block().number() <= last_persisted_number {
                break;
            }

            if block.recovered_block().number() <= target_number {
                blocks_to_persist.push(block.clone());
            }

            current_hash = block.recovered_block().parent_hash();
        }

        // Collected newest-first; callers expect ascending block order.
        blocks_to_persist.reverse();

        Ok(blocks_to_persist)
    }
1953
    /// Reconciles in-memory state after the last-persisted block advanced.
    ///
    /// If the persisted chain diverged from the canonical one, schedules
    /// removal of the offending persisted blocks instead. Otherwise prunes
    /// in-memory blocks that are now safely on disk.
    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
        if let Some(remove_above) = self.find_disk_reorg()? {
            self.remove_blocks(remove_above);
            return Ok(())
        }

        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
            number: self.persistence_state.last_persisted_block.number,
            hash: self.persistence_state.last_persisted_block.hash,
        });
        Ok(())
    }
1977
    /// Fetches an executed block by hash, preferring in-memory tree state and
    /// falling back to reconstructing it from the database.
    ///
    /// The database path re-derives the hashed post-state and recomputes the
    /// block's trie updates, so it is considerably more expensive than an
    /// in-memory hit.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(Some(block.clone()))
        }

        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // NOTE(review): `pop` assumes `get_state` for a single block returns
        // exactly one entry of receipts/requests — confirm against the
        // StateReader contract.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(Some(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        )))
    }
2037
2038 fn sealed_header_by_hash(
2040 &self,
2041 hash: B256,
2042 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2043 let header = self.state.tree_state.sealed_header_by_hash(&hash);
2045
2046 if header.is_some() {
2047 Ok(header)
2048 } else {
2049 self.provider.sealed_header_by_hash(hash)
2050 }
2051 }
2052
2053 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2060 self.state
2061 .buffer
2062 .lowest_ancestor(&hash)
2063 .map(|block| block.parent_hash())
2064 .unwrap_or_else(|| hash)
2065 }
2066
    /// Resolves the `latestValidHash` to report for an invalid payload whose
    /// parent is `parent_hash`.
    ///
    /// If the parent itself is known locally (and thus not in the invalid
    /// set), it is the answer. Otherwise the invalid-ancestor chain is walked
    /// upward until a block is found that is not marked invalid and exists
    /// locally; `None` if the walk exhausts the cache without finding one.
    fn latest_valid_hash_for_invalid_payload(
        &mut self,
        parent_hash: B256,
    ) -> ProviderResult<Option<B256>> {
        // Parent header is known locally: treat it as the latest valid hash.
        if self.sealed_header_by_hash(parent_hash)?.is_some() {
            return Ok(Some(parent_hash))
        }

        // Walk up through the invalid-headers cache.
        let mut current_hash = parent_hash;
        let mut current_block = self.state.invalid_headers.get(&current_hash);
        while let Some(block_with_parent) = current_block {
            current_hash = block_with_parent.parent;
            current_block = self.state.invalid_headers.get(&current_hash);

            // First ancestor that is not marked invalid and is present
            // locally ends the search.
            if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
                return Ok(Some(current_hash))
            }
        }
        Ok(None)
    }
2102
    /// Builds the `INVALID` payload status for a payload whose parent
    /// (`parent_hash`) links into rejected history.
    ///
    /// `latestValidHash` is forced to zero when the parent is a pre-merge
    /// (non-zero difficulty) block; otherwise it is the parent itself or the
    /// nearest known-valid ancestor.
    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
            // PoW parent: report zero instead of a PoW block hash.
            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
            Some(_) => Some(parent_hash),
            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
        };

        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
        })
        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
    }
2121
2122 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2126 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2127 return target.head_block_hash == block_hash
2128 }
2129 false
2130 }
2131
2132 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2136 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2137 return target.contains(block_hash)
2138 }
2139 false
2140 }
2141
2142 fn check_invalid_ancestor_with_head(
2148 &mut self,
2149 check: B256,
2150 head: &SealedBlock<N::Block>,
2151 ) -> ProviderResult<Option<PayloadStatus>> {
2152 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2154
2155 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2156 }
2157
    /// Handles a new payload whose ancestor `invalid` is known to be bad:
    /// marks `head` itself as invalid, emits an invalid-block event, and
    /// returns the `INVALID` status to reply with.
    fn on_invalid_new_payload(
        &mut self,
        head: SealedBlock<N::Block>,
        invalid: BlockWithParent,
    ) -> ProviderResult<PayloadStatus> {
        let status = self.prepare_invalid_response(invalid.parent)?;

        // Remember this block as descending from an invalid ancestor so
        // future descendants are rejected cheaply.
        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
        self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));

        Ok(status)
    }
2173
2174 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2186 let parent_hash = payload.parent_hash();
2187 let block_hash = payload.block_hash();
2188 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2189 if lowest_buffered_ancestor == block_hash {
2190 lowest_buffered_ancestor = parent_hash;
2191 }
2192
2193 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2195 }
2196
    /// Handles a payload found to descend from the known-invalid ancestor
    /// `invalid`.
    ///
    /// The payload is still converted to a block first so a malformed payload
    /// is reported as such rather than as an invalid-ancestor case.
    fn handle_invalid_ancestor_payload(
        &mut self,
        payload: T::ExecutionData,
        invalid: BlockWithParent,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        // Captured before `payload` is consumed by the conversion below.
        let parent_hash = payload.parent_hash();
        let num_hash = payload.num_hash();

        let block = match self.payload_validator.convert_payload_to_block(payload) {
            Ok(block) => block,
            // Conversion failure takes precedence over the invalid ancestor.
            Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
        };

        Ok(self.on_invalid_new_payload(block, invalid)?)
    }
2225
    /// Checks whether `head` is a known-invalid header and, if so, returns
    /// the `INVALID` status to respond with.
    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };

        match self.prepare_invalid_response(header.parent) {
            Ok(status) => Ok(Some(status)),
            Err(err) => {
                // Failing to compute `latestValidHash` must not turn a
                // known-invalid block into an error response; fall back to a
                // plain INVALID status without a latest valid hash.
                debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
                Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
                    validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
                })))
            }
        }
    }
2244
2245 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2248 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2249 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2250 return Err(e)
2251 }
2252
2253 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2254 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2255 return Err(e)
2256 }
2257
2258 Ok(())
2259 }
2260
    /// Attempts to connect buffered blocks descending from `parent` to the
    /// tree, making a sync-target block canonical as soon as it becomes
    /// valid.
    ///
    /// Per-block insert failures are handled individually; only fatal insert
    /// errors abort the whole operation.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    // A newly valid sync-target block becomes the canonical
                    // head immediately.
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // Non-fatal block errors are recorded and skipped; the
                    // remaining children are still attempted.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2305
2306 fn buffer_block(
2308 &mut self,
2309 block: SealedBlock<N::Block>,
2310 ) -> Result<(), InsertBlockError<N::Block>> {
2311 if let Err(err) = self.validate_block(&block) {
2312 return Err(InsertBlockError::consensus_error(err, block))
2313 }
2314 self.state.buffer.insert_block(block);
2315 Ok(())
2316 }
2317
2318 #[inline]
2323 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2324 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2325 }
2326
2327 #[inline]
2330 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2331 if block > local_tip {
2332 Some(block - local_tip)
2333 } else {
2334 None
2335 }
2336 }
2337
    /// Decides whether a backfill (pipeline) run should be started and, if
    /// so, returns the hash to target.
    ///
    /// The distance check prefers, in order: a freshly downloaded finalized
    /// block, the buffered finalized block, and finally the caller-supplied
    /// `target_block_number`. A target is only returned when the finalized
    /// block is not already present on disk.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        // Does the gap to the best-known finalized-ish height exceed the
        // pipeline-run threshold?
        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                // The downloaded block *is* the FCU finalized block: measure
                // against it directly.
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    Some(buffered_finalized) => {
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    None => {
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // Finalized block not on disk: backfill to it — or, when
                    // the CL sent a zero finalized hash, optimistically to
                    // the head.
                    if !state.finalized_block_hash.is_zero() {
                        return Some(state.finalized_block_hash)
                    }

                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // Finalized block already on disk: no backfill needed.
                }
            }
        }

        None
    }
2418
    /// Detects divergence between the in-memory canonical chain and the
    /// persisted chain on disk.
    ///
    /// Walks both heads back to a common height, then to a common ancestor.
    /// Returns `Some(height)` — the number above which persisted blocks must
    /// be removed — when the chains diverge, `None` when the persisted block
    /// lies on the canonical chain.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Resolves a block's parent num/hash via the local header lookup.
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Step 1: bring the canonical pointer down to the persisted height.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // Persisted block is a canonical ancestor: no reorg on disk.
        if canonical == persisted {
            return Ok(None);
        }

        // Step 2: bring the persisted pointer down to the (possibly lower)
        // canonical height.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Step 3: same height, different hashes — walk both back in lockstep
        // until they meet at the common ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2464
    /// Applies a committed canonical chain update (extension or reorg) to
    /// tree state and the in-memory canonical state, then notifies listeners
    /// and emits a chain-committed event.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            // Ensure the new chain's blocks are present in tree state.
            self.reinsert_reorged_blocks(new.clone());

            // NOTE(review): old-side blocks are re-inserted only when hashed
            // state is not in use — presumably to keep the losing fork
            // reachable; confirm against `use_hashed_state` semantics.
            if !self.use_hashed_state {
                self.reinsert_reorged_blocks(old.clone());
            }
        }

        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        self.canonical_in_memory_state.notify_canon_state(notification);

        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2510
    /// Classifies a reorg by the depth of its first reorged block (reaching
    /// finalized, safe, or head-only territory) and records the
    /// corresponding metrics, including the reorg depth.
    fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
        if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
                first_reorged_block <= finalized.number
            {
                // Reorg reaches at or below the finalized block.
                self.metrics.tree.reorgs.finalized.increment(1);
            } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
                first_reorged_block <= safe.number
            {
                // Reorg reaches at or below the safe block.
                self.metrics.tree.reorgs.safe.increment(1);
            } else {
                self.metrics.tree.reorgs.head.increment(1);
            }
        } else {
            // Callers pass the first block of the reorged (old) chain; an
            // empty old chain here is a logic error upstream.
            debug_unreachable!("Reorged chain doesn't have any blocks");
        }
        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
    }
2530
2531 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2533 for block in new_chain {
2534 if self
2535 .state
2536 .tree_state
2537 .executed_block_by_hash(block.recovered_block().hash())
2538 .is_none()
2539 {
2540 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2541 self.state.tree_state.insert_executed(block);
2542 }
2543 }
2544 }
2545
2546 fn on_disconnected_downloaded_block(
2551 &self,
2552 downloaded_block: BlockNumHash,
2553 missing_parent: BlockNumHash,
2554 head: BlockNumHash,
2555 ) -> Option<TreeEvent> {
2556 if let Some(target) =
2558 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2559 {
2560 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2561 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2562 }
2563
2564 let request = if let Some(distance) =
2574 self.distance_from_local_tip(head.number, missing_parent.number)
2575 {
2576 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2577 DownloadRequest::BlockRange(missing_parent.hash, distance)
2578 } else {
2579 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2580 DownloadRequest::single_block(missing_parent.hash)
2583 };
2584
2585 Some(TreeEvent::Download(request))
2586 }
2587
2588 fn on_valid_downloaded_block(
2595 &mut self,
2596 block_num_hash: BlockNumHash,
2597 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2598 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2601 sync_target.contains(block_num_hash.hash)
2602 {
2603 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2604
2605 if sync_target.head_block_hash == block_num_hash.hash {
2606 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2608 sync_target_head: block_num_hash.hash,
2609 })))
2610 }
2611
2612 self.make_canonical(block_num_hash.hash)?;
2616 self.try_connect_buffered_blocks(block_num_hash)?;
2617
2618 if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2621 let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2622 trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2623 return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2624 }
2625
2626 return Ok(None)
2627 }
2628 trace!(target: "engine::tree", "appended downloaded block");
2629 self.try_connect_buffered_blocks(block_num_hash)?;
2630 Ok(None)
2631 }
2632
    /// Handles a block fetched by the downloader.
    ///
    /// Returns a follow-up [`TreeEvent`] (further downloads, canonicalization, backfill)
    /// when more work is required, or `None` when handling is complete or the block was
    /// skipped.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        // Reject the block outright if it descends from a known-invalid ancestor.
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // While a backfill run is active, downloaded blocks are not processed here.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        match self.insert_block(block) {
            Ok(
                InsertPayloadOk::Inserted(BlockStatus::Valid) |
                InsertPayloadOk::AlreadySeen(BlockStatus::Valid),
            ) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                // Parent state is missing — decide between backfill and targeted download.
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // NOTE(review): only `InsertPayloadError::Block` is handled here; any
                // other variant is silently dropped and the function returns `Ok(None)`
                // — confirm this is intentional for the download path.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2685
2686 fn insert_payload(
2695 &mut self,
2696 payload: T::ExecutionData,
2697 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2698 self.insert_block_or_payload(
2699 payload.block_with_parent(),
2700 payload,
2701 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2702 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2703 )
2704 }
2705
2706 fn insert_block(
2707 &mut self,
2708 block: SealedBlock<N::Block>,
2709 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2710 self.insert_block_or_payload(
2711 block.block_with_parent(),
2712 block,
2713 |validator, block, ctx| validator.validate_block(block, ctx),
2714 |_, block| Ok(block),
2715 )
2716 }
2717
    /// Shared insertion path for Engine API payloads and downloaded blocks.
    ///
    /// `execute` validates and executes the input, producing an [`ExecutedBlock`];
    /// `convert_to_block` turns the input into a [`SealedBlock`] and is only invoked on
    /// the paths that need the block itself (duplicate detection, error reporting,
    /// missing-parent buffering).
    ///
    /// Returns `AlreadySeen` for blocks already known to the tree or database,
    /// `Inserted(Disconnected { .. })` when the parent state is unavailable (the block is
    /// buffered), and `Inserted(Valid)` on successful execution.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Already tracked in the in-memory tree state. The conversion is still run so
        // that input-level errors (e.g. a malformed payload) are surfaced via `?`.
        if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // The block's number is at or below the persisted height, so it may already be
        // on disk under this hash.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                // Unknown on disk — fall through to normal insertion.
                Ok(None) => {}
            }
        }

        // Ensure state for the parent is available; otherwise buffer the block and
        // report the closest missing ancestor so the caller can trigger a download.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // Walk the buffer for the lowest buffered ancestor so the download
                // starts as close to the local chain as possible; default to this
                // block's own parent.
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // A block at or below the canonical head number extends a fork, not the
        // canonical chain.
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // Validate + execute via the caller-supplied strategy (payload or block path).
        let executed = execute(&mut self.payload_validator, input, ctx)?;

        // A child of the current canonical head becomes the new pending block.
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
2837
2838 fn on_insert_block_error(
2844 &mut self,
2845 error: InsertBlockError<N::Block>,
2846 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2847 let (block, error) = error.split();
2848
2849 let validation_err = error.ensure_validation_error()?;
2852
2853 warn!(
2857 target: "engine::tree",
2858 invalid_hash=%block.hash(),
2859 invalid_number=block.number(),
2860 %validation_err,
2861 "Invalid block error on new payload",
2862 );
2863 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2864
2865 self.state.invalid_headers.insert(block.block_with_parent());
2867 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2868 Box::new(block),
2869 )));
2870
2871 Ok(PayloadStatus::new(
2872 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2873 latest_valid_hash,
2874 ))
2875 }
2876
2877 fn on_new_payload_error(
2879 &mut self,
2880 error: NewPayloadError,
2881 payload_num_hash: NumHash,
2882 parent_hash: B256,
2883 ) -> ProviderResult<PayloadStatus> {
2884 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2885 let latest_valid_hash =
2888 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2889 None
2893 } else {
2894 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2895 };
2896
2897 let status = PayloadStatusEnum::from(error);
2898 Ok(PayloadStatus::new(status, latest_valid_hash))
2899 }
2900
2901 pub fn find_canonical_header(
2903 &self,
2904 hash: B256,
2905 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2906 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2907
2908 if canonical.is_none() {
2909 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2910 }
2911
2912 Ok(canonical)
2913 }
2914
2915 fn update_finalized_block(
2917 &self,
2918 finalized_block_hash: B256,
2919 ) -> Result<(), OnForkChoiceUpdated> {
2920 if finalized_block_hash.is_zero() {
2921 return Ok(())
2922 }
2923
2924 match self.find_canonical_header(finalized_block_hash) {
2925 Ok(None) => {
2926 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2927 return Err(OnForkChoiceUpdated::invalid_state())
2929 }
2930 Ok(Some(finalized)) => {
2931 if Some(finalized.num_hash()) !=
2932 self.canonical_in_memory_state.get_finalized_num_hash()
2933 {
2934 let _ = self.persistence.save_finalized_block_number(finalized.number());
2937 self.canonical_in_memory_state.set_finalized(finalized.clone());
2938 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2940 }
2941 }
2942 Err(err) => {
2943 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2944 }
2945 }
2946
2947 Ok(())
2948 }
2949
2950 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2952 if safe_block_hash.is_zero() {
2953 return Ok(())
2954 }
2955
2956 match self.find_canonical_header(safe_block_hash) {
2957 Ok(None) => {
2958 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2959 return Err(OnForkChoiceUpdated::invalid_state())
2961 }
2962 Ok(Some(safe)) => {
2963 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2964 let _ = self.persistence.save_safe_block_number(safe.number());
2967 self.canonical_in_memory_state.set_safe(safe.clone());
2968 self.metrics.tree.safe_block_height.set(safe.number() as f64);
2970 }
2971 }
2972 Err(err) => {
2973 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2974 }
2975 }
2976
2977 Ok(())
2978 }
2979
2980 fn ensure_consistent_forkchoice_state(
2989 &self,
2990 state: ForkchoiceState,
2991 ) -> Result<(), OnForkChoiceUpdated> {
2992 self.update_finalized_block(state.finalized_block_hash)?;
2998
2999 self.update_safe_block(state.safe_block_hash)
3005 }
3006
3007 fn process_payload_attributes(
3012 &self,
3013 attrs: T::PayloadAttributes,
3014 head: &N::BlockHeader,
3015 state: ForkchoiceState,
3016 version: EngineApiMessageVersion,
3017 ) -> OnForkChoiceUpdated {
3018 if let Err(err) =
3019 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
3020 {
3021 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3022 return OnForkChoiceUpdated::invalid_payload_attributes()
3023 }
3024
3025 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
3030 state.head_block_hash,
3031 attrs,
3032 version as u8,
3033 ) {
3034 Ok(attributes) => {
3035 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
3038
3039 OnForkChoiceUpdated::updated_with_pending_payload_id(
3051 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3052 pending_payload_id,
3053 )
3054 }
3055 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
3056 }
3057 }
3058
3059 pub(crate) fn remove_before(
3066 &mut self,
3067 upper_bound: BlockNumHash,
3068 finalized_hash: Option<B256>,
3069 ) -> ProviderResult<()> {
3070 let num = if let Some(hash) = finalized_hash {
3073 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3074 } else {
3075 None
3076 };
3077
3078 self.state.tree_state.remove_until(
3079 upper_bound,
3080 self.persistence_state.last_persisted_block.hash,
3081 num,
3082 );
3083 Ok(())
3084 }
3085
3086 pub fn state_provider_builder(
3091 &self,
3092 hash: B256,
3093 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3094 where
3095 P: BlockReader + StateProviderFactory + StateReader + Clone,
3096 {
3097 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3098 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3099 return Ok(Some(StateProviderBuilder::new(
3101 self.provider.clone(),
3102 historical,
3103 Some(blocks),
3104 )))
3105 }
3106
3107 if let Some(header) = self.provider.header(hash)? {
3109 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3110 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3113 }
3114
3115 debug!(target: "engine::tree", %hash, "no canonical state found for block");
3116 Ok(None)
3117 }
3118}
3119
/// Events processed by the engine's main loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// A message received from the engine (API request, downloaded block, or event).
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A background persistence task finished.
    PersistenceComplete {
        /// The block up to which persistence completed, if any.
        result: Option<BlockNumHash>,
        /// When the persistence task started — presumably used to record its duration;
        /// TODO(review) confirm at the handling site.
        start_time: Instant,
    },
    /// The incoming message channel was disconnected.
    Disconnected,
}
3139
/// Status of a block after an insertion attempt.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block is valid and connected to known state.
    Valid,
    /// The block could not be connected: an ancestor is missing locally.
    Disconnected {
        /// The current canonical head at the time of insertion.
        head: BlockNumHash,
        /// The closest missing ancestor that must be obtained to connect the block.
        missing_ancestor: BlockNumHash,
    },
}
3157
/// Successful outcome of inserting a payload or block into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The block was already known; the wrapped status describes its state.
    AlreadySeen(BlockStatus),
    /// The block was newly inserted; the wrapped status describes its state.
    Inserted(BlockStatus),
}
3169
/// Selects how far ahead to persist in-memory blocks.
/// NOTE(review): semantics inferred from variant names — confirm at the use site.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to a configured threshold below the head.
    Threshold,
    /// Persist everything up to the current head.
    Head,
}
3178
/// Durations spent waiting for caches to become available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Time spent waiting on the execution cache.
    pub execution_cache: Duration,
    /// Time spent waiting on the sparse trie.
    pub sparse_trie: Duration,
}
3187
/// Waits for caches to become available and reports how long each wait took.
pub trait WaitForCaches {
    /// Blocks until the caches are ready, returning the per-cache wait durations.
    fn wait_for_caches(&self) -> CacheWaitDurations;
}