1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::{map::B256Map, B256};
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError, InsertBlockValidationError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, ExecutionTimingStats,
17 MemoryOverlayStateProvider, NewCanonicalChain, StateTrieOverlayManager,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated, SlowBlockInfo,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::{BuildNewPayload, PayloadBuilderHandle};
27use reth_payload_primitives::{BuiltPayload, NewPayloadError, PayloadTypes};
28use reth_primitives_traits::{
29 FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
30};
31use reth_provider::{
32 BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35 StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::{spawn_os_thread, utils::increase_thread_priority, WorkerPool};
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{fmt::Debug, ops, sync::Arc, time::Duration};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48 oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53pub mod error;
54pub mod instrumented_state;
55mod invalid_headers;
56mod metrics;
57pub mod payload_processor;
58pub mod payload_validator;
59mod persistence_state;
60pub mod precompile_cache;
61#[cfg(test)]
62mod tests;
63mod trie_updates;
64pub mod types;
65
66use crate::{persistence::PersistenceResult, tree::error::AdvancePersistenceError};
67pub use block_buffer::BlockBuffer;
68pub use invalid_headers::InvalidHeaderCache;
69pub use metrics::EngineApiMetrics;
70pub use payload_processor::*;
71pub use payload_validator::{BasicEngineValidator, EngineValidator};
72pub use persistence_state::PersistenceState;
73pub use reth_engine_primitives::TreeConfig;
74pub use reth_execution_cache::{
75 CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider,
76 ExecutionCache, PayloadExecutionCache, SavedCache,
77};
78pub use types::{ValidationOutcome, ValidationOutput};
79
80pub mod state;
81
/// Block-count threshold related to pipeline runs, fixed at one epoch ([`EPOCH_SLOTS`]).
// NOTE(review): the use site is outside this chunk; presumably the minimum block gap that
// justifies a pipeline (backfill) run — confirm against the caller.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// Number of blocks associated with changeset cache retention.
// NOTE(review): not referenced in this chunk; presumably how many recent blocks stay in the
// changeset cache before eviction — confirm at the use site.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
98
99#[derive(Clone, Debug)]
101pub struct StateProviderBuilder<N: NodePrimitives, P> {
102 provider_factory: P,
104 historical: B256,
106 overlay: Option<Vec<ExecutedBlock<N>>>,
108}
109
110impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
111 pub const fn new(
114 provider_factory: P,
115 historical: B256,
116 overlay: Option<Vec<ExecutedBlock<N>>>,
117 ) -> Self {
118 Self { provider_factory, historical, overlay }
119 }
120}
121
122impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
123where
124 P: BlockReader + StateProviderFactory + StateReader + Clone,
125{
126 pub fn build(&self) -> ProviderResult<StateProviderBox> {
128 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
129 if let Some(overlay) = self.overlay.clone() {
130 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
131 }
132 Ok(provider)
133 }
134}
135
136#[derive(Debug)]
140pub struct EngineApiTreeState<N: NodePrimitives> {
141 tree_state: TreeState<N>,
143 forkchoice_state_tracker: ForkchoiceStateTracker,
145 buffer: BlockBuffer<N::Block>,
147 invalid_headers: InvalidHeaderCache,
150}
151
152impl<N: NodePrimitives> EngineApiTreeState<N> {
153 fn new(
154 block_buffer_limit: u32,
155 max_invalid_header_cache_length: u32,
156 invalid_header_hit_eviction_threshold: u8,
157 canonical_block: BlockNumHash,
158 engine_kind: EngineApiKind,
159 state_trie_overlay_worker_pool: Arc<WorkerPool>,
160 ) -> Self {
161 Self {
162 invalid_headers: InvalidHeaderCache::new(
163 max_invalid_header_cache_length,
164 invalid_header_hit_eviction_threshold,
165 ),
166 buffer: BlockBuffer::new(block_buffer_limit),
167 tree_state: TreeState::new(
168 canonical_block,
169 engine_kind,
170 StateTrieOverlayManager::new(state_trie_overlay_worker_pool),
171 ),
172 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
173 }
174 }
175
176 pub const fn tree_state(&self) -> &TreeState<N> {
178 &self.tree_state
179 }
180
181 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
183 self.invalid_headers.get(hash).is_some()
184 }
185}
186
187#[derive(Debug)]
189pub struct TreeOutcome<T> {
190 pub outcome: T,
192 pub event: Option<TreeEvent>,
194 pub already_seen: bool,
197}
198
199impl<T> TreeOutcome<T> {
200 pub const fn new(outcome: T) -> Self {
202 Self { outcome, event: None, already_seen: false }
203 }
204
205 pub fn with_event(mut self, event: TreeEvent) -> Self {
207 self.event = Some(event);
208 self
209 }
210
211 pub const fn with_already_seen(mut self, value: bool) -> Self {
213 self.already_seen = value;
214 self
215 }
216}
217
218#[derive(Debug)]
220pub struct TryInsertPayloadResult {
221 pub status: PayloadStatus,
225 pub already_seen: bool,
227}
228
229impl TryInsertPayloadResult {
230 #[inline]
232 pub fn into_outcome(self) -> TreeOutcome<PayloadStatus> {
233 TreeOutcome::new(self.status).with_already_seen(self.already_seen)
234 }
235}
236
237#[derive(Debug)]
239pub enum TreeEvent {
240 TreeAction(TreeAction),
242 BackfillAction(BackfillAction),
244 Download(DownloadRequest),
246}
247
248impl TreeEvent {
249 const fn is_backfill_action(&self) -> bool {
251 matches!(self, Self::BackfillAction(_))
252 }
253}
254
/// Actions that can be requested on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the chain ending at the given block canonical.
    MakeCanonical {
        /// Hash of the sync-target head block that should become canonical.
        sync_target_head: B256,
    },
}
264
/// Handler that owns the engine API tree state and drives it from a dedicated loop (see
/// `run`): it receives engine messages, tracks in-flight persistence work and emits
/// [`EngineApiEvent`]s.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider used for database-backed lookups.
    provider: P,
    /// Consensus implementation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator for payloads; also converts payloads into blocks.
    payload_validator: V,
    /// Tree state, forkchoice tracker, block buffer and invalid-header cache.
    state: EngineApiTreeState<N>,
    /// Sender half of the incoming channel; cloned for external callers and also used by the
    /// handler to re-queue leftover downloaded blocks for itself.
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Receiver half of the incoming channel, polled by the main loop.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Channel on which engine API events are emitted.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle used to submit save/remove jobs to the persistence task.
    persistence: PersistenceHandle<N>,
    /// Tracks the last persisted block and the receiver of any in-flight persistence job.
    persistence_state: PersistenceState,
    /// Current backfill sync state; starts out as `Idle`.
    backfill_sync_state: BackfillSyncState,
    /// Canonical in-memory chain state shared with the rest of the node.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder.
    payload_builder: PayloadBuilderHandle<T>,
    /// Tree configuration.
    config: TreeConfig,
    /// Engine API metrics.
    metrics: EngineApiMetrics,
    /// Kind of engine API this handler serves.
    engine_kind: EngineApiKind,
    /// EVM configuration.
    evm_config: C,
    /// Cache of changesets.
    changeset_cache: ChangesetCache,
    /// Execution timing statistics keyed by a 256-bit hash.
    // NOTE(review): presumably keyed by block hash — confirm at the insertion site.
    execution_timing_stats: B256Map<Box<ExecutionTimingStats>>,
    /// Set on each forkchoice update: `true` when payload attributes were supplied and the
    /// config asks to suppress persistence during payload building.
    building_payload: bool,
    /// Task runtime; also supplies the worker pool for the state trie overlay.
    runtime: reth_tasks::Runtime,
}
325
326impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
327 for EngineApiTreeHandler<N, P, T, V, C>
328where
329 N: NodePrimitives,
330 C: Debug + ConfigureEvm<Primitives = N>,
331{
332 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333 f.debug_struct("EngineApiTreeHandler")
334 .field("provider", &self.provider)
335 .field("consensus", &self.consensus)
336 .field("payload_validator", &self.payload_validator)
337 .field("state", &self.state)
338 .field("incoming_tx", &self.incoming_tx)
339 .field("persistence", &self.persistence)
340 .field("persistence_state", &self.persistence_state)
341 .field("backfill_sync_state", &self.backfill_sync_state)
342 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
343 .field("payload_builder", &self.payload_builder)
344 .field("config", &self.config)
345 .field("metrics", &self.metrics)
346 .field("engine_kind", &self.engine_kind)
347 .field("evm_config", &self.evm_config)
348 .field("changeset_cache", &self.changeset_cache)
349 .field("execution_timing_stats", &self.execution_timing_stats.len())
350 .field("runtime", &self.runtime)
351 .finish()
352 }
353}
354
355impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
356where
357 N: NodePrimitives,
358 P: DatabaseProviderFactory
359 + BlockReader<Block = N::Block, Header = N::BlockHeader>
360 + StateProviderFactory
361 + StateReader<Receipt = N::Receipt>
362 + HashedPostStateProvider
363 + Clone
364 + 'static,
365 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
366 + StageCheckpointReader
367 + ChangeSetReader
368 + StorageChangeSetReader
369 + StorageSettingsCache,
370 C: ConfigureEvm<Primitives = N> + 'static,
371 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
372 V: EngineValidator<T> + WaitForCaches,
373{
374 #[expect(clippy::too_many_arguments)]
376 pub fn new(
377 provider: P,
378 consensus: Arc<dyn FullConsensus<N>>,
379 payload_validator: V,
380 outgoing: UnboundedSender<EngineApiEvent<N>>,
381 state: EngineApiTreeState<N>,
382 canonical_in_memory_state: CanonicalInMemoryState<N>,
383 persistence: PersistenceHandle<N>,
384 persistence_state: PersistenceState,
385 payload_builder: PayloadBuilderHandle<T>,
386 config: TreeConfig,
387 engine_kind: EngineApiKind,
388 evm_config: C,
389 changeset_cache: ChangesetCache,
390 runtime: reth_tasks::Runtime,
391 ) -> Self {
392 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
393
394 Self {
395 provider,
396 consensus,
397 payload_validator,
398 incoming,
399 outgoing,
400 persistence,
401 persistence_state,
402 backfill_sync_state: BackfillSyncState::Idle,
403 state,
404 canonical_in_memory_state,
405 payload_builder,
406 config,
407 metrics: Default::default(),
408 incoming_tx,
409 engine_kind,
410 evm_config,
411 changeset_cache,
412 execution_timing_stats: B256Map::default(),
413 building_payload: false,
414 runtime,
415 }
416 }
417
418 #[expect(clippy::complexity)]
424 pub fn spawn_new(
425 provider: P,
426 consensus: Arc<dyn FullConsensus<N>>,
427 payload_validator: V,
428 persistence: PersistenceHandle<N>,
429 payload_builder: PayloadBuilderHandle<T>,
430 canonical_in_memory_state: CanonicalInMemoryState<N>,
431 config: TreeConfig,
432 kind: EngineApiKind,
433 evm_config: C,
434 changeset_cache: ChangesetCache,
435 runtime: reth_tasks::Runtime,
436 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
437 {
438 let best_block_number = provider.best_block_number().unwrap_or(0);
439 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
440
441 let persistence_state = PersistenceState {
442 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
443 rx: None,
444 };
445
446 let (tx, outgoing) = unbounded_channel();
447 let state = EngineApiTreeState::new(
448 config.block_buffer_limit(),
449 config.max_invalid_header_cache_length(),
450 config.invalid_header_hit_eviction_threshold(),
451 header.num_hash(),
452 kind,
453 runtime.state_trie_overlay_worker_pool(),
454 );
455
456 let task = Self::new(
457 provider,
458 consensus,
459 payload_validator,
460 tx,
461 state,
462 canonical_in_memory_state,
463 persistence,
464 persistence_state,
465 payload_builder,
466 config,
467 kind,
468 evm_config,
469 changeset_cache,
470 runtime,
471 );
472 let incoming = task.incoming_tx.clone();
473 spawn_os_thread("engine", || {
474 increase_thread_priority();
475 task.run()
476 });
477 (incoming, outgoing)
478 }
479
480 fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
482 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
483 PayloadStatusEnum::Valid,
484 Some(state.head_block_hash),
485 )))
486 }
487
488 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
490 self.incoming_tx.clone()
491 }
492
493 const fn persistence_gap(&self) -> u64 {
496 self.state
497 .tree_state
498 .canonical_block_number()
499 .saturating_sub(self.persistence_state.last_persisted_block.number)
500 }
501
502 const fn should_backpressure(&self) -> bool {
507 self.persistence_state.in_progress() &&
508 self.persistence_gap() >= self.config.persistence_backpressure_threshold()
509 }
510
511 pub fn run(mut self) {
515 loop {
516 match self.try_poll_persistence() {
540 Ok(true) => {
541 if let Err(err) = self.advance_persistence() {
542 error!(target: "engine::tree", %err, "Advancing persistence failed");
543 return
544 }
545 continue;
546 }
547 Ok(false) => {}
548 Err(err) => {
549 error!(target: "engine::tree", %err, "Polling persistence failed");
550 return
551 }
552 }
553
554 let event = if self.should_backpressure() {
555 self.metrics.engine.backpressure_active.set(1.0);
556 let stall_start = Instant::now();
557 let event = self.wait_for_persistence_event();
558 self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
559 event
560 } else {
561 self.metrics.engine.backpressure_active.set(0.0);
562 self.wait_for_event()
563 };
564
565 match event {
566 LoopEvent::EngineMessage(msg) => {
567 debug!(target: "engine::tree", %msg, "received new engine message");
568 match self.on_engine_message(msg) {
569 Ok(ops::ControlFlow::Break(())) => return,
570 Ok(ops::ControlFlow::Continue(())) => {}
571 Err(fatal) => {
572 error!(target: "engine::tree", %fatal, "insert block fatal error");
573 return
574 }
575 }
576 }
577 LoopEvent::PersistenceComplete { result, start_time } => {
578 if let Err(err) = self.on_persistence_complete(result, start_time) {
579 error!(target: "engine::tree", %err, "Persistence complete handling failed");
580 return
581 }
582 }
583 LoopEvent::Disconnected => {
584 error!(target: "engine::tree", "Channel disconnected");
585 return
586 }
587 }
588
589 if let Err(err) = self.advance_persistence() {
594 error!(target: "engine::tree", %err, "Advancing persistence failed");
595 return
596 }
597 }
598 }
599
600 fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
606 let maybe_persistence = self.persistence_state.rx.take();
607
608 if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
609 match persistence_rx.recv() {
610 Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
611 Err(_) => LoopEvent::Disconnected,
612 }
613 } else {
614 self.wait_for_event()
615 }
616 }
617
618 fn wait_for_event(&mut self) -> LoopEvent<T, N> {
624 let maybe_persistence = self.persistence_state.rx.take();
626
627 if let Some((persistence_rx, start_time, action)) = maybe_persistence {
628 crossbeam_channel::select_biased! {
631 recv(persistence_rx) -> result => {
632 match result {
634 Ok(result) => LoopEvent::PersistenceComplete {
635 result,
636 start_time,
637 },
638 Err(_) => LoopEvent::Disconnected,
639 }
640 },
641 recv(self.incoming) -> msg => {
642 self.persistence_state.rx = Some((persistence_rx, start_time, action));
644 match msg {
645 Ok(m) => LoopEvent::EngineMessage(m),
646 Err(_) => LoopEvent::Disconnected,
647 }
648 },
649 }
650 } else {
651 match self.incoming.recv() {
653 Ok(m) => LoopEvent::EngineMessage(m),
654 Err(_) => LoopEvent::Disconnected,
655 }
656 }
657 }
658
659 fn on_downloaded(
665 &mut self,
666 mut blocks: Vec<SealedBlock<N::Block>>,
667 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
668 if blocks.is_empty() {
669 return Ok(None)
671 }
672
673 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
674 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
675 for block in blocks.drain(..batch) {
676 if let Some(event) = self.on_downloaded_block(block)? {
677 let needs_backfill = event.is_backfill_action();
678 self.on_tree_event(event)?;
679 if needs_backfill {
680 return Ok(None)
682 }
683 }
684 }
685
686 if !blocks.is_empty() {
688 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
689 }
690
691 Ok(None)
692 }
693
694 #[instrument(
709 level = "debug",
710 target = "engine::tree",
711 skip_all,
712 fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
713 )]
714 fn on_new_payload(
715 &mut self,
716 payload: T::ExecutionData,
717 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
718 trace!(target: "engine::tree", "invoked new payload");
719
720 let start = Instant::now();
722
723 let num_hash = payload.num_hash();
750 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
751 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
752
753 let block_hash = num_hash.hash;
754
755 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
757 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
758 return Ok(TreeOutcome::new(status));
759 }
760
761 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
763
764 let mut outcome = if self.backfill_sync_state.is_idle() {
765 self.try_insert_payload(payload)?.into_outcome()
766 } else {
767 TreeOutcome::new(self.try_buffer_payload(payload)?)
768 };
769
770 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
772 if self.state.tree_state.canonical_block_hash() != block_hash {
774 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
775 sync_target_head: block_hash,
776 }));
777 }
778 }
779
780 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
782
783 Ok(outcome)
784 }
785
786 #[instrument(level = "debug", target = "engine::tree", skip_all)]
788 fn try_insert_payload(
789 &mut self,
790 payload: T::ExecutionData,
791 ) -> Result<TryInsertPayloadResult, InsertBlockFatalError> {
792 let block_hash = payload.block_hash();
793 let num_hash = payload.num_hash();
794 let parent_hash = payload.parent_hash();
795 let mut latest_valid_hash = None;
796
797 match self.insert_payload(payload) {
798 Ok(status) => {
799 let (status, already_seen) = match status {
800 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
801 latest_valid_hash = Some(block_hash);
802 self.try_connect_buffered_blocks(num_hash)?;
803 (PayloadStatusEnum::Valid, false)
804 }
805 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
806 latest_valid_hash = Some(block_hash);
807 (PayloadStatusEnum::Valid, true)
808 }
809 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) => {
810 (PayloadStatusEnum::Syncing, false)
811 }
812 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
813 (PayloadStatusEnum::Syncing, true)
815 }
816 };
817
818 Ok(TryInsertPayloadResult {
819 status: PayloadStatus::new(status, latest_valid_hash),
820 already_seen,
821 })
822 }
823 Err(error) => {
824 let status = match error {
825 InsertPayloadError::Block(error) => self.on_insert_block_error(error)?,
826 InsertPayloadError::Payload(error) => {
827 self.on_new_payload_error(error, num_hash, parent_hash)?
828 }
829 };
830
831 Ok(TryInsertPayloadResult { status, already_seen: false })
832 }
833 }
834 }
835
836 fn try_buffer_payload(
845 &mut self,
846 payload: T::ExecutionData,
847 ) -> Result<PayloadStatus, InsertBlockFatalError> {
848 let parent_hash = payload.parent_hash();
849 let num_hash = payload.num_hash();
850
851 match self.payload_validator.convert_payload_to_block(payload) {
852 Ok(block) => {
854 if let Err(error) = self.buffer_block(block) {
855 Ok(self.on_insert_block_error(error)?)
856 } else {
857 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
858 }
859 }
860 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
861 }
862 }
863
864 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
871 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
873 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
874 self.metrics.engine.executed_new_block_cache_miss.increment(1);
875 return Ok(None)
876 };
877
878 let new_head_number = new_head_block.recovered_block().number();
879 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
880
881 let mut new_chain = vec![new_head_block.clone()];
882 let mut current_hash = new_head_block.recovered_block().parent_hash();
883 let mut current_number = new_head_number - 1;
884
885 while current_number > current_canonical_number {
890 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
891 {
892 current_hash = block.recovered_block().parent_hash();
893 current_number -= 1;
894 new_chain.push(block);
895 } else {
896 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
897 return Ok(None)
900 }
901 }
902
903 if current_hash == self.state.tree_state.current_canonical_head.hash {
906 new_chain.reverse();
907
908 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
910 }
911
912 let mut old_chain = Vec::new();
914 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
915
916 while current_canonical_number > current_number {
919 let block = self.canonical_block_by_hash(old_hash)?;
920 old_hash = block.recovered_block().parent_hash();
921 old_chain.push(block);
922 current_canonical_number -= 1;
923 }
924
925 debug_assert_eq!(current_number, current_canonical_number);
927
928 while old_hash != current_hash {
931 let block = self.canonical_block_by_hash(old_hash)?;
932 old_hash = block.recovered_block().parent_hash();
933 old_chain.push(block);
934
935 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
936 {
937 current_hash = block.recovered_block().parent_hash();
938 new_chain.push(block);
939 } else {
940 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
942 return Ok(None)
943 }
944 }
945 new_chain.reverse();
946 old_chain.reverse();
947
948 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
949 }
950
951 fn update_latest_block_to_canonical_ancestor(
963 &mut self,
964 canonical_header: &SealedHeader<N::BlockHeader>,
965 ) -> ProviderResult<()> {
966 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
967 let current_head_number = self.state.tree_state.canonical_block_number();
968 let new_head_number = canonical_header.number();
969 let new_head_hash = canonical_header.hash();
970
971 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
973
974 if new_head_number < current_head_number {
976 debug!(
977 target: "engine::tree",
978 current_head = current_head_number,
979 new_head = new_head_number,
980 new_head_hash = ?new_head_hash,
981 "FCU unwind detected: reverting to canonical ancestor"
982 );
983
984 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
985 } else {
986 debug!(
987 target: "engine::tree",
988 previous_head = current_head_number,
989 new_head = new_head_number,
990 new_head_hash = ?new_head_hash,
991 "Advancing latest block to canonical ancestor"
992 );
993 self.handle_chain_advance_or_same_height(canonical_header)
994 }
995 }
996
997 fn handle_canonical_chain_unwind(
1000 &self,
1001 current_head_number: u64,
1002 canonical_header: &SealedHeader<N::BlockHeader>,
1003 ) -> ProviderResult<()> {
1004 let new_head_number = canonical_header.number();
1005 debug!(
1006 target: "engine::tree",
1007 from = current_head_number,
1008 to = new_head_number,
1009 "Handling unwind: collecting blocks to remove from in-memory state"
1010 );
1011
1012 let old_blocks =
1014 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
1015
1016 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
1018 }
1019
1020 fn collect_blocks_for_canonical_unwind(
1022 &self,
1023 new_head_number: u64,
1024 current_head_number: u64,
1025 ) -> Vec<ExecutedBlock<N>> {
1026 let mut old_blocks =
1027 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
1028
1029 for block_num in (new_head_number + 1)..=current_head_number {
1030 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
1031 let executed_block = block_state.block_ref().clone();
1032 old_blocks.push(executed_block);
1033 debug!(
1034 target: "engine::tree",
1035 block_number = block_num,
1036 "Collected block for removal from in-memory state"
1037 );
1038 }
1039 }
1040
1041 if old_blocks.is_empty() {
1042 debug!(
1043 target: "engine::tree",
1044 "No blocks found in memory to remove, will clear and reset state"
1045 );
1046 }
1047
1048 old_blocks
1049 }
1050
1051 fn apply_canonical_ancestor_via_reorg(
1053 &self,
1054 canonical_header: &SealedHeader<N::BlockHeader>,
1055 old_blocks: Vec<ExecutedBlock<N>>,
1056 ) -> ProviderResult<()> {
1057 let new_head_hash = canonical_header.hash();
1058 let new_head_number = canonical_header.number();
1059
1060 let executed_block = self.canonical_block_by_hash(new_head_hash)?;
1062 self.canonical_in_memory_state
1064 .update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
1065
1066 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1069
1070 debug!(
1071 target: "engine::tree",
1072 block_number = new_head_number,
1073 block_hash = ?new_head_hash,
1074 "Successfully loaded canonical ancestor into memory via reorg"
1075 );
1076
1077 Ok(())
1078 }
1079
1080 fn handle_chain_advance_or_same_height(
1082 &self,
1083 canonical_header: &SealedHeader<N::BlockHeader>,
1084 ) -> ProviderResult<()> {
1085 self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
1087
1088 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
1090
1091 Ok(())
1092 }
1093
1094 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
1096 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
1098 return Ok(());
1099 }
1100
1101 let executed_block = self.canonical_block_by_hash(block_hash)?;
1103 self.canonical_in_memory_state
1104 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
1105
1106 debug!(
1107 target: "engine::tree",
1108 block_number,
1109 block_hash = ?block_hash,
1110 "Added canonical block to in-memory state"
1111 );
1112
1113 Ok(())
1114 }
1115
1116 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
1125 fn on_forkchoice_updated(
1126 &mut self,
1127 state: ForkchoiceState,
1128 attrs: Option<T::PayloadAttributes>,
1129 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1130 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1131
1132 self.building_payload = attrs.is_some() && self.config.suppress_persistence_during_build();
1133
1134 self.record_forkchoice_metrics();
1136
1137 if let Some(early_result) = self.validate_forkchoice_state(state)? {
1139 return Ok(TreeOutcome::new(early_result));
1140 }
1141
1142 if let Some(result) = self.handle_canonical_head(state, &attrs)? {
1144 return Ok(result);
1145 }
1146
1147 if let Some(result) = self.apply_chain_update(state, &attrs)? {
1150 return Ok(result);
1151 }
1152
1153 self.handle_missing_block(state)
1155 }
1156
    /// Notifies the canonical in-memory state that a forkchoice update was received.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1161
