1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError, InsertBlockValidationError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17 MemoryOverlayStateProvider, NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29 FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32 BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35 StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48 oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64pub mod types;
65
66use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
67pub use block_buffer::BlockBuffer;
68pub use invalid_headers::InvalidHeaderCache;
69pub use metrics::EngineApiMetrics;
70pub use payload_processor::*;
71pub use payload_validator::{BasicEngineValidator, EngineValidator};
72pub use persistence_state::PersistenceState;
73pub use reth_engine_primitives::TreeConfig;
74pub use reth_execution_cache::{
75 CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider, ExecutionCache,
76 PayloadExecutionCache, SavedCache,
77};
78pub use types::{ValidationOutcome, ValidationOutput};
79
80pub mod state;
81
/// Reuses the epoch length (in slots) as the block-count threshold related to choosing the
/// backfill pipeline over live tree sync.
/// NOTE(review): exact usage depends on callers of this constant — confirm against them.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// Number of recent blocks for which changesets are retained in the [`ChangesetCache`].
/// NOTE(review): presumably bounds in-memory reorg handling depth — verify against cache users.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
98
/// A builder that can lazily construct a [`StateProviderBox`] anchored at a historical block,
/// optionally layered with a set of in-memory executed blocks.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// Factory used to resolve the historical (persisted) state provider.
    provider_factory: P,
    /// Hash of the block whose post-state is the base state.
    historical: B256,
    /// Optional executed blocks applied on top of the historical state via
    /// [`MemoryOverlayStateProvider`].
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
109
impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new builder from a provider factory, a historical block hash, and an
    /// optional set of in-memory overlay blocks.
    pub const fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlock<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}
121
122impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
123where
124 P: BlockReader + StateProviderFactory + StateReader + Clone,
125{
126 pub fn build(&self) -> ProviderResult<StateProviderBox> {
128 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
129 if let Some(overlay) = self.overlay.clone() {
130 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
131 }
132 Ok(provider)
133 }
134}
135
/// The state of the engine API tree: executed blocks, forkchoice tracking, buffered
/// disconnected blocks, and known-invalid headers.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks executed blocks and the current canonical head.
    tree_state: TreeState<N>,
    /// Tracks forkchoice states received from the consensus layer.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer for blocks that cannot be processed yet (e.g. received while backfill syncing).
    buffer: BlockBuffer<N::Block>,
    /// Cache of headers found to be invalid.
    invalid_headers: InvalidHeaderCache,
}
151
152impl<N: NodePrimitives> EngineApiTreeState<N> {
153 fn new(
154 block_buffer_limit: u32,
155 max_invalid_header_cache_length: u32,
156 invalid_header_hit_eviction_threshold: u8,
157 canonical_block: BlockNumHash,
158 engine_kind: EngineApiKind,
159 ) -> Self {
160 Self {
161 invalid_headers: InvalidHeaderCache::new(
162 max_invalid_header_cache_length,
163 invalid_header_hit_eviction_threshold,
164 ),
165 buffer: BlockBuffer::new(block_buffer_limit),
166 tree_state: TreeState::new(canonical_block, engine_kind),
167 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
168 }
169 }
170
171 pub const fn tree_state(&self) -> &TreeState<N> {
173 &self.tree_state
174 }
175
176 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
178 self.invalid_headers.get(hash).is_some()
179 }
180}
181
/// The outcome of a tree operation, pairing the result with an optional follow-up event.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The result of the operation.
    pub outcome: T,
    /// An event the caller should act on after processing the outcome.
    pub event: Option<TreeEvent>,
    /// Whether the input (e.g. a payload's block) was already known before the operation.
    pub already_seen: bool,
}
193
194impl<T> TreeOutcome<T> {
195 pub const fn new(outcome: T) -> Self {
197 Self { outcome, event: None, already_seen: false }
198 }
199
200 pub fn with_event(mut self, event: TreeEvent) -> Self {
202 self.event = Some(event);
203 self
204 }
205
206 pub const fn with_already_seen(mut self, value: bool) -> Self {
208 self.already_seen = value;
209 self
210 }
211}
212
/// Result of attempting to insert a new payload into the tree.
#[derive(Debug)]
pub struct TryInsertPayloadResult {
    /// The payload status to report back to the consensus layer.
    pub status: PayloadStatus,
    /// Whether the payload's block was already known to the tree.
    pub already_seen: bool,
}
223
224impl TryInsertPayloadResult {
225 #[inline]
227 pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
228 TreeOutcome::new(self.status).with_already_seen(self.already_seen)
229 }
230}
231
/// Events that can be produced by tree operations.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action the tree itself should perform.
    TreeAction(TreeAction),
    /// A backfill (pipeline) sync action that should be started.
    BackfillAction(BackfillAction),
    /// A request to download one or more blocks.
    Download(DownloadRequest),
}
242
243impl TreeEvent {
244 const fn is_backfill_action(&self) -> bool {
246 matches!(self, Self::BackfillAction(_))
247 }
248}
249
/// Actions the tree performs on itself in response to tree events.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the block with the given hash the new canonical head.
    MakeCanonical {
        /// Hash of the sync target head block to make canonical.
        sync_target_head: B256,
    },
}
259
/// The engine API tree handler.
///
/// Receives engine API messages (new payloads, forkchoice updates, downloaded blocks) on an
/// incoming channel, applies them to the in-memory tree state, emits resulting events on the
/// outgoing channel, and coordinates persistence of canonical blocks.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Database/state provider.
    provider: P,
    /// Consensus rules implementation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator used to convert and validate payloads.
    payload_validator: V,
    /// Internal state: executed blocks, buffered blocks, forkchoice and invalid headers.
    state: EngineApiTreeState<N>,
    /// Sender half of the incoming channel, kept so the handler can re-enqueue work for
    /// itself (see `on_downloaded`) and hand out senders via [`Self::sender`].
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing engine API events.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle to the persistence task.
    persistence: PersistenceHandle<N>,
    /// Tracks any in-flight persistence action.
    persistence_state: PersistenceState,
    /// Whether the backfill (pipeline) sync is currently active.
    backfill_sync_state: BackfillSyncState,
    /// Shared canonical in-memory chain state.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder service.
    payload_builder: PayloadBuilderHandle<T>,
    /// Tree configuration.
    config: TreeConfig,
    /// Engine API metrics.
    metrics: EngineApiMetrics,
    /// The kind of engine API (e.g. Ethereum vs OP-stack; see `is_opstack` usage).
    engine_kind: EngineApiKind,
    /// EVM configuration used for execution.
    evm_config: C,
    /// Cache of recent changesets.
    changeset_cache: ChangesetCache,
    /// Per-block execution timing stats, keyed by block hash.
    execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
    /// Set when an FCU carried payload attributes and persistence suppression during
    /// builds is enabled (`suppress_persistence_during_build`).
    building_payload: bool,
    /// Task runtime handle.
    runtime: reth_tasks::Runtime,
}
320
321impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
322 for EngineApiTreeHandler<N, P, T, V, C>
323where
324 N: NodePrimitives,
325 C: Debug + ConfigureEvm<Primitives = N>,
326{
327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328 f.debug_struct("EngineApiTreeHandler")
329 .field("provider", &self.provider)
330 .field("consensus", &self.consensus)
331 .field("payload_validator", &self.payload_validator)
332 .field("state", &self.state)
333 .field("incoming_tx", &self.incoming_tx)
334 .field("persistence", &self.persistence)
335 .field("persistence_state", &self.persistence_state)
336 .field("backfill_sync_state", &self.backfill_sync_state)
337 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
338 .field("payload_builder", &self.payload_builder)
339 .field("config", &self.config)
340 .field("metrics", &self.metrics)
341 .field("engine_kind", &self.engine_kind)
342 .field("evm_config", &self.evm_config)
343 .field("changeset_cache", &self.changeset_cache)
344 .field("execution_timing_stats", &self.execution_timing_stats.len())
345 .field("runtime", &self.runtime)
346 .finish()
347 }
348}
349
350impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
351where
352 N: NodePrimitives,
353 P: DatabaseProviderFactory
354 + BlockReader<Block = N::Block, Header = N::BlockHeader>
355 + StateProviderFactory
356 + StateReader<Receipt = N::Receipt>
357 + HashedPostStateProvider
358 + Clone
359 + 'static,
360 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
361 + StageCheckpointReader
362 + ChangeSetReader
363 + StorageChangeSetReader
364 + StorageSettingsCache,
365 C: ConfigureEvm<Primitives = N> + 'static,
366 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
367 V: EngineValidator<T> + WaitForCaches,
368{
369 #[expect(clippy::too_many_arguments)]
371 pub fn new(
372 provider: P,
373 consensus: Arc<dyn FullConsensus<N>>,
374 payload_validator: V,
375 outgoing: UnboundedSender<EngineApiEvent<N>>,
376 state: EngineApiTreeState<N>,
377 canonical_in_memory_state: CanonicalInMemoryState<N>,
378 persistence: PersistenceHandle<N>,
379 persistence_state: PersistenceState,
380 payload_builder: PayloadBuilderHandle<T>,
381 config: TreeConfig,
382 engine_kind: EngineApiKind,
383 evm_config: C,
384 changeset_cache: ChangesetCache,
385 runtime: reth_tasks::Runtime,
386 ) -> Self {
387 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
388
389 Self {
390 provider,
391 consensus,
392 payload_validator,
393 incoming,
394 outgoing,
395 persistence,
396 persistence_state,
397 backfill_sync_state: BackfillSyncState::Idle,
398 state,
399 canonical_in_memory_state,
400 payload_builder,
401 config,
402 metrics: Default::default(),
403 incoming_tx,
404 engine_kind,
405 evm_config,
406 changeset_cache,
407 execution_timing_stats: HashMap::new(),
408 building_payload: false,
409 runtime,
410 }
411 }
412
    /// Creates a new [`EngineApiTreeHandler`] and spawns it on a dedicated OS thread.
    ///
    /// Initializes persistence and tree state from the provider's best block (falling back
    /// to block 0 / a default header on lookup failure), then returns the channel endpoints
    /// for communicating with the spawned handler: a sender for incoming engine requests
    /// and a receiver for outgoing engine events.
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        kind: EngineApiKind,
        evm_config: C,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        // Anchor both persistence state and tree state at the current best persisted block.
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            config.invalid_header_hit_eviction_threshold(),
            header.num_hash(),
            kind,
        );

        let task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            changeset_cache,
            runtime,
        );
        // Keep a sender before the task is moved into the thread.
        let incoming = task.incoming_tx.clone();
        // The engine loop runs on its own OS thread with raised priority.
        spawn_os_thread("engine", || {
            increase_thread_priority();
            task.run()
        });
        (incoming, outgoing)
    }
473
474 fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
476 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
477 PayloadStatusEnum::Valid,
478 Some(state.head_block_hash),
479 )))
480 }
481
    /// Returns a new [`Sender`] that can be used to send requests into the tree handler.
    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
        self.incoming_tx.clone()
    }
486
487 const fn persistence_gap(&self) -> u64 {
490 self.state
491 .tree_state
492 .canonical_block_number()
493 .saturating_sub(self.persistence_state.last_persisted_block.number)
494 }
495
496 const fn should_backpressure(&self) -> bool {
501 self.persistence_state.in_progress() &&
502 self.persistence_gap() >= self.config.persistence_backpressure_threshold()
503 }
504
    /// Runs the engine API handler's main loop.
    ///
    /// Each iteration: first polls for a completed persistence action; then waits for the
    /// next event (stalling on persistence completion when backpressure is active); finally
    /// tries to advance persistence. Any fatal error terminates the loop (and thereby the
    /// engine thread).
    pub fn run(mut self) {
        loop {
            // Opportunistically drain a finished persistence action before blocking on
            // the next event.
            match self.try_poll_persistence() {
                Ok(true) => {
                    if let Err(err) = self.advance_persistence() {
                        error!(target: "engine::tree", %err, "Advancing persistence failed");
                        return
                    }
                    continue;
                }
                Ok(false) => {}
                Err(err) => {
                    error!(target: "engine::tree", %err, "Polling persistence failed");
                    return
                }
            }

            // Under backpressure, block until persistence completes instead of accepting
            // more engine work, and record how long the stall lasted.
            let event = if self.should_backpressure() {
                self.metrics.engine.backpressure_active.set(1.0);
                let stall_start = Instant::now();
                let event = self.wait_for_persistence_event();
                self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
                event
            } else {
                self.metrics.engine.backpressure_active.set(0.0);
                self.wait_for_event()
            };

            match event {
                LoopEvent::EngineMessage(msg) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    match self.on_engine_message(msg) {
                        // `Break` signals an orderly shutdown of the loop.
                        Ok(ops::ControlFlow::Break(())) => return,
                        Ok(ops::ControlFlow::Continue(())) => {}
                        Err(fatal) => {
                            error!(target: "engine::tree", %fatal, "insert block fatal error");
                            return
                        }
                    }
                }
                LoopEvent::PersistenceComplete { result, start_time } => {
                    if let Err(err) = self.on_persistence_complete(result, start_time) {
                        error!(target: "engine::tree", %err, "Persistence complete handling failed");
                        return
                    }
                }
                LoopEvent::Disconnected => {
                    error!(target: "engine::tree", "Channel disconnected");
                    return
                }
            }

            // Kick off the next persistence action (save or removal) if appropriate.
            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
        }
    }
593
594 fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
600 let maybe_persistence = self.persistence_state.rx.take();
601
602 if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
603 match persistence_rx.recv() {
604 Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
605 Err(_) => LoopEvent::Disconnected,
606 }
607 } else {
608 self.wait_for_event()
609 }
610 }
611
    /// Waits for the next loop event: either an incoming engine message or completion of
    /// an in-flight persistence action.
    ///
    /// Uses `select_biased!` so that, when both channels are ready, persistence completion
    /// is preferred over new engine messages.
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Temporarily take the persistence receiver so it can be polled in the select.
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    match result {
                        Ok(result) => LoopEvent::PersistenceComplete {
                            result,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Persistence is still in flight — restore the receiver before
                    // handing the message back.
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence in flight: block on the incoming channel alone.
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
652
    /// Handles a batch of downloaded blocks.
    ///
    /// Processes at most `max_execute_block_batch_size` blocks now and re-enqueues the
    /// remainder on the incoming channel, so other engine messages can interleave with
    /// large download bursts. Returns early — without re-enqueuing the remainder — when a
    /// processed block triggers a backfill action.
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<SealedBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                // Remember before the event is consumed by `on_tree_event`.
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    return Ok(None)
                }
            }
        }

        // Send any remaining blocks back to ourselves for a later iteration.
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }
687
    /// Entry point for `newPayload` requests from the consensus layer.
    ///
    /// Emits a `BlockReceived` event, rejects payloads that descend from a known-invalid
    /// ancestor, then either inserts the payload (backfill idle) or buffers it (backfill
    /// active). If the resulting status is valid and the block is the sync target but not
    /// yet canonical, a `MakeCanonical` tree action is attached to the outcome.
    #[instrument(
        level = "debug",
        target = "engine::tree",
        skip_all,
        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
    )]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");

        let start = Instant::now();

        let num_hash = payload.num_hash();
        let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        let block_hash = num_hash.hash;

        // Payloads building on a known-invalid ancestor are rejected immediately.
        if let Some(invalid) = self.find_invalid_ancestor(&payload) {
            let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
            return Ok(TreeOutcome::new(status));
        }

        self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());

        // While backfill sync is active, payloads are only buffered, not executed.
        let mut outcome = if self.backfill_sync_state.is_idle() {
            self.try_insert_payload(payload)?.into_outcome()
        } else {
            TreeOutcome::new(self.try_buffer_payload(payload)?)
        };

        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // The sync target is valid but not canonical yet — request making it so.
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());

        Ok(outcome)
    }
779
    /// Attempts to insert the payload into the tree and maps the result to a
    /// [`PayloadStatus`]:
    /// - `Inserted(Valid)` / `AlreadySeen(Valid)` → `VALID` with the block as latest valid
    ///   hash; a newly inserted valid block also tries to connect buffered children.
    /// - `Disconnected` (missing ancestors) → `SYNCING`.
    /// - Insertion errors are converted to a status by the dedicated error handlers; only
    ///   fatal errors propagate.
    #[instrument(level = "debug", target = "engine::tree", skip_all)]
    fn try_insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
        let block_hash = payload.block_hash();
        let num_hash = payload.num_hash();
        let parent_hash = payload.parent_hash();
        let mut latest_valid_hash = None;

        match self.insert_payload(payload) {
            Ok(status) => {
                let (status, already_seen) = match status {
                    InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        // A newly valid block may connect previously buffered children.
                        self.try_connect_buffered_blocks(num_hash)?;
                        (PayloadStatusEnum::Valid, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                        latest_valid_hash = Some(block_hash);
                        (PayloadStatusEnum::Valid, true)
                    }
                    InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, false)
                    }
                    InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                        (PayloadStatusEnum::Syncing, true)
                    }
                };

                Ok(TryInsertPayloadResult {
                    status: PayloadStatus::new(status, latest_valid_hash),
                    already_seen,
                })
            }
            Err(error) => {
                // Non-fatal errors become a payload status; fatal errors propagate via `?`.
                let status = match error {
                    InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
                    InsertPayloadError::Payload(error) => {
                        self.on_new_payload_error(error, num_hash, parent_hash)?
                    }
                };

                Ok(TryInsertPayloadResult { status, already_seen: false })
            }
        }
    }