1162 fn validate_forkchoice_state(
1167 &mut self,
1168 state: ForkchoiceState,
1169 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1170 if state.head_block_hash.is_zero() {
1171 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1172 }
1173
1174 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1177 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1178 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1179 }
1180
1181 if !self.backfill_sync_state.is_idle() {
1182 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1185 return Ok(Some(OnForkChoiceUpdated::syncing()));
1186 }
1187
1188 Ok(None)
1189 }
1190
1191 fn handle_canonical_head(
1197 &self,
1198 state: ForkchoiceState,
1199 attrs: &Option<T::PayloadAttributes>, ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1201 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1216 return Ok(None);
1217 }
1218
1219 trace!(target: "engine::tree", "fcu head hash is already canonical");
1220
1221 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1223 return Ok(Some(TreeOutcome::new(outcome)));
1225 }
1226
1227 if let Some(attr) = attrs {
1229 let tip = self
1230 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1231 .ok_or_else(|| {
1232 ProviderError::HeaderNotFound(state.head_block_hash.into())
1235 })?;
1236 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1238 return Ok(Some(TreeOutcome::new(updated)));
1239 }
1240
1241 Ok(Some(Self::valid_outcome(state)))
1243 }
1244
1245 fn apply_chain_update(
1257 &mut self,
1258 state: ForkchoiceState,
1259 attrs: &Option<T::PayloadAttributes>,
1260 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1261 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1263 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1264
1265 if self.engine_kind.is_opstack() ||
1268 self.config.always_process_payload_attributes_on_canonical_head()
1269 {
1270 if self.config.unwind_canonical_header() {
1276 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1277 }
1278
1279 if let Some(attr) = attrs {
1280 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1281 let updated =
1283 self.process_payload_attributes(attr.clone(), &canonical_header, state);
1284 return Ok(Some(TreeOutcome::new(updated)));
1285 }
1286 }
1287
1288 return Ok(Some(Self::valid_outcome(state)));
1299 }
1300
1301 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1303 let tip = chain_update.tip().clone_sealed_header();
1304 self.on_canonical_chain_update(chain_update);
1305
1306 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1308 return Ok(Some(TreeOutcome::new(outcome)));
1310 }
1311
1312 if let Some(attr) = attrs {
1313 let updated = self.process_payload_attributes(attr.clone(), &tip, state);
1315 return Ok(Some(TreeOutcome::new(updated)));
1316 }
1317
1318 return Ok(Some(Self::valid_outcome(state)));
1319 }
1320
1321 Ok(None)
1322 }
1323
1324 fn handle_missing_block(
1329 &self,
1330 state: ForkchoiceState,
1331 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1332 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1339 !state.safe_block_hash.is_zero() &&
1341 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1342 {
1343 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1344 state.safe_block_hash
1345 } else {
1346 state.head_block_hash
1347 };
1348
1349 let target = self.lowest_buffered_ancestor_or(target);
1350 trace!(target: "engine::tree", %target, "downloading missing block");
1351
1352 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1353 PayloadStatusEnum::Syncing,
1354 )))
1355 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1356 }
1357
1358 fn remove_blocks(&mut self, new_tip_num: u64) {
1361 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1362 if new_tip_num < self.persistence_state.last_persisted_block.number {
1363 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1364 let (tx, rx) = crossbeam_channel::bounded(1);
1365 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1366 self.persistence_state.start_remove(new_tip_num, rx);
1367 }
1368 }
1369
1370 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1373 if blocks_to_persist.is_empty() {
1374 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1375 return
1376 }
1377
1378 let highest_num_hash = blocks_to_persist
1380 .iter()
1381 .max_by_key(|block| block.recovered_block().number())
1382 .map(|b| b.recovered_block().num_hash())
1383 .expect("Checked non-empty persisting blocks");
1384
1385 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1386 let (tx, rx) = crossbeam_channel::bounded(1);
1387 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1388
1389 self.persistence_state.start_save(highest_num_hash, rx);
1390 }
1391
1392 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1397 if !self.persistence_state.in_progress() {
1398 if let Some(new_tip_num) = self.find_disk_reorg()? {
1399 self.remove_blocks(new_tip_num)
1400 } else if self.should_persist() {
1401 let blocks_to_persist =
1402 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1403 self.persist_blocks(blocks_to_persist);
1404 }
1405 }
1406
1407 Ok(())
1408 }
1409
1410 fn finish_termination(
1415 &mut self,
1416 pending_termination: oneshot::Sender<()>,
1417 ) -> Result<(), AdvancePersistenceError> {
1418 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1419 let result = self.persist_until_complete();
1420 let _ = pending_termination.send(());
1421 result
1422 }
1423
1424 fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1426 loop {
1427 if let Some((rx, start_time, action)) = self.persistence_state.rx.take() {
1429 debug!(target: "engine::tree", ?action, "waiting for in-flight persistence");
1430 let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1431 self.on_persistence_complete(result, start_time)?;
1432 }
1433
1434 let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1435
1436 if blocks_to_persist.is_empty() {
1437 debug!(target: "engine::tree", "persistence complete, signaling termination");
1438 return Ok(())
1439 }
1440
1441 debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1442 self.persist_blocks(blocks_to_persist);
1443 }
1444 }
1445
1446 fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
1450 let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
1451 return Ok(false);
1452 };
1453
1454 match rx.try_recv() {
1455 Ok(result) => {
1456 self.on_persistence_complete(result, start_time)?;
1457 Ok(true)
1458 }
1459 Err(crossbeam_channel::TryRecvError::Empty) => {
1460 self.persistence_state.rx = Some((rx, start_time, action));
1462 Ok(false)
1463 }
1464 Err(crossbeam_channel::TryRecvError::Disconnected) => {
1465 Err(AdvancePersistenceError::ChannelClosed)
1466 }
1467 }
1468 }
1469
1470 fn on_persistence_complete(
1472 &mut self,
1473 result: PersistenceResult,
1474 start_time: Instant,
1475 ) -> Result<(), AdvancePersistenceError> {
1476 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1477
1478 let commit_duration = result.commit_duration;
1479 let Some(BlockNumHash {
1480 hash: last_persisted_block_hash,
1481 number: last_persisted_block_number,
1482 }) = result.last_block
1483 else {
1484 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1486 return Ok(())
1487 };
1488
1489 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1490 self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1491
1492 let min_threshold =
1496 last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1497 let eviction_threshold =
1498 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1499 finalized.number.min(min_threshold)
1501 } else {
1502 min_threshold
1504 };
1505 debug!(
1506 target: "engine::tree",
1507 last_persisted = last_persisted_block_number,
1508 finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1509 eviction_threshold,
1510 "Evicting changesets below threshold"
1511 );
1512 self.changeset_cache.evict(eviction_threshold);
1513
1514 self.on_new_persisted_block()?;
1515
1516 self.purge_timing_stats(last_persisted_block_number, commit_duration);
1517
1518 Ok(())
1519 }
1520
    /// Dispatches a single incoming [`FromEngine`] message.
    ///
    /// * Orchestrator events update the backfill state, finish a backfill run, or terminate.
    /// * Requests either insert an already executed block or handle a beacon engine message
    ///   (`ForkchoiceUpdated`, `NewPayload`, `RethNewPayload`), sending the response on the
    ///   message's `tx`.
    /// * Downloaded blocks are forwarded to `on_downloaded` and any resulting tree event is
    ///   handled.
    ///
    /// Returns `ControlFlow::Break(())` only for a terminate request; every other path returns
    /// `ControlFlow::Continue(())`.
    ///
    /// # Errors
    ///
    /// Returns an [`InsertBlockFatalError`] propagated from the handlers invoked with `?`.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
                FromOrchestrator::Terminate { tx } => {
                    debug!(target: "engine::tree", "received terminate request");
                    // A termination failure is only logged; the loop is stopped either way.
                    if let Err(err) = self.finish_termination(tx) {
                        error!(target: "engine::tree", %err, "Termination failed");
                    }
                    return Ok(ops::ControlFlow::Break(()))
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(payload) => {
                        let block_num_hash = payload.recovered_block.num_hash();
                        // Skip blocks at or below the current canonical block number.
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        // Skip blocks the tree state already contains.
                        if self.state.tree_state.contains_hash(&block_num_hash.hash) {
                            return Ok(ops::ControlFlow::Continue(()))
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // A validator failure is logged and the block is dropped; it is not fatal.
                        let block = match self
                            .payload_validator
                            .on_inserted_executed_block(payload, &self.state)
                        {
                            Ok(block) => block,
                            Err(err) => {
                                warn!(target: "engine::tree", %err, block=?block_num_hash, "Failed to insert already executed block");
                                return Ok(ops::ControlFlow::Continue(()))
                            }
                        };

                        // A direct child of the canonical head becomes the pending block.
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
                                let has_attrs = payload_attrs.is_some();

                                let start = Instant::now();
                                let mut output = self.on_forkchoice_updated(state, payload_attrs);

                                if let Ok(res) = &mut output {
                                    // Remember the latest forkchoice state together with its
                                    // resulting status.
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    // Announce the handled forkchoice update.
                                    self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // Handle the attached tree event, if any, before responding.
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(ref err) = output {
                                    error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                                }

                                self.metrics.engine.forkchoice_updated.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.new_payload.latest_finish_at,
                                    has_attrs,
                                    &output,
                                );

                                // A failed send is counted and logged; it is not an error here.
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                let start = Instant::now();
                                // Read before `payload` is moved into `on_new_payload`.
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Take the tree event out so the response can be sent first.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // The tree event is handled after the response was sent.
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                            BeaconEngineMessage::RethNewPayload {
                                payload,
                                wait_for_persistence,
                                wait_for_caches,
                                tx,
                                enqueued_at,
                            } => {
                                debug!(
                                    target: "engine::tree",
                                    wait_for_persistence,
                                    wait_for_caches,
                                    "Processing reth_newPayload"
                                );

                                // Time elapsed since `enqueued_at` until processing started.
                                let backpressure_wait = enqueued_at.elapsed();

                                // Optionally wait for the in-flight persistence job before
                                // processing the payload; the wait duration is reported back.
                                let explicit_persistence_wait = if wait_for_persistence {
                                    let pending_persistence = self.persistence_state.rx.take();
                                    if let Some((rx, start_time, _action)) = pending_persistence {
                                        let (persistence_tx, persistence_rx) =
                                            std::sync::mpsc::channel();
                                        // NOTE(review): both `expect` calls below panic if the
                                        // respective channel closes without a result.
                                        self.runtime.spawn_blocking_named(
                                            "wait-persist",
                                            move || {
                                                let start = Instant::now();
                                                let result = rx
                                                    .recv()
                                                    .expect("persistence state channel closed");
                                                let _ = persistence_tx.send((
                                                    result,
                                                    start_time,
                                                    start.elapsed(),
                                                ));
                                            },
                                        );
                                        let (result, start_time, wait_duration) = persistence_rx
                                            .recv()
                                            .expect("persistence result channel closed");
                                        // NOTE(review): the result of `on_persistence_complete`
                                        // is discarded here, unlike the other call sites that
                                        // propagate it.
                                        let _ = self.on_persistence_complete(result, start_time);
                                        wait_duration
                                    } else {
                                        Duration::ZERO
                                    }
                                } else {
                                    Duration::ZERO
                                };

                                // `None` when the caller did not ask to wait for the caches.
                                let cache_wait = wait_for_caches
                                    .then(|| self.payload_validator.wait_for_caches());

                                let start = Instant::now();
                                // Read before `payload` is moved into `on_new_payload`.
                                let gas_used = payload.gas_used();
                                let num_hash = payload.num_hash();
                                let mut output = self.on_new_payload(payload);
                                let latency = start.elapsed();
                                self.metrics.engine.new_payload.update_response_metrics(
                                    start,
                                    &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                    &output,
                                    gas_used,
                                );

                                // Take the tree event out so the response can be sent first.
                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                let timings = NewPayloadTimings {
                                    latency,
                                    persistence_wait: backpressure_wait + explicit_persistence_wait,
                                    execution_cache_wait: cache_wait
                                        .map(|wait| wait.execution_cache),
                                    sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
                                };
                                if let Err(err) =
                                    tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // The tree event is handled after the response was sent.
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(ops::ControlFlow::Continue(()))
    }
1762
    /// Handles the end of a backfill run described by `ctrl`.
    ///
    /// Marks backfill as idle, records a bad block reported by an unwind, resets or prunes the
    /// tree state to the backfill height, clears buffered blocks / timing stats / in-memory
    /// canonical state, and moves the canonical head and persisted block to the backfill height.
    ///
    /// Afterwards, based on the tracked sync target, it emits at most one follow-up: another
    /// backfill run, a block range download, or a single block download. If none applies it
    /// tries to connect buffered children of the current canonical head.
    ///
    /// # Errors
    ///
    /// Propagates errors from the provider's `block_hash` lookup and from
    /// [`Self::try_connect_buffered_blocks`].
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // Remember the bad block in the invalid headers cache.
            self.state.invalid_headers.insert(**bad_block);

            // For an unwind the unwind target is the new height.
            Some(*target)
        } else {
            ctrl.block_number()
        };

        // Nothing to do without a known height.
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // Resolve the hash at the backfill height; bail out if the provider does not know it.
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // Unwind: reset the tree state to the backfill block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            // Otherwise prune the tree state, passing the backfill block both as the bound and
            // as the finalized block.
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // Drop buffered blocks and timing stats relative to the backfill height, then clear the
        // in-memory canonical state entirely.
        self.state.buffer.remove_old_blocks(backfill_height);
        self.purge_timing_stats(backfill_height, None);
        self.canonical_in_memory_state.clear_state();

        // If the header at the backfill height is available, it becomes both the canonical head
        // and the last persisted block. A lookup error is silently skipped here.
        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // Decide whether more syncing is needed; requires a tracked sync target.
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        // Outside of op-stack, a zero finalized hash means there is nothing to compare against.
        if !self.engine_kind.is_opstack() && sync_target_state.finalized_block_hash.is_zero() {
            return Ok(())
        }
        let target_hash = self.backfill_target_hash(sync_target_state);
        if target_hash.is_zero() {
            return Ok(())
        }
        // Number of the target block, known only if that block is buffered.
        let newest_target = self.state.buffer.block(&target_hash).map(|block| block.number());

        // Only when both the backfill progress and the target number are known can the distance
        // be re-checked for another backfill run.
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_target).and_then(|(progress, target_number)| {
                self.backfill_sync_target(progress, target_number, None)
            })
        {
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // Check for a remaining gap between the canonical head and the buffered chain that leads
        // to the sync target head.
        if let Some(lowest_buffered) =
            self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
        {
            let current_head_num = self.state.tree_state.current_canonical_head.number;
            let target_head_num = lowest_buffered.number();

            if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
            {
                debug!(
                    target: "engine::tree",
                    %current_head_num,
                    %target_head_num,
                    %distance,
                    "Backfill complete, downloading remaining blocks to reach FCU target"
                );

                // Request `distance` blocks starting at the parent of the lowest buffered block.
                self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                    lowest_buffered.parent_hash(),
                    distance,
                )));
                return Ok(());
            }
        } else {
            // Neither the head nor any of its ancestors is buffered: request the head itself.
            debug!(
                target: "engine::tree",
                head_hash = %sync_target_state.head_block_hash,
                "Backfill complete but head block not buffered, requesting download"
            );
            self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
                sync_target_state.head_block_hash,
            )));
            return Ok(());
        }

        // No gap left: try to execute buffered blocks that build on the current canonical head.
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }
1922
1923 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1927 if let Some(chain_update) = self.on_new_head(target)? {
1928 self.on_canonical_chain_update(chain_update);
1929 }
1930
1931 self.on_canonicalized_sync_target(target);
1932
1933 Ok(())
1934 }
1935
1936 fn on_canonicalized_sync_target(&mut self, target: B256) {
1938 let Some(sync_target_state) = self
1939 .state
1940 .forkchoice_state_tracker
1941 .sync_target_state()
1942 .filter(|state| state.head_block_hash == target)
1943 else {
1944 return;
1945 };
1946
1947 if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
1948 debug!(
1949 target: "engine::tree",
1950 head = %sync_target_state.head_block_hash,
1951 safe = %sync_target_state.safe_block_hash,
1952 finalized = %sync_target_state.finalized_block_hash,
1953 ?outcome,
1954 "Canonicalized sync target head before safe/finalized could be applied"
1955 );
1956 return;
1957 }
1958
1959 self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
1960 }
1961
1962 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1964 if let Some(event) = event {
1965 self.on_tree_event(event)?;
1966 }
1967
1968 Ok(())
1969 }
1970
1971 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1975 match event {
1976 TreeEvent::TreeAction(action) => match action {
1977 TreeAction::MakeCanonical { sync_target_head } => {
1978 self.make_canonical(sync_target_head)?;
1979 }
1980 },
1981 TreeEvent::BackfillAction(action) => {
1982 self.emit_event(EngineApiEvent::BackfillAction(action));
1983 }
1984 TreeEvent::Download(action) => {
1985 self.emit_event(EngineApiEvent::Download(action));
1986 }
1987 }
1988
1989 Ok(())
1990 }
1991
1992 fn purge_timing_stats(&mut self, below_number: u64, commit_duration: Option<Duration>) {
1999 let threshold = self.config.slow_block_threshold();
2000 let check_slow = commit_duration.is_some() && threshold.is_some();
2001
2002 let keys_to_remove: Vec<B256> = self
2004 .execution_timing_stats
2005 .iter()
2006 .filter(|(_, stats)| stats.block_number <= below_number)
2007 .map(|(k, _)| *k)
2008 .collect();
2009
2010 for key in keys_to_remove {
2011 let stats = self.execution_timing_stats.remove(&key).expect("key just found");
2012 if check_slow {
2013 let commit_dur = commit_duration.expect("checked above");
2014 let total_duration =
2016 stats.execution_duration + stats.state_hash_duration + commit_dur;
2017
2018 if total_duration > threshold.expect("checked above") {
2019 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2020 stats,
2021 commit_duration: Some(commit_dur),
2022 total_duration,
2023 }));
2024 }
2025 }
2026 }
2027 }
2028
2029 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
2031 let event = event.into();
2032
2033 if event.is_backfill_action() {
2034 debug_assert_eq!(
2035 self.backfill_sync_state,
2036 BackfillSyncState::Idle,
2037 "backfill action should only be emitted when backfill is idle"
2038 );
2039
2040 if self.persistence_state.in_progress() {
2041 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
2044 return
2045 }
2046
2047 self.backfill_sync_state = BackfillSyncState::Pending;
2048 self.metrics.engine.pipeline_runs.increment(1);
2049 debug!(target: "engine::tree", "emitting backfill action event");
2050 }
2051
2052 let _ = self.outgoing.send(event).inspect_err(
2053 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
2054 );
2055 }
2056
2057 pub const fn should_persist(&self) -> bool {
2061 if self.building_payload {
2062 return false
2063 }
2064
2065 if !self.backfill_sync_state.is_idle() {
2066 return false
2068 }
2069
2070 let min_block = self.persistence_state.last_persisted_block.number;
2071 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
2072 self.config.persistence_threshold()
2073 }
2074
2075 fn get_canonical_blocks_to_persist(
2078 &self,
2079 target: PersistTarget,
2080 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
2081 debug_assert!(!self.persistence_state.in_progress());
2084
2085 let mut blocks_to_persist = Vec::new();
2086 let mut current_hash = self.state.tree_state.canonical_block_hash();
2087 let last_persisted_number = self.persistence_state.last_persisted_block.number;
2088 let canonical_head_number = self.state.tree_state.canonical_block_number();
2089
2090 let target_number = match target {
2091 PersistTarget::Head => canonical_head_number,
2092 PersistTarget::Threshold => {
2093 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
2094 }
2095 };
2096
2097 debug!(
2098 target: "engine::tree",
2099 ?current_hash,
2100 ?last_persisted_number,
2101 ?canonical_head_number,
2102 ?target_number,
2103 "Returning canonical blocks to persist"
2104 );
2105 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
2106 if block.recovered_block().number() <= last_persisted_number {
2107 break;
2108 }
2109
2110 if block.recovered_block().number() <= target_number {
2111 blocks_to_persist.push(block.clone());
2112 }
2113
2114 current_hash = block.recovered_block().parent_hash();
2115 }
2116
2117 blocks_to_persist.reverse();
2119
2120 Ok(blocks_to_persist)
2121 }
2122
2123 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
2131 if let Some(remove_above) = self.find_disk_reorg()? {
2134 self.remove_blocks(remove_above);
2135 return Ok(())
2136 }
2137
2138 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
2139 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
2140 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
2141 number: self.persistence_state.last_persisted_block.number,
2142 hash: self.persistence_state.last_persisted_block.hash,
2143 });
2144 Ok(())
2145 }
2146
2147 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2154 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
2155 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
2156 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
2158 return Ok(block.clone())
2159 }
2160
2161 let (block, senders) = self
2162 .provider
2163 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
2164 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
2165 .split_sealed();
2166 let mut execution_output = self
2167 .provider
2168 .get_state(block.header().number())?
2169 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
2170 let hashed_state = self.provider.hashed_post_state(execution_output.state());
2171
2172 debug!(
2173 target: "engine::tree",
2174 number = ?block.number(),
2175 "computing block trie updates",
2176 );
2177 let db_provider = self.provider.database_provider_ro()?;
2178 let trie_updates = reth_trie_db::compute_block_trie_updates(
2179 &self.changeset_cache,
2180 &db_provider,
2181 block.number(),
2182 )?;
2183
2184 let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
2185 let sorted_trie_updates = Arc::new(trie_updates);
2186 let trie_data = ComputedTrieData::new(sorted_hashed_state, sorted_trie_updates);
2187
2188 let execution_output = Arc::new(BlockExecutionOutput {
2189 state: execution_output.bundle,
2190 result: BlockExecutionResult {
2191 receipts: execution_output.receipts.pop().unwrap_or_default(),
2192 requests: execution_output.requests.pop().unwrap_or_default(),
2193 gas_used: block.gas_used(),
2194 blob_gas_used: block.blob_gas_used().unwrap_or_default(),
2195 },
2196 });
2197
2198 Ok(ExecutedBlock::new(
2199 Arc::new(RecoveredBlock::new_sealed(block, senders)),
2200 execution_output,
2201 trie_data,
2202 ))
2203 }
2204
2205 fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
2209 if self.state.tree_state.contains_hash(&hash) {
2210 Ok(true)
2211 } else {
2212 self.provider.is_known(hash)
2213 }
2214 }
2215
2216 fn sealed_header_by_hash(
2218 &self,
2219 hash: B256,
2220 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
2221 let header = self.state.tree_state.sealed_header_by_hash(&hash);
2223
2224 if header.is_some() {
2225 Ok(header)
2226 } else {
2227 self.provider.sealed_header_by_hash(hash)
2228 }
2229 }
2230
2231 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
2238 self.state
2239 .buffer
2240 .lowest_ancestor(&hash)
2241 .map(|block| block.parent_hash())
2242 .unwrap_or_else(|| hash)
2243 }
2244
2245 fn latest_valid_hash_for_invalid_payload(
2256 &mut self,
2257 parent_hash: B256,
2258 ) -> ProviderResult<Option<B256>> {
2259 if self.has_block_by_hash(parent_hash)? {
2261 return Ok(Some(parent_hash))
2262 }
2263
2264 let mut current_hash = parent_hash;
2267 let mut current_block = self.state.invalid_headers.get(¤t_hash);
2268 while let Some(block_with_parent) = current_block {
2269 current_hash = block_with_parent.parent;
2270 current_block = self.state.invalid_headers.get(¤t_hash);
2271
2272 if current_block.is_none() && self.has_block_by_hash(current_hash)? {
2275 return Ok(Some(current_hash))
2276 }
2277 }
2278 Ok(None)
2279 }
2280
2281 fn prepare_invalid_response(&mut self, parent_hash: B256) -> ProviderResult<PayloadStatus> {
2285 let valid_parent_hash = match self.sealed_header_by_hash(parent_hash)? {
2286 Some(parent) if !parent.difficulty().is_zero() => Some(B256::ZERO),
2290 Some(_) => Some(parent_hash),
2291 None => self.latest_valid_hash_for_invalid_payload(parent_hash)?,
2292 };
2293
2294 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2295 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2296 })
2297 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2298 }
2299
2300 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2304 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2305 return target.head_block_hash == block_hash
2306 }
2307 false
2308 }
2309
2310 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2314 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2315 return target.contains(block_hash)
2316 }
2317 false
2318 }
2319
2320 fn check_invalid_ancestor_with_head(
2326 &mut self,
2327 check: B256,
2328 head: &SealedBlock<N::Block>,
2329 ) -> ProviderResult<Option<PayloadStatus>> {
2330 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2332
2333 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2334 }
2335
2336 fn on_invalid_new_payload(
2338 &mut self,
2339 head: SealedBlock<N::Block>,
2340 invalid: BlockWithParent,
2341 ) -> ProviderResult<PayloadStatus> {
2342 let status = self.prepare_invalid_response(invalid.parent)?;
2344
2345 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2347 self.emit_event(ConsensusEngineEvent::InvalidBlock {
2348 block: Box::new(head),
2349 error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2350 });
2351
2352 Ok(status)
2353 }
2354
2355 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2369 let parent_hash = payload.parent_hash();
2370 let block_hash = payload.block_hash();
2371
2372 if let Some(entry) = self.state.invalid_headers.get(&block_hash) {
2374 return Some(entry);
2375 }
2376
2377 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2378 if lowest_buffered_ancestor == block_hash {
2379 lowest_buffered_ancestor = parent_hash;
2380 }
2381
2382 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2384 }
2385
2386 fn handle_invalid_ancestor_payload(
2395 &mut self,
2396 payload: T::ExecutionData,
2397 invalid: BlockWithParent,
2398 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2399 let parent_hash = payload.parent_hash();
2400 let num_hash = payload.num_hash();
2401
2402 let block = match self.payload_validator.convert_payload_to_block(payload) {
2408 Ok(block) => block,
2409 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2410 };
2411
2412 Ok(self.on_invalid_new_payload(block, invalid)?)
2413 }
2414
2415 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2418 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2420
2421 match self.prepare_invalid_response(header.parent) {
2423 Ok(status) => Ok(Some(status)),
2424 Err(err) => {
2425 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2426 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2428 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2429 })))
2430 }
2431 }
2432 }
2433
2434 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2437 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2438 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2439 return Err(e)
2440 }
2441
2442 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2443 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2444 return Err(e)
2445 }
2446
2447 Ok(())
2448 }
2449
2450 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2452 fn try_connect_buffered_blocks(
2453 &mut self,
2454 parent: BlockNumHash,
2455 ) -> Result<(), InsertBlockFatalError> {
2456 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2457
2458 if blocks.is_empty() {
2459 return Ok(())
2461 }
2462
2463 let now = Instant::now();
2464 let block_count = blocks.len();
2465 for child in blocks {
2466 let child_num_hash = child.num_hash();
2467 match self.insert_block(child) {
2468 Ok(res) => {
2469 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2470 if self.is_any_sync_target(child_num_hash.hash) &&
2471 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2472 {
2473 debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2474 self.make_canonical(child_num_hash.hash)?;
2477 }
2478 }
2479 Err(err) => {
2480 if let InsertPayloadError::Block(err) = err {
2481 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2482 if let Err(fatal) = self.on_insert_block_error(err) {
2483 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2484 return Err(fatal)
2485 }
2486 }
2487 }
2488 }
2489 }
2490
2491 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2492 Ok(())
2493 }
2494
2495 fn buffer_block(
2497 &mut self,
2498 block: SealedBlock<N::Block>,
2499 ) -> Result<(), InsertBlockError<N::Block>> {
2500 if let Err(err) = self.validate_block(&block) {
2501 return Err(InsertBlockError::consensus_error(err, block))
2502 }
2503 self.state.buffer.insert_block(block);
2504 Ok(())
2505 }
2506
2507 #[inline]
2512 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2513 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2514 }
2515
2516 #[inline]
2519 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2520 if block > local_tip {
2521 Some(block - local_tip)
2522 } else {
2523 None
2524 }
2525 }
2526
2527 const fn backfill_target_hash(&self, state: ForkchoiceState) -> B256 {
2535 if self.engine_kind.is_opstack() {
2536 state.head_block_hash
2537 } else {
2538 state.finalized_block_hash
2539 }
2540 }
2541
2542 fn backfill_sync_target(
2549 &self,
2550 canonical_tip_num: u64,
2551 target_block_number: u64,
2552 downloaded_block: Option<BlockNumHash>,
2553 ) -> Option<B256> {
2554 let state = self.state.forkchoice_state_tracker.sync_target_state()?;
2555 let target_hash = self.backfill_target_hash(state);
2556
2557 let exceeds_backfill_threshold = match downloaded_block.as_ref() {
2559 Some(downloaded_block) if downloaded_block.hash == target_hash => {
2561 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2562 }
2563 _ => match self.state.buffer.block(&target_hash) {
2564 Some(buffered_target) => {
2566 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_target.number())
2567 }
2568 None => self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number),
2570 },
2571 };
2572
2573 if !exceeds_backfill_threshold {
2574 return None
2575 }
2576
2577 match self.provider.header_by_hash_or_number(target_hash.into()) {
2579 Err(err) => {
2580 warn!(target: "engine::tree", %err, "Failed to get backfill target block header");
2581 None
2582 }
2583 Ok(None) if !target_hash.is_zero() => Some(target_hash),
2585 Ok(None) => {
2586 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2599 Some(state.head_block_hash)
2600 }
2601 Ok(Some(_)) => None,
2603 }
2604 }
2605
2606 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2609 let mut canonical = self.state.tree_state.current_canonical_head;
2610 let mut persisted = self.persistence_state.last_persisted_block;
2611
2612 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2613 Ok(self
2614 .sealed_header_by_hash(num_hash.hash)?
2615 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2616 .parent_num_hash())
2617 };
2618
2619 while canonical.number > persisted.number {
2622 canonical = parent_num_hash(canonical)?;
2623 }
2624
2625 if canonical == persisted {
2627 return Ok(None);
2628 }
2629
2630 while persisted.number > canonical.number {
2636 persisted = parent_num_hash(persisted)?;
2637 }
2638
2639 debug_assert_eq!(persisted.number, canonical.number);
2640
2641 while persisted.hash != canonical.hash {
2643 canonical = parent_num_hash(canonical)?;
2644 persisted = parent_num_hash(persisted)?;
2645 }
2646
2647 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2648
2649 Ok(Some(persisted.number))
2650 }
2651
2652 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2656 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2657 let start = Instant::now();
2658
2659 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2661
2662 let tip = chain_update.tip().clone_sealed_header();
2663 let notification = chain_update.to_chain_notification();
2664
2665 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2667 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2668 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2669 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2670
2671 self.update_reorg_metrics(old.len(), old_first);
2672 self.reinsert_reorged_blocks(new.clone());
2673 self.reinsert_reorged_blocks(old.clone());
2674 }
2675
2676 self.canonical_in_memory_state.update_chain(chain_update);
2678 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2679
2680 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2682
2683 self.canonical_in_memory_state.notify_canon_state(notification);
2685
2686 self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2688 Box::new(tip),
2689 start.elapsed(),
2690 ));
2691 }
2692
2693 fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2695 if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2696 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2697 first_reorged_block <= finalized.number
2698 {
2699 self.metrics.tree.reorgs.finalized.increment(1);
2700 } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2701 first_reorged_block <= safe.number
2702 {
2703 self.metrics.tree.reorgs.safe.increment(1);
2704 } else {
2705 self.metrics.tree.reorgs.head.increment(1);
2706 }
2707 } else {
2708 debug_unreachable!("Reorged chain doesn't have any blocks");
2709 }
2710 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2711 }
2712
2713 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2715 for block in new_chain {
2716 if self
2717 .state
2718 .tree_state
2719 .executed_block_by_hash(block.recovered_block().hash())
2720 .is_none()
2721 {
2722 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2723 self.state.tree_state.insert_executed(block);
2724 }
2725 }
2726 }
2727
2728 fn on_disconnected_downloaded_block(
2733 &self,
2734 downloaded_block: BlockNumHash,
2735 missing_parent: BlockNumHash,
2736 head: BlockNumHash,
2737 ) -> Option<TreeEvent> {
2738 if let Some(target) =
2740 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2741 {
2742 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2743 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2744 }
2745
2746 let request = if let Some(distance) =
2756 self.distance_from_local_tip(head.number, missing_parent.number)
2757 {
2758 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2759 DownloadRequest::BlockRange(missing_parent.hash, distance)
2760 } else {
2761 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2762 DownloadRequest::single_block(missing_parent.hash)
2765 };
2766
2767 Some(TreeEvent::Download(request))
2768 }
2769
2770 fn on_valid_downloaded_block(
2777 &mut self,
2778 block_num_hash: BlockNumHash,
2779 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2780 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2783 sync_target.contains(block_num_hash.hash)
2784 {
2785 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2786
2787 if sync_target.head_block_hash == block_num_hash.hash {
2788 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2790 sync_target_head: block_num_hash.hash,
2791 })))
2792 }
2793
2794 self.make_canonical(block_num_hash.hash)?;
2798 self.try_connect_buffered_blocks(block_num_hash)?;
2799
2800 if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
2803 let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
2804 trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
2805 return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
2806 }
2807
2808 return Ok(None)
2809 }
2810 trace!(target: "engine::tree", "appended downloaded block");
2811 self.try_connect_buffered_blocks(block_num_hash)?;
2812 Ok(None)
2813 }
2814
2815 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2821 fn on_downloaded_block(
2822 &mut self,
2823 block: SealedBlock<N::Block>,
2824 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2825 let block_num_hash = block.num_hash();
2826 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2827 if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2828 return Ok(None)
2829 }
2830
2831 if !self.backfill_sync_state.is_idle() {
2832 return Ok(None)
2833 }
2834
2835 match self.insert_block(block) {
2837 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2838 return self.on_valid_downloaded_block(block_num_hash);
2839 }
2840 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2841 return Ok(self.on_disconnected_downloaded_block(
2844 block_num_hash,
2845 missing_ancestor,
2846 head,
2847 ))
2848 }
2849 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2850 trace!(target: "engine::tree", "downloaded block already executed");
2851 }
2852 Err(err) => {
2853 if let InsertPayloadError::Block(err) = err {
2854 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2855 if let Err(fatal) = self.on_insert_block_error(err) {
2856 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2857 return Err(fatal)
2858 }
2859 }
2860 }
2861 }
2862 Ok(None)
2863 }
2864
2865 fn insert_payload(
2874 &mut self,
2875 payload: T::ExecutionData,
2876 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2877 self.insert_block_or_payload(
2878 payload.block_with_parent(),
2879 payload,
2880 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2881 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2882 )
2883 }
2884
2885 fn insert_block(
2886 &mut self,
2887 block: SealedBlock<N::Block>,
2888 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2889 self.insert_block_or_payload(
2890 block.block_with_parent(),
2891 block,
2892 |validator, block, ctx| validator.validate_block(block, ctx),
2893 |_, block| Ok(block),
2894 )
2895 }
2896
2897 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
2914 fn insert_block_or_payload<Input, Err>(
2915 &mut self,
2916 block_id: BlockWithParent,
2917 input: Input,
2918 execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ValidationOutput<N>, Err>,
2919 convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2920 ) -> Result<InsertPayloadOk, Err>
2921 where
2922 Err: From<InsertBlockError<N::Block>>,
2923 {
2924 let block_insert_start = Instant::now();
2925 let block_num_hash = block_id.block;
2926 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2927
2928 if self.state.tree_state.contains_hash(&block_num_hash.hash) {
2930 convert_to_block(self, input)?;
2931 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2932 }
2933
2934 if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
2937 match self.provider.sealed_header_by_hash(block_num_hash.hash) {
2938 Err(err) => {
2939 let block = convert_to_block(self, input)?;
2940 return Err(InsertBlockError::new(block, err.into()).into());
2941 }
2942 Ok(Some(_)) => {
2943 convert_to_block(self, input)?;
2944 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2945 }
2946 Ok(None) => {}
2947 }
2948 }
2949
2950 match self.state_provider_builder(block_id.parent) {
2952 Err(err) => {
2953 let block = convert_to_block(self, input)?;
2954 return Err(InsertBlockError::new(block, err.into()).into());
2955 }
2956 Ok(None) => {
2957 let block = convert_to_block(self, input)?;
2958
2959 let missing_ancestor = self
2962 .state
2963 .buffer
2964 .lowest_ancestor(&block.parent_hash())
2965 .map(|block| block.parent_num_hash())
2966 .unwrap_or_else(|| block.parent_num_hash());
2967
2968 self.state.buffer.insert_block(block);
2969
2970 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2971 head: self.state.tree_state.current_canonical_head,
2972 missing_ancestor,
2973 }))
2974 }
2975 Ok(Some(_)) => {}
2976 }
2977
2978 let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2983
2984 let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2985
2986 let start = Instant::now();
2987
2988 let ValidationOutput { executed_block: executed, execution_timing_stats: timing_stats } =
2989 execute(&mut self.payload_validator, input, ctx)?;
2990
2991 if let Some(stats) = timing_stats {
2994 if let Some(threshold) = self.config.slow_block_threshold() {
2995 let total_duration = stats.execution_duration + stats.state_hash_duration;
2996 if total_duration > threshold {
2997 self.emit_event(ConsensusEngineEvent::SlowBlock(SlowBlockInfo {
2998 stats: stats.clone(),
2999 commit_duration: None,
3000 total_duration,
3001 }));
3002 }
3003 }
3004 self.execution_timing_stats.insert(executed.recovered_block().hash(), stats);
3005 }
3006
3007 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
3009 {
3010 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
3011 self.canonical_in_memory_state.set_pending_block(executed.clone());
3012 }
3013
3014 self.state.tree_state.insert_executed(executed.clone());
3015 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
3016
3017 let elapsed = start.elapsed();
3019 let engine_event = if is_fork {
3020 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
3021 } else {
3022 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
3023 };
3024 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
3025
3026 self.metrics
3027 .engine
3028 .block_insert_total_duration
3029 .record(block_insert_start.elapsed().as_secs_f64());
3030 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
3031 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
3032 }
3033
3034 fn on_insert_block_error(
3040 &mut self,
3041 error: InsertBlockError<N::Block>,
3042 ) -> Result<PayloadStatus, InsertBlockFatalError> {
3043 let (block, error) = error.split();
3044
3045 let validation_err = error.ensure_validation_error()?;
3048
3049 warn!(
3053 target: "engine::tree",
3054 invalid_hash=%block.hash(),
3055 invalid_number=block.number(),
3056 %validation_err,
3057 "Invalid block error on new payload",
3058 );
3059 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
3060
3061 let is_transient = match &validation_err {
3063 InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
3064 _ => false,
3065 };
3066 if is_transient {
3067 warn!(
3068 target: "engine::tree",
3069 invalid_hash=%block.hash(),
3070 invalid_number=block.number(),
3071 %validation_err,
3072 "Skipping invalid header cache insert for transient validation error",
3073 );
3074 } else {
3075 self.state.invalid_headers.insert(block.block_with_parent());
3076 }
3077 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock {
3078 block: Box::new(block),
3079 error: validation_err.to_string(),
3080 }));
3081
3082 Ok(PayloadStatus::new(
3083 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
3084 latest_valid_hash,
3085 ))
3086 }
3087
3088 fn on_new_payload_error(
3090 &mut self,
3091 error: NewPayloadError,
3092 payload_num_hash: NumHash,
3093 parent_hash: B256,
3094 ) -> ProviderResult<PayloadStatus> {
3095 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
3096 let latest_valid_hash =
3099 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
3100 None
3104 } else {
3105 self.latest_valid_hash_for_invalid_payload(parent_hash)?
3106 };
3107
3108 let status = PayloadStatusEnum::from(error);
3109 Ok(PayloadStatus::new(status, latest_valid_hash))
3110 }
3111
3112 pub fn find_canonical_header(
3114 &self,
3115 hash: B256,
3116 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
3117 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
3118
3119 if canonical.is_none() {
3120 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
3121 }
3122
3123 Ok(canonical)
3124 }
3125
3126 fn update_finalized_block(
3128 &self,
3129 finalized_block_hash: B256,
3130 ) -> Result<(), OnForkChoiceUpdated> {
3131 if finalized_block_hash.is_zero() {
3132 return Ok(())
3133 }
3134
3135 match self.find_canonical_header(finalized_block_hash) {
3136 Ok(None) => {
3137 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
3138 return Err(OnForkChoiceUpdated::invalid_state())
3140 }
3141 Ok(Some(finalized)) => {
3142 if Some(finalized.num_hash()) !=
3143 self.canonical_in_memory_state.get_finalized_num_hash()
3144 {
3145 let _ = self.persistence.save_finalized_block_number(finalized.number());
3148 self.canonical_in_memory_state.set_finalized(finalized.clone());
3149 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
3151 }
3152 }
3153 Err(err) => {
3154 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
3155 }
3156 }
3157
3158 Ok(())
3159 }
3160
3161 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
3163 if safe_block_hash.is_zero() {
3164 return Ok(())
3165 }
3166
3167 match self.find_canonical_header(safe_block_hash) {
3168 Ok(None) => {
3169 debug!(target: "engine::tree", "Safe block not found in canonical chain");
3170 return Err(OnForkChoiceUpdated::invalid_state())
3172 }
3173 Ok(Some(safe)) => {
3174 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
3175 let _ = self.persistence.save_safe_block_number(safe.number());
3178 self.canonical_in_memory_state.set_safe(safe.clone());
3179 self.metrics.tree.safe_block_height.set(safe.number() as f64);
3181 }
3182 }
3183 Err(err) => {
3184 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
3185 }
3186 }
3187
3188 Ok(())
3189 }
3190
3191 fn ensure_consistent_forkchoice_state(
3200 &self,
3201 state: ForkchoiceState,
3202 ) -> Result<(), OnForkChoiceUpdated> {
3203 self.update_finalized_block(state.finalized_block_hash)?;
3209
3210 self.update_safe_block(state.safe_block_hash)
3216 }
3217
3218 fn process_payload_attributes(
3233 &self,
3234 attributes: T::PayloadAttributes,
3235 head: &N::BlockHeader,
3236 state: ForkchoiceState,
3237 ) -> OnForkChoiceUpdated {
3238 if let Err(err) =
3239 self.payload_validator.validate_payload_attributes_against_header(&attributes, head)
3240 {
3241 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
3242 return OnForkChoiceUpdated::invalid_payload_attributes()
3243 }
3244
3245 let cache = if self.config.share_execution_cache_with_payload_builder() {
3251 self.payload_validator.cache_for(state.head_block_hash)
3252 } else {
3253 None
3254 };
3255
3256 let trie_handle = if self.config.share_sparse_trie_with_payload_builder() {
3257 self.payload_validator.sparse_trie_handle_for(
3258 state.head_block_hash,
3259 head.state_root(),
3260 &self.state,
3261 )
3262 } else {
3263 None
3264 };
3265
3266 let pending_payload_id = self.payload_builder.send_new_payload(BuildNewPayload {
3269 parent_hash: state.head_block_hash,
3270 attributes,
3271 cache,
3272 trie_handle,
3273 });
3274
3275 OnForkChoiceUpdated::updated_with_pending_payload_id(
3287 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
3288 pending_payload_id,
3289 )
3290 }
3291
3292 pub(crate) fn remove_before(
3299 &mut self,
3300 upper_bound: BlockNumHash,
3301 finalized_hash: Option<B256>,
3302 ) -> ProviderResult<()> {
3303 let num = if let Some(hash) = finalized_hash {
3306 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
3307 } else {
3308 None
3309 };
3310
3311 self.state.tree_state.remove_until(
3312 upper_bound,
3313 self.persistence_state.last_persisted_block.hash,
3314 num,
3315 );
3316 Ok(())
3317 }
3318
3319 pub fn state_provider_builder(
3324 &self,
3325 hash: B256,
3326 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
3327 where
3328 P: BlockReader + StateProviderFactory + StateReader + Clone,
3329 {
3330 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
3331 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
3332 return Ok(Some(StateProviderBuilder::new(
3334 self.provider.clone(),
3335 historical,
3336 Some(blocks),
3337 )))
3338 }
3339
3340 if let Some(header) = self.provider.header(hash)? {
3342 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
3343 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
3346 }
3347
3348 debug!(target: "engine::tree", %hash, "no canonical state found for block");
3349 Ok(None)
3350 }
3351}
3352
3353#[derive(Debug)]
3355enum LoopEvent<T, N>
3356where
3357 N: NodePrimitives,
3358 T: PayloadTypes,
3359{
3360 EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
3362 PersistenceComplete {
3364 result: PersistenceResult,
3366 start_time: Instant,
3368 },
3369 Disconnected,
3371}
3372
3373#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3379pub enum BlockStatus {
3380 Valid,
3382 Disconnected {
3384 head: BlockNumHash,
3386 missing_ancestor: BlockNumHash,
3388 },
3389}
3390
3391#[derive(Clone, Copy, Debug, Eq, PartialEq)]
3396pub enum InsertPayloadOk {
3397 AlreadySeen(BlockStatus),
3399 Inserted(BlockStatus),
3401}
3402
/// Selects which target a persistence run should aim for.
// NOTE(review): the precise meaning of each variant is defined at the use sites, which are
// not visible here; confirm before relying on these descriptions.
#[derive(Clone, Copy, Debug)]
enum PersistTarget {
    /// Presumably: persist up to the configured persistence threshold.
    Threshold,
    /// Presumably: persist all the way up to the current head.
    Head,
}
3411
/// Durations spent waiting for caches, as reported by [`WaitForCaches::wait_for_caches`].
///
/// Both fields default to a zero duration.
#[derive(Debug, Default, Clone, Copy)]
pub struct CacheWaitDurations {
    /// Time spent waiting for the execution cache.
    pub execution_cache: Duration,
    /// Time spent waiting for the sparse trie.
    pub sparse_trie: Duration,
}
3420
3421pub trait WaitForCaches {
3426 fn wait_for_caches(&self) -> CacheWaitDurations;
3430}