829
830 fn try_buffer_payload(
839 &mut self,
840 payload: T::ExecutionData,
841 ) -> Result<PayloadStatus, InsertBlockFatalError> {
842 let parent_hash = payload.parent_hash();
843 let num_hash = payload.num_hash();
844
845 match self.payload_validator.convert_payload_to_block(payload) {
846 Ok(block) => {
848 if let Err(error) = self.buffer_block(block) {
849 Ok(self.on_insert_block_error(error)?)
850 } else {
851 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
852 }
853 }
854 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
855 }
856 }
857
    /// Determines the canonical chain update that would result from making `new_head` the
    /// head, by walking parent links through the in-memory tree state.
    ///
    /// Returns:
    /// - `None` if `new_head` (or a required ancestor) is not tracked in memory,
    /// - `Commit` if the new head simply extends the current canonical chain,
    /// - `Reorg` with the new and old chain segments otherwise.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        // NOTE(review): underflows if the new head is block 0 — presumably unreachable
        // since genesis is always canonical; confirm.
        let mut current_number = new_head_number - 1;

        // Walk the new chain back until it is at the same height as the canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                return Ok(None)
            }
        }

        // If the walk landed exactly on the canonical head, this is a simple extension.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // Bring the old (canonical) side down to the same height as the new side.
        while current_canonical_number > current_number {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);
            current_canonical_number -= 1;
        }

        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both sides back in lockstep until they meet at the common ancestor.
        while old_hash != current_hash {
            let block = self.canonical_block_by_hash(old_hash)?;
            old_hash = block.recovered_block().parent_hash();
            old_chain.push(block);

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Chains were collected tip-first; reverse to ascending block order.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
944
945 fn update_latest_block_to_canonical_ancestor(
957 &mut self,
958 canonical_header: &SealedHeader<N::BlockHeader>,
959 ) -> ProviderResult<()> {
960 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
961 let current_head_number = self.state.tree_state.canonical_block_number();
962 let new_head_number = canonical_header.number();
963 let new_head_hash = canonical_header.hash();
964
965 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
967
968 if new_head_number < current_head_number {
970 debug!(
971 target: "engine::tree",
972 current_head = current_head_number,
973 new_head = new_head_number,
974 new_head_hash = ?new_head_hash,
975 "FCU unwind detected: reverting to canonical ancestor"
976 );
977
978 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
979 } else {
980 debug!(
981 target: "engine::tree",
982 previous_head = current_head_number,
983 new_head = new_head_number,
984 new_head_hash = ?new_head_hash,
985 "Advancing latest block to canonical ancestor"
986 );
987 self.handle_chain_advance_or_same_height(canonical_header)
988 }
989 }
990
991 fn handle_canonical_chain_unwind(
994 &self,
995 current_head_number: u64,
996 canonical_header: &SealedHeader<N::BlockHeader>,
997 ) -> ProviderResult<()> {
998 let new_head_number = canonical_header.number();
999 debug!(
1000 target: "engine::tree",
1001 from = current_head_number,
1002 to = new_head_number,
1003 "Handling unwind: collecting blocks to remove from in-memory state"
1004 );
1005
1006 let old_blocks =
1008 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
1009
1010 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1012 }
1013
1014 fn collect_blocks_for_canonical_unwind(
1016 &self,
1017 new_head_number: u64,
1018 current_head_number: u64,
1019 ) -> Vec<ExecutedBlock<N>> {
1020 let mut old_blocks =
1021 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1022
1023 for block_num in (new_head_number + 1)..=current_head_number {
1024 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1025 let executed_block = block_state.block_ref().clone();
1026 old_blocks.push(executed_block);
1027 debug!(
1028 target: "engine::tree",
1029 block_number = block_num,
1030 "Collected block for removal from in-memory state"
1031 );
1032 }
1033 }
1034
1035 if old_blocks.is_empty() {
1036 debug!(
1037 target: "engine::tree",
1038 "No blocks found in memory to remove, will clear and reset state"
1039 );
1040 }
1041
1042 old_blocks
1043 }
1044
1045 fn apply_canonical_ancestor_via_reorg(
1047 &self,
1048 canonical_header: &SealedHeader<N::BlockHeader>,
1049 old_blocks: Vec<ExecutedBlock<N>>,
1050 ) -> ProviderResult<()> {
1051 let new_head_hash = canonical_header.hash();
1052 let new_head_number = canonical_header.number();
1053
1054 let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1056 self.canonical_in_memory_state
1058 .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1059
1060 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1063
1064 debug!(
1065 target: "engine::tree",
1066 block_number = new_head_number,
1067 block_hash = ?new_head_hash,
1068 "Successfully loaded canonical ancestor into memory via reorg"
1069 );
1070
1071 Ok(())
1072 }
1073
1074 fn handle_chain_advance_or_same_height(
1076 &self,
1077 canonical_header: &SealedHeader<N::BlockHeader>,
1078 ) -> ProviderResult<()> {
1079 self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1081
1082 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1084
1085 Ok(())
1086 }
1087
1088 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1090 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1092 return Ok(());
1093 }
1094
1095 let executed_block = self.canonical_block_by_hash(block_hash)?;
1097 self.canonical_in_memory_state
1098 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1099
1100 debug!(
1101 target: "engine::tree",
1102 block_number,
1103 block_hash = ?block_hash,
1104 "Added canonical block to in-memory state"
1105 );
1106
1107 Ok(())
1108 }
1109
    /// Handles a `forkchoiceUpdated` request from the consensus layer.
    ///
    /// The checks run in order: state validation (zero head / invalid ancestor / backfill
    /// active), head-already-canonical handling, chain update application, and finally the
    /// missing-block path which triggers a download.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");

        // Attributes signal a payload build request; optionally suppress persistence while
        // building (see `suppress_persistence_during_build`).
        self.building_payload = attrs.is_some() && self.config.suppress_persistence_during_build();

        self.record_forkchoice_metrics();

        if let Some(early_result) = self.validate_forkchoice_state(state)? {
            return Ok(TreeOutcome::new(early_result));
        }

        if let Some(result) = self.handle_canonical_head(state, &attrs)? {
            return Ok(result);
        }

        if let Some(result) = self.apply_chain_update(state, &attrs)? {
            return Ok(result);
        }

        // Head block is unknown locally: respond SYNCING and request a download.
        self.handle_missing_block(state)
    }
1150
    /// Records receipt of a forkchoice update on the canonical in-memory state.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1155
1156 fn validate_forkchoice_state(
1161 &mut self,
1162 state: ForkchoiceState,
1163 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1164 if state.head_block_hash.is_zero() {
1165 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1166 }
1167
1168 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1171 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1172 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1173 }
1174
1175 if !self.backfill_sync_state.is_idle() {
1176 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1179 return Ok(Some(OnForkChoiceUpdated::syncing()));
1180 }
1181
1182 Ok(None)
1183 }
1184
1185 fn handle_canonical_head(
1191 &self,
1192 state: ForkchoiceState,
1193 attrs: &Option<T::PayloadAttributes>, ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1195 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1210 return Ok(None);
1211 }
1212
1213 trace!(target: "engine::tree", "fcu head hash is already canonical");
1214
1215 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1217 return Ok(Some(TreeOutcome::new(outcome)));
1219 }
1220
1221 if let Some(attr) = attrs {
1223 let tip = self
1224 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1225 .ok_or_else(|| {
1226 ProviderError::HeaderNotFound(state.head_block_hash.into())
1229 })?;
1230 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1232 return Ok(Some(TreeOutcome::new(updated)));
1233 }
1234
1235 Ok(Some(Self::valid_outcome(state)))
1237 }
1238
    /// Attempts to apply the FCU head as a chain update.
    ///
    /// Two cases are handled:
    /// 1. The head is an already-canonical block: respond `VALID`; on OP-stack (or when so
    ///    configured) also optionally unwind to that header and process payload attributes.
    /// 2. The head forms a chain tracked in memory: apply the commit/reorg produced by
    ///    [`Self::on_new_head`] and respond accordingly.
    ///
    /// Returns `None` when the head is neither canonical nor connectable in memory.
    fn apply_chain_update(
        &mut self,
        state: ForkchoiceState,
        attrs: &Option<T::PayloadAttributes>,
    ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            if self.engine_kind.is_opstack() ||
                self.config.always_process_payload_attributes_on_canonical_head()
            {
                // Optionally move the head back to this (historical) canonical block.
                if self.config.unwind_canonical_header() {
                    self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
                }

                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    let updated =
                        self.process_payload_attributes(attr.clone(), &canonical_header, state);
                    return Ok(Some(TreeOutcome::new(updated)));
                }
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            // Safe/finalized hashes must be consistent with the new head.
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                return Ok(Some(TreeOutcome::new(outcome)));
            }

            if let Some(attr) = attrs {
                let updated = self.process_payload_attributes(attr.clone(), &tip, state);
                return Ok(Some(TreeOutcome::new(updated)));
            }

            return Ok(Some(Self::valid_outcome(state)));
        }

        Ok(None)
    }
1317
1318 fn handle_missing_block(
1323 &self,
1324 state: ForkchoiceState,
1325 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1326 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1333 !state.safe_block_hash.is_zero() &&
1335 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1336 {
1337 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1338 state.safe_block_hash
1339 } else {
1340 state.head_block_hash
1341 };
1342
1343 let target = self.lowest_buffered_ancestor_or(target);
1344 trace!(target: "engine::tree", %target, "downloading missing block");
1345
1346 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1347 PayloadStatusEnum::Syncing,
1348 )))
1349 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1350 }
1351
1352 fn remove_blocks(&mut self, new_tip_num: u64) {
1355 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1356 if new_tip_num < self.persistence_state.last_persisted_block.number {
1357 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1358 let (tx, rx) = crossbeam_channel::bounded(1);
1359 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1360 self.persistence_state.start_remove(new_tip_num, rx);
1361 }
1362 }
1363
1364 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1367 if blocks_to_persist.is_empty() {
1368 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1369 return
1370 }
1371
1372 let highest_num_hash = blocks_to_persist
1374 .iter()
1375 .max_by_key(|block| block.recovered_block().number())
1376 .map(|b| b.recovered_block().num_hash())
1377 .expect("Checked non-empty persisting blocks");
1378
1379 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1380 let (tx, rx) = crossbeam_channel::bounded(1);
1381 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1382
1383 self.persistence_state.start_save(highest_num_hash, rx);
1384 }
1385
1386 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1391 if !self.persistence_state.in_progress() {
1392 if let Some(new_tip_num) = self.find_disk_reorg()? {
1393 self.remove_blocks(new_tip_num)
1394 } else if self.should_persist() {
1395 let blocks_to_persist =
1396 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1397 self.persist_blocks(blocks_to_persist);
1398 }
1399 }
1400
1401 Ok(())
1402 }
1403
1404 fn finish_termination(
1409 &mut self,
1410 pending_termination: oneshot::Sender<()>,
1411 ) -> Result<(), AdvancePersistenceError> {
1412 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1413 let result = self.persist_until_complete();
1414 let _ = pending_termination.send(());
1415 result
1416 }
1417
    /// Synchronously persists every canonical in-memory block down to the canonical head.
    ///
    /// Used during shutdown: loops until [`Self::get_canonical_blocks_to_persist`] returns
    /// no more blocks, blocking on any in-flight persistence job at the top of each
    /// iteration before dispatching the next batch.
    fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
        loop {
            // Block on any persistence job already running before starting a new one.
            if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
                debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
                let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
                self.on_persistence_complete(result, start_time)?;
            }

            // Persist everything up to the head, not just to the threshold.
            let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;

            if blocks_to_persist.is_empty() {
                debug!(target: "engine::tree", "persistence complete, signaling termination");
                return Ok(())
            }

            debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
            self.persist_blocks(blocks_to_persist);
        }
    }
1439
    /// Polls the in-flight persistence job without blocking.
    ///
    /// Returns `Ok(true)` if a job completed and its result was processed, `Ok(false)` if
    /// no job is running or it has not finished yet, and an error if the persistence task
    /// hung up its channel.
    fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
        let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
            return Ok(false);
        };

        match rx.try_recv() {
            Ok(result) => {
                self.on_persistence_complete(result, start_time)?;
                Ok(true)
            }
            Err(crossbeam_channel::TryRecvError::Empty) => {
                // Not done yet: put the receiver back so a later poll can pick it up.
                self.persistence_state.rx = Some((rx, start_time, action));
                Ok(false)
            }
            Err(crossbeam_channel::TryRecvError::Disconnected) => {
                Err(AdvancePersistenceError::ChannelClosed)
            }
        }
    }
1463
    /// Finalizes a completed persistence job.
    ///
    /// Records metrics, advances the persistence state to the last persisted block, evicts
    /// stale changesets from the cache, prunes now-persisted blocks from the in-memory
    /// tree, pre-warms the new canonical overlay off-thread, and purges timing stats for
    /// persisted blocks (possibly emitting slow-block events).
    fn on_persistence_complete(
        &mut self,
        result: PersistenceResult,
        start_time: Instant,
    ) -> Result<(), AdvancePersistenceError> {
        self.metrics.engine.persistence_duration.record(start_time.elapsed());

        let commit_duration = result.commit_duration;
        let Some(BlockNumHash {
            hash: last_persisted_block_hash,
            number: last_persisted_block_number,
        }) = result.last_block
        else {
            // Nothing was written; there is no state to advance.
            warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
            return Ok(())
        };

        debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
        self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

        // Evict changesets below min(finalized, last_persisted - retention window): keep
        // the most recent retention window, and never evict above the finalized block.
        let min_threshold =
            last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
        let eviction_threshold =
            if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
                finalized.number.min(min_threshold)
            } else {
                min_threshold
            };
        debug!(
            target: "engine::tree",
            last_persisted = last_persisted_block_number,
            finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
            eviction_threshold,
            "Evicting changesets below threshold"
        );
        self.changeset_cache.evict(eviction_threshold);

        // Disk contents changed, so any cached overlay is now stale.
        self.state.tree_state.invalidate_cached_overlay();

        self.on_new_persisted_block()?;

        // Warm the fresh canonical overlay off-thread so the next payload doesn't pay
        // the cost of building it lazily.
        if let Some(prepared) = self.state.tree_state.prepare_canonical_overlay() {
            self.runtime.spawn_blocking_named("prepare-overlay", move || {
                let _ = prepared.overlay.get(prepared.anchor_hash);
            });
        }

        self.purge_timing_stats(last_persisted_block_number, commit_duration);

        Ok(())
    }
1526
    /// Processes a single message from the engine channel.
    ///
    /// Handles orchestrator events (backfill start/finish, terminate), beacon engine
    /// requests (forkchoiceUpdated / newPayload / reth_newPayload), direct insertion of
    /// already-executed blocks, and batches of downloaded blocks.
    ///
    /// Returns `ControlFlow::Break` when a terminate request was handled and the caller
    /// should stop its event loop, `ControlFlow::Continue` otherwise.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // Flush remaining blocks and acknowledge before breaking the loop.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(payload) => {
                        let block_num_hash = payload.recovered_block.num_hash();
                        // Skip blocks at or below the canonical head.
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        // Skip duplicates already present in the tree.
                        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        let block = match self
                            .payload_validator
                            .on_inserted_executed_block(payload, &self.state)
                        {
                            Ok(block) => block,
                            Err(err) => {
                                warn!(target: "engine::tree", %err, block=?block_num_hash, "Failed to insert already executed block");
                                return Ok(ops::ControlFlow::Continue(()))
                            }
                        };

                        // A direct child of the canonical head becomes the pending block.
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // Track the state and its status so future FCUs and
                                    // sync-target checks can reference it.
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // Handle any follow-up action (canonicalize/download/backfill).
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Detach the follow-up event before sending the response:
                                // the outcome is moved into the channel below.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // Process the follow-up event even if delivery failed.
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                                enqueued_at,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Time spent queued behind other engine messages.
                                let backpressure_wait = enqueued_at.elapsed();

                                // Optionally block until the in-flight persistence job is
                                // done, measuring how long that wait took.
                                let explicit_persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        let _ = self.on_persistence_complete(result, start_time);
                                        wait_duration
                                    } else {
                                        Duration::ZERO
                                    }
                                } else {
                                    Duration::ZERO
                                };

                                // Optionally block until validator caches are warm.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1768
    /// Handles completion (or unwind) of a backfill sync run.
    ///
    /// Resets in-memory tree state to the backfill height, then decides how to continue
    /// towards the tracked forkchoice sync target: start another backfill run, download a
    /// block range, download the head block, or connect already-buffered blocks.
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // An unwind carries the bad block that triggered it; record it as invalid so
        // descendants are rejected without re-validation.
        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            self.state.invalid_headers.insert(**bad_block);

            Some(*target)
        } else {
            ctrl.block_number()
        };

        let Some(backfill_height) = backfill_height else { return Ok(()) };

        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // Everything above the unwind target is gone from disk: reset wholesale.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        self.state.buffer.remove_old_blocks(backfill_height);
        self.purge_timing_stats(backfill_height, None);
        self.canonical_in_memory_state.clear_state();

        // Re-anchor head tracking on what backfill actually wrote to disk.
        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            // No finalized target to sync towards yet.
            return Ok(())
        }
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // If the finalized target is still far ahead, kick off another backfill run.
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Otherwise close the remaining gap to the FCU head via live download.
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // Target already buffered and adjacent: try to connect buffered blocks directly.
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1928
1929 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1933 if let Some(chain_update) = self.on_new_head(target)? {
1934 self.on_canonical_chain_update(chain_update);
1935 }
1936
1937 self.on_canonicalized_sync_target(target);
1938
1939 Ok(())
1940 }
1941
1942 fn on_canonicalized_sync_target(&mut self, target: B256) {
1944 let Some(sync_target_state) = self
1945 .state
1946 .forkchoice_state_tracker
1947 .sync_target_state()
1948 .filter(|state| state.head_block_hash == target)
1949 else {
1950 return;
1951 };
1952
1953 if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
1954 debug!(
1955 target: "engine::tree",
1956 head = %sync_target_state.head_block_hash,
1957 safe = %sync_target_state.safe_block_hash,
1958 finalized = %sync_target_state.finalized_block_hash,
1959 ?outcome,
1960 "Canonicalized sync target head before safe/finalized could be applied"
1961 );
1962 return;
1963 }
1964
1965 self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
1966 }
1967
1968 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1970 if let Some(event) = event {
1971 self.on_tree_event(event)?;
1972 }
1973
1974 Ok(())
1975 }
1976
1977 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1981 match event {
1982 TreeEvent::TreeAction(action) => match action {
1983 TreeAction::MakeCanonical { sync_target_head } => {
1984 self.make_canonical(sync_target_head)?;
1985 }
1986 },
1987 TreeEvent::BackfillAction(action) => {
1988 self.emit_event(EngineApiEvent::BackfillAction(action));
1989 }
1990 TreeEvent::Download(action) => {
1991 self.emit_event(EngineApiEvent::Download(action));
1992 }
1993 }
1994
1995 Ok(())
1996 }
1997
1998 fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
2005 let threshold = self.config.slow_block_threshold();
2006 let check_slow = commit_duration.is_some() && threshold.is_some();
2007
2008 let keys_to_remove: Vec<B256> = self
2010 .execution_timing_stats
2011 .iter()
2012 .filter(|(_, stats)| stats.block_number <= below_number)
2013 .map(|(k, _)| *k)
2014 .collect();
2015
2016 for key in keys_to_remove {
2017 let stats = self.execution_timing_stats.remove(&key).expect("key just found");
2018 if check_slow {
2019 let commit_dur = commit_duration.expect("checked above");
2020 let total_duration =
2022 stats.execution_duration + stats.state_hash_duration + commit_dur;
2023
2024 if total_duration > threshold.expect("checked above") {
2025 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2026 stats,
2027 commit_duration: Some(commit_dur),
2028 total_duration,
2029 }));
2030 }
2031 }
2032 }
2033 }
2034
2035 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
2037 let event = event.into();
2038
2039 if event.is_backfill_action() {
2040 debug_assert_eq!(
2041 self.backfill_sync_state,
2042 BackfillSyncState::Idle,
2043 "backfill action should only be emitted when backfill is idle"
2044 );
2045
2046 if self.persistence_state.in_progress() {
2047 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
2050 return
2051 }
2052
2053 self.backfill_sync_state = BackfillSyncState::Pending;
2054 self.metrics.engine.pipeline_runs.increment(1);
2055 debug!(target: "engine::tree", "emitting backfill action event");
2056 }
2057
2058 let _ = self.outgoing.send(event).inspect_err(
2059 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2060 );
2061 }
2062
2063 pub const fn should_persist(&self) -> bool {
2067 if self.building_payload {
2068 return false
2069 }
2070
2071 if !self.backfill_sync_state.is_idle() {
2072 return false
2074 }
2075
2076 let min_block = self.persistence_state.last_persisted_block.number;
2077 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2078 self.config.persistence_threshold()
2079 }
2080
2081 fn get_canonical_blocks_to_persist(
2084 &self,
2085 target: PersistTarget,
2086 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2087 debug_assert!(!self.persistence_state.in_progress());
2090
2091 let mut blocks_to_persist = Vec::new();
2092 let mut current_hash = self.state.tree_state.canonical_block_hash();
2093 let last_persisted_number = self.persistence_state.last_persisted_block.number;
2094 let canonical_head_number = self.state.tree_state.canonical_block_number();
2095
2096 let target_number = match target {
2097 PersistTarget::Head => canonical_head_number,
2098 PersistTarget::Threshold => {
2099 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2100 }
2101 };
2102
2103 debug!(
2104 target: "engine::tree",
2105 ?current_hash,
2106 ?last_persisted_number,
2107 ?canonical_head_number,
2108 ?target_number,
2109 "Returning canonical blocks to persist"
2110 );
2111 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
2112 if block.recovered_block().number() <= last_persisted_number {
2113 break;
2114 }
2115
2116 if block.recovered_block().number() <= target_number {
2117 blocks_to_persist.push(block.clone());
2118 }
2119
2120 current_hash = block.recovered_block().parent_hash();
2121 }
2122
2123 blocks_to_persist.reverse();
2125
2126 Ok(blocks_to_persist)
2127 }
2128
2129 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2137 if let Some(remove_above) = self.find_disk_reorg()? {
2140 self.remove_blocks(remove_above);
2141 return Ok(())
2142 }
2143
2144 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2145 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2146 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2147 number: self.persistence_state.last_persisted_block.number,
2148 hash: self.persistence_state.last_persisted_block.hash,
2149 });
2150 Ok(())
2151 }
2152
    /// Reconstructs the [`ExecutedBlock`] for a canonical block `hash`.
    ///
    /// Prefers the in-memory tree; on a miss the recovered block, its execution output,
    /// and its trie updates are rebuilt from the database. Errors if the block, its
    /// recorded state, or its header cannot be found.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
            return Ok(block.clone())
        }

        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        let mut execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        debug!(
            target: "engine::tree",
            number = ?block.number(),
            "computing block trie updates",
        );
        let db_provider = self.provider.database_provider_ro()?;
        let trie_updates = reth_trie_db::compute_block_trie_updates(
            &self.changeset_cache,
            &db_provider,
            block.number(),
        )?;

        let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
        let sorted_trie_updates = Arc::new(trie_updates);
        let trie_data =
            ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);

        // NOTE(review): assumes the fetched output is scoped to this single block, so
        // `pop()` yields its receipts/requests — confirm against `get_state` semantics.
        let execution_output = Arc::new(BlockExecutionOutput {
            state: execution_output.bundle,
            result: BlockExecutionResult {
                receipts: execution_output.receipts.pop().unwrap_or_default(),
                requests: execution_output.requests.pop().unwrap_or_default(),
                gas_used: block.gas_used(),
                blob_gas_used: block.blob_gas_used().unwrap_or_default(),
            },
        });

        Ok(ExecutedBlock::new(
            Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output,
            trie_data,
        ))
    }
2212
2213 fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2217 if self.state.tree_state.contains_hash(&hash) {
2218 Ok(true)
2219 } else {
2220 self.provider.is_known(hash)
2221 }
2222 }
2223
2224 fn sealed_header_by_hash(
2226 &self,
2227 hash: B256,
2228 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2229 let header = self.state.tree_state.sealed_header_by_hash(&hash);
2231
2232 if header.is_some() {
2233 Ok(header)
2234 } else {
2235 self.provider.sealed_header_by_hash(hash)
2236 }
2237 }
2238
2239 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2246 self.state
2247 .buffer
2248 .lowest_ancestor(&hash)
2249 .map(|block| block.parent_hash())
2250 .unwrap_or_else(|| hash)
2251 }
2252
2253 fn latest_valid_hash_for_invalid_payload(
2264 &mut self,
2265 parent_hash: B256,
2266 ) -> ProviderResult<Option<B256>> {
2267 if self.has_block_by_hash(parent_hash)? {
2269 return Ok(Some(parent_hash))
2270 }
2271
2272 let mut current_hash = parent_hash;
2275 let mut current_block = self.state.invalid_headers.get(¤t_hash);
2276 while let Some(block_with_parent) = current_block {
2277 current_hash = block_with_parent.parent;
2278 current_block = self.state.invalid_headers.get(¤t_hash);
2279
2280 if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2283 return Ok(Some(current_hash))
2284 }
2285 }
2286 Ok(None)
2287 }
2288
    /// Builds the `INVALID` payload status for a payload whose ancestry contains a
    /// rejected block.
    ///
    /// Per the Engine API, `latestValidHash` is the last valid ancestor, and must be the
    /// zero hash when that ancestor is a pre-merge (PoW) block — hence the difficulty
    /// check below.
    fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
        let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
            // Non-zero difficulty ⇒ PoW parent: report the zero hash.
            Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
            // Known post-merge parent: it is itself the latest valid hash.
            Some(_) => Some(parent_hash),
            // Unknown parent: search the invalid-header chain for a known ancestor.
            None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
        };

        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
        })
        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
    }
2307
2308 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2312 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2313 return target.head_block_hash == block_hash
2314 }
2315 false
2316 }
2317
2318 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2322 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2323 return target.contains(block_hash)
2324 }
2325 false
2326 }
2327
2328 fn check_invalid_ancestor_with_head(
2334 &mut self,
2335 check: B256,
2336 head: &SealedBlock<N::Block>,
2337 ) -> ProviderResult<Option<PayloadStatus>> {
2338 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2340
2341 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2342 }
2343
    /// Records `head` as descending from the known-invalid block `invalid`, emits an
    /// invalid-block event, and returns the `INVALID` payload status for it.
    fn on_invalid_new_payload(
        &mut self,
        head: SealedBlock<N::Block>,
        invalid: BlockWithParent,
    ) -> ProviderResult<PayloadStatus> {
        let status = self.prepare_invalid_response(invalid.parent)?;

        // Remember the link so future descendants of `head` are rejected without
        // re-validation.
        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
        self.emit_event(ConsensusEngineEvent::InvalidBlock {
            block: Box::new(head),
            error: PayloadValidationError::LinksToRejectedPayload.to_string(),
        });

        Ok(status)
    }
2362
2363 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2377 let parent_hash = payload.parent_hash();
2378 let block_hash = payload.block_hash();
2379
2380 if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2382 return Some(entry);
2383 }
2384
2385 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2386 if lowest_buffered_ancestor == block_hash {
2387 lowest_buffered_ancestor = parent_hash;
2388 }
2389
2390 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2392 }
2393
2394 fn handle_invalid_ancestor_payload(
2403 &mut self,
2404 payload: T::ExecutionData,
2405 invalid: BlockWithParent,
2406 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2407 let parent_hash = payload.parent_hash();
2408 let num_hash = payload.num_hash();
2409
2410 let block = match self.payload_validator.convert_payload_to_block(payload) {
2416 Ok(block) => block,
2417 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2418 };
2419
2420 Ok(self.on_invalid_new_payload(block, invalid)?)
2421 }
2422
2423 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2426 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2428
2429 match self.prepare_invalid_response(header.parent) {
2431 Ok(status) => Ok(Some(status)),
2432 Err(err) => {
2433 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2434 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2436 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2437 })))
2438 }
2439 }
2440 }
2441
2442 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2445 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2446 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2447 return Err(e)
2448 }
2449
2450 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2451 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2452 return Err(e)
2453 }
2454
2455 Ok(())
2456 }
2457
    /// Attempts to insert all buffered blocks that descend from `parent` into the tree.
    ///
    /// Blocks that match the current sync target and insert as valid are immediately made
    /// canonical. Per-block insertion errors are handled individually; only fatal errors
    /// abort the whole connection pass.
    #[instrument(level = "debug", target = "engine::tree", skip(self))]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            // Nothing buffered below this parent; nothing to connect.
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    if self.is_any_sync_target(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    // Non-block (payload-level) errors are silently skipped here; block
                    // errors may escalate to a fatal error that aborts the pass.
                    if let InsertPayloadError::Block(err) = err {
                        debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                        if let Err(fatal) = self.on_insert_block_error(err) {
                            warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                            return Err(fatal)
                        }
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }
2502
2503 fn buffer_block(
2505 &mut self,
2506 block: SealedBlock<N::Block>,
2507 ) -> Result<(), InsertBlockError<N::Block>> {
2508 if let Err(err) = self.validate_block(&block) {
2509 return Err(InsertBlockError::consensus_error(err, block))
2510 }
2511 self.state.buffer.insert_block(block);
2512 Ok(())
2513 }
2514
2515 #[inline]
2520 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2521 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2522 }
2523
2524 #[inline]
2527 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2528 if block > local_tip {
2529 Some(block - local_tip)
2530 } else {
2531 None
2532 }
2533 }
2534
    /// Determines whether another backfill run is warranted and, if so, returns the hash
    /// to backfill towards.
    ///
    /// The distance check prefers, in order: a just-downloaded block matching the sync
    /// target's finalized hash, a buffered finalized block, and finally the raw
    /// `target_block_number`. When the threshold is exceeded and the finalized block is
    /// not yet known locally, the finalized hash is the backfill target — falling back to
    /// the head hash (optimistic sync) when the finalized hash is the zero hash.
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        let exceeds_backfill_threshold =
            match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
                (Some(downloaded_block), Some(state))
                    if downloaded_block.hash == state.finalized_block_hash =>
                {
                    // The downloaded block is exactly the finalized target: measure from it.
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
                }
                _ => match sync_target_state
                    .as_ref()
                    .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
                {
                    Some(buffered_finalized) => {
                        // The finalized block is buffered: measure against its number.
                        self.exceeds_backfill_run_threshold(
                            canonical_tip_num,
                            buffered_finalized.number(),
                        )
                    }
                    None => {
                        // Fall back to the caller-supplied target number.
                        self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                    }
                },
            };

        if exceeds_backfill_threshold && let Some(state) = sync_target_state {
            match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                Err(err) => {
                    warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                }
                Ok(None) => {
                    // Finalized block unknown locally: backfill towards it unless it's the
                    // zero hash, in which case fall back to the head hash.
                    if !state.finalized_block_hash.is_zero() {
                        return Some(state.finalized_block_hash)
                    }

                    debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                    return Some(state.head_block_hash)
                }
                Ok(Some(_)) => {
                    // Finalized block already on disk: no backfill needed.
                }
            }
        }

        None
    }
2615
    /// Detects whether the persisted (on-disk) chain has diverged from the in-memory
    /// canonical chain.
    ///
    /// Walks both chains backwards via parent hashes until they meet. Returns
    /// `Ok(Some(number))` with the height of the common ancestor — all persisted blocks
    /// above it belong to a reorged-away chain and must be removed — or `Ok(None)` when
    /// the last persisted block is an ancestor of (or equal to) the canonical head.
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        // Resolves the parent `NumHash` of a block via its sealed header; errors when
        // the header is unknown.
        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Walk the canonical chain down to the persisted height.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // Same block at that height: the persisted chain is part of the canonical one.
        if canonical == persisted {
            return Ok(None);
        }

        // Persisted chain extends above the canonical ancestor we reached: walk it down
        // to the same height.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Walk both chains down in lockstep until the hashes converge on the common
        // ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }
2661
    /// Applies a canonical chain update: advances the canonical head, handles reorg
    /// bookkeeping, updates the shared in-memory canonical state and metrics, and emits
    /// chain notifications/events.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // Advance the tree's view of the canonical head first.
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len(), old_first);
            // Keep both sides of the reorg present in the tree state so either fork can
            // still be served or re-canonicalized later.
            self.reinsert_reorged_blocks(new.clone());
            self.reinsert_reorged_blocks(old.clone());
        }

        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // Notify canonical-state subscribers of the update.
        self.canonical_in_memory_state.notify_canon_state(notification);

        self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }
2702
2703 fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2705 if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2706 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2707 first_reorged_block <= finalized.number
2708 {
2709 self.metrics.tree.reorgs.finalized.increment(1);
2710 } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2711 first_reorged_block <= safe.number
2712 {
2713 self.metrics.tree.reorgs.safe.increment(1);
2714 } else {
2715 self.metrics.tree.reorgs.head.increment(1);
2716 }
2717 } else {
2718 debug_unreachable!("Reorged chain doesn't have any blocks");
2719 }
2720 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2721 }
2722
2723 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2725 for block in new_chain {
2726 if self
2727 .state
2728 .tree_state
2729 .executed_block_by_hash(block.recovered_block().hash())
2730 .is_none()
2731 {
2732 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2733 self.state.tree_state.insert_executed(block);
2734 }
2735 }
2736 }
2737
2738 fn on_disconnected_downloaded_block(
2743 &self,
2744 downloaded_block: BlockNumHash,
2745 missing_parent: BlockNumHash,
2746 head: BlockNumHash,
2747 ) -> Option<TreeEvent> {
2748 if let Some(target) =
2750 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2751 {
2752 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2753 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2754 }
2755
2756 let request = if let Some(distance) =
2766 self.distance_from_local_tip(head.number, missing_parent.number)
2767 {
2768 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2769 DownloadRequest::BlockRange(missing_parent.hash, distance)
2770 } else {
2771 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2772 DownloadRequest::single_block(missing_parent.hash)
2775 };
2776
2777 Some(TreeEvent::Download(request))
2778 }
2779
    /// Handles a downloaded block that was inserted successfully and is valid.
    ///
    /// If the block belongs to the tracked sync target it is made canonical — via a
    /// deferred `MakeCanonical` tree action when it is the target head, or directly
    /// otherwise — and buffered descendants are connected. When the sync target head has
    /// not been reached yet, a download of its lowest buffered ancestor is requested.
    fn on_valid_downloaded_block(
        &mut self,
        block_num_hash: BlockNumHash,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
            sync_target.contains(block_num_hash.hash)
        {
            debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");

            if sync_target.head_block_hash == block_num_hash.hash {
                // The block IS the sync target head: defer canonicalization to a tree
                // action.
                return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_num_hash.hash,
                })))
            }

            // Part of the sync target but not its head: canonicalize now and attach any
            // buffered children.
            self.make_canonical(block_num_hash.hash)?;
            self.try_connect_buffered_blocks(block_num_hash)?;

            if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
                // Head still missing: keep downloading towards it, starting from the
                // lowest buffered ancestor (or the head itself if nothing is buffered).
                let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
                trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
                return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
            }

            return Ok(None)
        }
        trace!(target: "engine::tree", "appended downloaded block");
        self.try_connect_buffered_blocks(block_num_hash)?;
        Ok(None)
    }
2824
    /// Entry point for a block delivered by the block downloader.
    ///
    /// Bails out early when the block descends from a known-invalid ancestor or while
    /// backfill sync is active. Otherwise inserts the block and translates the outcome
    /// into a follow-up [`TreeEvent`] (canonicalization, further downloads, or a
    /// backfill trigger). Returns an error only when recovering from a block insertion
    /// error is itself fatal.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
    fn on_downloaded_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
            return Ok(None)
        }

        // Backfill owns the database while it runs; ignore downloaded blocks until idle.
        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        match self.insert_block(block) {
            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
                return self.on_valid_downloaded_block(block_num_hash);
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                // Only block-level errors are recoverable here; conversion to a payload
                // status may still surface a fatal error.
                if let InsertPayloadError::Block(err) = err {
                    debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                        return Err(fatal)
                    }
                }
            }
        }
        Ok(None)
    }
2874
    /// Inserts an execution payload into the tree.
    ///
    /// Thin wrapper over [`Self::insert_block_or_payload`]: validation goes through the
    /// payload validator's `validate_payload`, and the payload is converted into a block
    /// lazily — only when buffering or error reporting requires one.
    fn insert_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
        self.insert_block_or_payload(
            payload.block_with_parent(),
            payload,
            |validator, payload, ctx| validator.validate_payload(payload, ctx),
            |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
        )
    }
2894
    /// Inserts an already-sealed block into the tree.
    ///
    /// Thin wrapper over [`Self::insert_block_or_payload`]: the input is a block already,
    /// so the conversion closure is the identity.
    fn insert_block(
        &mut self,
        block: SealedBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
        self.insert_block_or_payload(
            block.block_with_parent(),
            block,
            |validator, block, ctx| validator.validate_block(block, ctx),
            |_, block| Ok(block),
        )
    }
2906
    /// Shared insertion path for sealed blocks and execution payloads.
    ///
    /// `execute` validates and executes the input against its parent state;
    /// `convert_to_block` lazily turns the input into a [`SealedBlock`], invoked only
    /// when the block must be buffered or attached to an error.
    ///
    /// Outcomes:
    /// - [`InsertPayloadOk::AlreadySeen`] when the hash is already in the tree state or
    ///   found on disk at or below the last persisted height,
    /// - [`BlockStatus::Disconnected`] (block buffered) when no state provider for the
    ///   parent is available,
    /// - [`BlockStatus::Valid`] after successful execution and insertion.
    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
    fn insert_block_or_payload<Input, Err>(
        &mut self,
        block_id: BlockWithParent,
        input: Input,
        execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ValidationOutput<N>, Err>,
        convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
    ) -> Result<InsertPayloadOk, Err>
    where
        Err: From<InsertBlockError<N::Block>>,
    {
        let block_insert_start = Instant::now();
        let block_num_hash = block_id.block;
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");

        // Already tracked in the in-memory tree state.
        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
            // Still run the conversion so input-level errors surface to the caller.
            convert_to_block(self, input)?;
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
        }

        // Possibly already persisted: check on-disk headers for heights at or below the
        // last persisted block.
        if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
            match self.provider.sealed_header_by_hash(block_num_hash.hash) {
                Err(err) => {
                    let block = convert_to_block(self, input)?;
                    return Err(InsertBlockError::new(block, err.into()).into());
                }
                Ok(Some(_)) => {
                    convert_to_block(self, input)?;
                    return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
                }
                Ok(None) => {}
            }
        }

        // Ensure state for the parent is available; otherwise buffer the block as
        // disconnected and report the lowest missing ancestor to fetch.
        match self.state_provider_builder(block_id.parent) {
            Err(err) => {
                let block = convert_to_block(self, input)?;
                return Err(InsertBlockError::new(block, err.into()).into());
            }
            Ok(None) => {
                let block = convert_to_block(self, input)?;

                // Walk the buffer for the earliest missing ancestor of this chain; fall
                // back to this block's own parent when nothing is buffered.
                let missing_ancestor = self
                    .state
                    .buffer
                    .lowest_ancestor(&block.parent_hash())
                    .map(|block| block.parent_num_hash())
                    .unwrap_or_else(|| block.parent_num_hash());

                self.state.buffer.insert_block(block);

                return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                    head: self.state.tree_state.current_canonical_head,
                    missing_ancestor,
                }))
            }
            Ok(Some(_)) => {}
        }

        // A block at or below the canonical head height extends a side chain (fork).
        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;

        let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

        let start = Instant::now();

        // Validate and execute against the parent state.
        let ValidationOutput { executed_block: executed, execution_timing_stats: timing_stats } =
            execute(&mut self.payload_validator, input, ctx)?;

        // Emit a slow-block event when execution + state hashing exceeded the configured
        // threshold, and retain the stats keyed by block hash for later reporting.
        if let Some(stats) = timing_stats {
            if let Some(threshold) = self.config.slow_block_threshold() {
                let total_duration = stats.execution_duration + stats.state_hash_duration;
                if total_duration > threshold {
                    self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
                        stats: stats.clone(),
                        commit_duration: None,
                        total_duration,
                    }));
                }
            }
            self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
        }

        // A direct child of the canonical head becomes the new pending block.
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        let elapsed = start.elapsed();
        let engine_event = if is_fork {
            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        self.metrics
            .engine
            .block_insert_total_duration
            .record(block_insert_start.elapsed().as_secs_f64());
        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
3043
    /// Converts a block insertion error into the `INVALID` [`PayloadStatus`] reported to
    /// the caller.
    ///
    /// Non-validation errors are propagated as [`InsertBlockFatalError`] by
    /// `ensure_validation_error`. Validation failures are logged, recorded in the
    /// invalid-headers cache — unless the consensus implementation classifies the error
    /// as transient — and reported via an `InvalidBlock` event.
    fn on_insert_block_error(
        &mut self,
        error: InsertBlockError<N::Block>,
    ) -> Result<PayloadStatus, InsertBlockFatalError> {
        let (block, error) = error.split();

        // Bails out with a fatal error if this is not a validation error.
        let validation_err = error.ensure_validation_error()?;

        warn!(
            target: "engine::tree",
            invalid_hash=%block.hash(),
            invalid_number=block.number(),
            %validation_err,
            "Invalid block error on new payload",
        );
        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;

        // Transient consensus errors must not poison the invalid-headers cache, since
        // the same block may validate successfully on a retry.
        let is_transient = match &validation_err {
            InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
            _ => false,
        };
        if is_transient {
            warn!(
                target: "engine::tree",
                invalid_hash=%block.hash(),
                invalid_number=block.number(),
                %validation_err,
                "Skipping invalid header cache insert for transient validation error",
            );
        } else {
            self.state.invalid_headers.insert(block.block_with_parent());
        }
        self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock {
            block: Box::new(block),
            error: validation_err.to_string(),
        }));

        Ok(PayloadStatus::new(
            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
            latest_valid_hash,
        ))
    }
3097
3098 fn on_new_payload_error(
3100 &mut self,
3101 error: NewPayloadError,
3102 payload_num_hash: NumHash,
3103 parent_hash: B256,
3104 ) -> ProviderResult<PayloadStatus> {
3105 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3106 let latest_valid_hash =
3109 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3110 None
3114 } else {
3115 self.latest_valid_hash_for_invalid_payload(parent_hash)?
3116 };
3117
3118 let status = PayloadStatusEnum::from(error);
3119 Ok(PayloadStatus::new(status, latest_valid_hash))
3120 }
3121
3122 pub fn find_canonical_header(
3124 &self,
3125 hash: B256,
3126 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3127 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3128
3129 if canonical.is_none() {
3130 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3131 }
3132
3133 Ok(canonical)
3134 }
3135
    /// Validates and applies the finalized block hash from a forkchoice update.
    ///
    /// A zero hash means "no finalized block yet" and is skipped. If the hash is not
    /// part of the canonical chain, the forkchoice state is inconsistent and an
    /// `invalid_state` outcome is returned. Provider errors are logged but do not fail
    /// the update.
    fn update_finalized_block(
        &self,
        finalized_block_hash: B256,
    ) -> Result<(), OnForkChoiceUpdated> {
        if finalized_block_hash.is_zero() {
            return Ok(())
        }

        match self.find_canonical_header(finalized_block_hash) {
            Ok(None) => {
                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
                return Err(OnForkChoiceUpdated::invalid_state())
            }
            Ok(Some(finalized)) => {
                // Only persist and update when the finalized block actually changed.
                if Some(finalized.num_hash()) !=
                    self.canonical_in_memory_state.get_finalized_num_hash()
                {
                    // Best-effort persistence; the result is intentionally ignored.
                    let _ = self.persistence.save_finalized_block_number(finalized.number());
                    self.canonical_in_memory_state.set_finalized(finalized.clone());
                    self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
                }
            }
            Err(err) => {
                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
            }
        }

        Ok(())
    }
3170
    /// Validates and applies the safe block hash from a forkchoice update.
    ///
    /// Mirrors [`Self::update_finalized_block`]: a zero hash is skipped, a hash that is
    /// not canonical yields an `invalid_state` outcome, and provider errors are logged
    /// but do not fail the update.
    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
        if safe_block_hash.is_zero() {
            return Ok(())
        }

        match self.find_canonical_header(safe_block_hash) {
            Ok(None) => {
                debug!(target: "engine::tree", "Safe block not found in canonical chain");
                return Err(OnForkChoiceUpdated::invalid_state())
            }
            Ok(Some(safe)) => {
                // Only persist and update when the safe block actually changed.
                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
                    // Best-effort persistence; the result is intentionally ignored.
                    let _ = self.persistence.save_safe_block_number(safe.number());
                    self.canonical_in_memory_state.set_safe(safe.clone());
                    self.metrics.tree.safe_block_height.set(safe.number() as f64);
                }
            }
            Err(err) => {
                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
            }
        }

        Ok(())
    }
3200
    /// Checks that the finalized and safe hashes of a forkchoice state are consistent
    /// with the canonical chain, updating the tracked finalized/safe blocks as a side
    /// effect.
    ///
    /// Returns the `invalid_state` outcome produced by either update when a hash is
    /// unknown to the canonical chain; zero hashes are skipped by the helpers.
    fn ensure_consistent_forkchoice_state(
        &self,
        state: ForkchoiceState,
    ) -> Result<(), OnForkChoiceUpdated> {
        self.update_finalized_block(state.finalized_block_hash)?;

        self.update_safe_block(state.safe_block_hash)
    }
3227
    /// Validates payload attributes against the current head header and, if valid,
    /// dispatches a new payload build job to the payload builder.
    ///
    /// Returns an `invalid_payload_attributes` outcome on validation failure; otherwise
    /// a `VALID` forkchoice response carrying the pending payload id.
    fn process_payload_attributes(
        &self,
        attributes: T::PayloadAttributes,
        head: &N::BlockHeader,
        state: ForkchoiceState,
    ) -> OnForkChoiceUpdated {
        if let Err(err) =
            self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
        {
            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
            return OnForkChoiceUpdated::invalid_payload_attributes()
        }

        // Optionally share the validator's execution cache for the head block with the
        // payload builder (config-gated).
        let cache = if self.config.share_execution_cache_with_payload_builder() {
            self.payload_validator.cache_for(state.head_block_hash)
        } else {
            None
        };

        // Optionally share a sparse trie handle anchored at the head's state root
        // (config-gated).
        let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
            self.payload_validator.sparse_trie_handle_for(
                state.head_block_hash,
                head.state_root(),
                &self.state,
            )
        } else {
            None
        };

        let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
            parent_hash: state.head_block_hash,
            attributes,
            cache,
            trie_handle,
        });

        OnForkChoiceUpdated::updated_with_pending_payload_id(
            PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
            pending_payload_id,
        )
    }
3301
3302 pub(crate) fn remove_before(
3309 &mut self,
3310 upper_bound: BlockNumHash,
3311 finalized_hash: Option<B256>,
3312 ) -> ProviderResult<()> {
3313 let num = if let Some(hash) = finalized_hash {
3316 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3317 } else {
3318 None
3319 };
3320
3321 self.state.tree_state.remove_until(
3322 upper_bound,
3323 self.persistence_state.last_persisted_block.hash,
3324 num,
3325 );
3326 Ok(())
3327 }
3328
3329 pub fn state_provider_builder(
3334 &self,
3335 hash: B256,
3336 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3337 where
3338 P: BlockReader + StateProviderFactory + StateReader + Clone,
3339 {
3340 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3341 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3342 return Ok(Some(StateProviderBuilder::new(
3344 self.provider.clone(),
3345 historical,
3346 Some(blocks),
3347 )))
3348 }
3349
3350 if let Some(header) = self.provider.header(hash)? {
3352 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3353 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3356 }
3357
3358 debug!(target: "engine::tree", %hash, "no canonical state found for block");
3359 Ok(None)
3360 }
3361}
3362
/// Internal events multiplexed by the engine's main loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// A message received from the engine channel (API request or downloaded block).
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// A persistence task completed.
    PersistenceComplete {
        /// Outcome of the persistence action.
        result: PersistenceResult,
        /// Instant associated with the persistence action.
        /// NOTE(review): presumably when the action was started, for timing/metrics —
        /// confirm against the loop's handling code.
        start_time: Instant,
    },
    /// The engine message channel was disconnected.
    Disconnected,
}
3382
/// Status of a block after an insertion attempt into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// The block is valid and connected to known state.
    Valid,
    /// The block could not be connected to the tree because an ancestor is missing.
    Disconnected {
        /// The current canonical head at the time of the insertion attempt.
        head: BlockNumHash,
        /// The closest missing ancestor that must be fetched to connect the block.
        missing_ancestor: BlockNumHash,
    },
}
3400
/// Successful outcome of inserting a block or payload into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The block was already known; carries its status.
    AlreadySeen(BlockStatus),
    /// The block was newly inserted with the given status.
    Inserted(BlockStatus),
}
3412
/// Selects how far ahead a persistence run should write blocks.
/// NOTE(review): variant semantics inferred from names — confirm at the use sites
/// (outside this view).
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Persist up to the configured persistence threshold.
    Threshold,
    /// Persist everything up to the current head.
    Head,
}
3421
/// Durations spent waiting on shared caches before they became available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the shared execution cache.
    pub execution_cache: Duration,
    /// Time spent waiting for the shared sparse trie.
    pub sparse_trie: Duration,
}
3430
/// Abstraction over waiting for shared caches (see [`CacheWaitDurations`]: execution
/// cache and sparse trie) and reporting how long each wait took.
pub trait WaitForCaches {
    /// Waits for the caches and returns the per-cache wait durations.
    fn wait_for_caches(&self) -> CacheWaitDurations;
}