1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_primitives::B256;
11use alloy_rpc_types_engine::{
12 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
13};
14use error::{InsertBlockError, InsertBlockFatalError};
15use reth_chain_state::{
16 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
17 NewCanonicalChain,
18};
19use reth_consensus::{Consensus, FullConsensus};
20use reth_engine_primitives::{
21 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
22 ForkchoiceStateTracker, OnForkChoiceUpdated,
23};
24use reth_errors::{ConsensusError, ProviderResult};
25use reth_evm::ConfigureEvm;
26use reth_payload_builder::PayloadBuilderHandle;
27use reth_payload_primitives::{
28 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
29};
30use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
31use reth_provider::{
32 BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
33 DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
34 StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
35 StorageSettingsCache, TransactionVariant,
36};
37use reth_revm::database::StateProviderDatabase;
38use reth_stages_api::ControlFlow;
39use reth_tasks::spawn_os_thread;
40use reth_trie_db::ChangesetCache;
41use revm::interpreter::debug_unreachable;
42use state::TreeState;
43use std::{fmt::Debug, ops, sync::Arc, time::Instant};
44
45use crossbeam_channel::{Receiver, Sender};
46use tokio::sync::{
47 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
48 oneshot,
49};
50use tracing::*;
51
52mod block_buffer;
53mod cached_state;
54pub mod error;
55pub mod instrumented_state;
56mod invalid_headers;
57mod metrics;
58pub mod payload_processor;
59pub mod payload_validator;
60mod persistence_state;
61pub mod precompile_cache;
62#[cfg(test)]
63mod tests;
64mod trie_updates;
65
66use crate::tree::error::AdvancePersistenceError;
67pub use block_buffer::BlockBuffer;
68pub use cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache};
69pub use invalid_headers::InvalidHeaderCache;
70pub use metrics::EngineApiMetrics;
71pub use payload_processor::*;
72pub use payload_validator::{BasicEngineValidator, EngineValidator};
73pub use persistence_state::PersistenceState;
74pub use reth_engine_primitives::TreeConfig;
75
76pub mod state;
77
/// Minimum block count associated with a pipeline run; defined as [`EPOCH_SLOTS`].
// NOTE(review): presumably the gap between the local head and a target block above which the
// pipeline (backfill) is preferred over downloading individual blocks — confirm at the use sites,
// which are not visible in this part of the file.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// Block count used for changeset cache retention.
// NOTE(review): presumably the number of most recent blocks kept in the `ChangesetCache` on
// eviction — the use site is not visible in this part of the file, confirm there.
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
94
/// Builder that creates a state provider for a block hash on demand, optionally layering a list
/// of in-memory executed blocks on top of it.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// Factory that is asked for the state at [`Self::historical`] when building.
    provider_factory: P,
    /// Hash of the block whose state is requested from the factory.
    historical: B256,
    /// Executed blocks that are wrapped around the factory's provider (as a
    /// [`MemoryOverlayStateProvider`]) when present.
    overlay: Option<Vec<ExecutedBlock<N>>>,
}
105
106impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
107 pub const fn new(
110 provider_factory: P,
111 historical: B256,
112 overlay: Option<Vec<ExecutedBlock<N>>>,
113 ) -> Self {
114 Self { provider_factory, historical, overlay }
115 }
116}
117
118impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
119where
120 P: BlockReader + StateProviderFactory + StateReader + Clone,
121{
122 pub fn build(&self) -> ProviderResult<StateProviderBox> {
124 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
125 if let Some(overlay) = self.overlay.clone() {
126 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
127 }
128 Ok(provider)
129 }
130}
131
/// State kept by the engine API tree handler: the block tree itself plus the auxiliary
/// forkchoice tracker, block buffer and invalid header cache.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tree of executed blocks, including the current canonical head.
    tree_state: TreeState<N>,
    /// Tracker for received forkchoice states.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of blocks, created with a configurable limit.
    // NOTE(review): presumably holds blocks that cannot be connected to the tree yet — confirm
    // against `BlockBuffer`.
    buffer: BlockBuffer<N::Block>,
    /// Cache of headers known to be invalid, created with a configurable maximum length.
    invalid_headers: InvalidHeaderCache,
}
147
148impl<N: NodePrimitives> EngineApiTreeState<N> {
149 fn new(
150 block_buffer_limit: u32,
151 max_invalid_header_cache_length: u32,
152 canonical_block: BlockNumHash,
153 engine_kind: EngineApiKind,
154 ) -> Self {
155 Self {
156 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
157 buffer: BlockBuffer::new(block_buffer_limit),
158 tree_state: TreeState::new(canonical_block, engine_kind),
159 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
160 }
161 }
162
163 pub const fn tree_state(&self) -> &TreeState<N> {
165 &self.tree_state
166 }
167
168 pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
170 self.invalid_headers.get(hash).is_some()
171 }
172}
173
/// Result of a tree operation together with an optional follow-up event.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The direct result of the operation.
    pub outcome: T,
    /// An event the operation requests to be handled afterwards, if any.
    pub event: Option<TreeEvent>,
}
182
183impl<T> TreeOutcome<T> {
184 pub const fn new(outcome: T) -> Self {
186 Self { outcome, event: None }
187 }
188
189 pub fn with_event(mut self, event: TreeEvent) -> Self {
191 self.event = Some(event);
192 self
193 }
194}
195
/// Events that tree operations can request as a follow-up.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action to be performed by the tree itself.
    TreeAction(TreeAction),
    /// An action concerning backfill sync.
    BackfillAction(BackfillAction),
    /// A request to download blocks.
    Download(DownloadRequest),
}
206
207impl TreeEvent {
208 const fn is_backfill_action(&self) -> bool {
210 matches!(self, Self::BackfillAction(_))
211 }
212}
213
/// Actions the tree can be asked to perform.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the chain ending in the given block canonical.
    MakeCanonical {
        /// Hash of the sync target head block.
        sync_target_head: B256,
    },
}
223
/// Handler that processes engine API requests (new payloads, forkchoice updates, downloaded
/// blocks) against the in-memory block tree and drives persistence of executed blocks.
///
/// It is driven by `run`, which consumes messages from `incoming` and persistence completion
/// notifications.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    /// Provider giving access to blocks, headers and state (see the bounds on the main `impl`).
    provider: P,
    /// Consensus implementation.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Payload validator; among other things used to convert payloads into blocks.
    payload_validator: V,
    /// Tree state, block buffer, invalid header cache and forkchoice tracker.
    state: EngineApiTreeState<N>,
    /// Sender half of the incoming message channel. Cloned for external producers and also used
    /// by the handler itself to re-queue downloaded blocks that did not fit into one batch.
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Receiver half of the incoming message channel, polled by the event loop.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Channel on which the handler emits [`EngineApiEvent`]s.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle used to submit save-blocks and remove-blocks jobs.
    persistence: PersistenceHandle<N>,
    /// Tracks the last persisted block and the receiver of an in-flight persistence job, if any.
    persistence_state: PersistenceState,
    /// Current backfill sync state; starts as `Idle`. New payloads are inserted only while idle
    /// and buffered otherwise.
    backfill_sync_state: BackfillSyncState,
    /// Canonical in-memory chain state that is updated on canonical head changes.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder.
    // NOTE(review): presumably used when processing payload attributes — not visible in this
    // part of the file.
    payload_builder: PayloadBuilderHandle<T>,
    /// Tree configuration (batch sizes, buffer limits, forkchoice handling switches, ...).
    config: TreeConfig,
    /// Metrics recorded by the handler.
    metrics: EngineApiMetrics,
    /// Kind of engine API; consulted (e.g. `is_opstack`) when the forkchoice head is already
    /// canonical.
    engine_kind: EngineApiKind,
    /// EVM configuration.
    evm_config: C,
    /// Changeset cache.
    // NOTE(review): usage is not visible in this part of the file.
    changeset_cache: ChangesetCache,
    /// Flag passed in at construction.
    // NOTE(review): presumably selects hashed-state storage handling — confirm at the use sites.
    use_hashed_state: bool,
}
278
279impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
280 for EngineApiTreeHandler<N, P, T, V, C>
281where
282 N: NodePrimitives,
283 C: Debug + ConfigureEvm<Primitives = N>,
284{
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 f.debug_struct("EngineApiTreeHandler")
287 .field("provider", &self.provider)
288 .field("consensus", &self.consensus)
289 .field("payload_validator", &self.payload_validator)
290 .field("state", &self.state)
291 .field("incoming_tx", &self.incoming_tx)
292 .field("persistence", &self.persistence)
293 .field("persistence_state", &self.persistence_state)
294 .field("backfill_sync_state", &self.backfill_sync_state)
295 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
296 .field("payload_builder", &self.payload_builder)
297 .field("config", &self.config)
298 .field("metrics", &self.metrics)
299 .field("engine_kind", &self.engine_kind)
300 .field("evm_config", &self.evm_config)
301 .field("changeset_cache", &self.changeset_cache)
302 .field("use_hashed_state", &self.use_hashed_state)
303 .finish()
304 }
305}
306
307impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
308where
309 N: NodePrimitives,
310 P: DatabaseProviderFactory
311 + BlockReader<Block = N::Block, Header = N::BlockHeader>
312 + StateProviderFactory
313 + StateReader<Receipt = N::Receipt>
314 + HashedPostStateProvider
315 + Clone
316 + 'static,
317 P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
318 + StageCheckpointReader
319 + ChangeSetReader
320 + StorageChangeSetReader
321 + StorageSettingsCache,
322 C: ConfigureEvm<Primitives = N> + 'static,
323 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
324 V: EngineValidator<T>,
325{
326 #[expect(clippy::too_many_arguments)]
328 pub fn new(
329 provider: P,
330 consensus: Arc<dyn FullConsensus<N>>,
331 payload_validator: V,
332 outgoing: UnboundedSender<EngineApiEvent<N>>,
333 state: EngineApiTreeState<N>,
334 canonical_in_memory_state: CanonicalInMemoryState<N>,
335 persistence: PersistenceHandle<N>,
336 persistence_state: PersistenceState,
337 payload_builder: PayloadBuilderHandle<T>,
338 config: TreeConfig,
339 engine_kind: EngineApiKind,
340 evm_config: C,
341 changeset_cache: ChangesetCache,
342 use_hashed_state: bool,
343 ) -> Self {
344 let (incoming_tx, incoming) = crossbeam_channel::unbounded();
345
346 Self {
347 provider,
348 consensus,
349 payload_validator,
350 incoming,
351 outgoing,
352 persistence,
353 persistence_state,
354 backfill_sync_state: BackfillSyncState::Idle,
355 state,
356 canonical_in_memory_state,
357 payload_builder,
358 config,
359 metrics: Default::default(),
360 incoming_tx,
361 engine_kind,
362 evm_config,
363 changeset_cache,
364 use_hashed_state,
365 }
366 }
367
368 #[expect(clippy::complexity)]
374 pub fn spawn_new(
375 provider: P,
376 consensus: Arc<dyn FullConsensus<N>>,
377 payload_validator: V,
378 persistence: PersistenceHandle<N>,
379 payload_builder: PayloadBuilderHandle<T>,
380 canonical_in_memory_state: CanonicalInMemoryState<N>,
381 config: TreeConfig,
382 kind: EngineApiKind,
383 evm_config: C,
384 changeset_cache: ChangesetCache,
385 use_hashed_state: bool,
386 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
387 {
388 let best_block_number = provider.best_block_number().unwrap_or(0);
389 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
390
391 let persistence_state = PersistenceState {
392 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
393 rx: None,
394 };
395
396 let (tx, outgoing) = unbounded_channel();
397 let state = EngineApiTreeState::new(
398 config.block_buffer_limit(),
399 config.max_invalid_header_cache_length(),
400 header.num_hash(),
401 kind,
402 );
403
404 let task = Self::new(
405 provider,
406 consensus,
407 payload_validator,
408 tx,
409 state,
410 canonical_in_memory_state,
411 persistence,
412 persistence_state,
413 payload_builder,
414 config,
415 kind,
416 evm_config,
417 changeset_cache,
418 use_hashed_state,
419 );
420 let incoming = task.incoming_tx.clone();
421 spawn_os_thread("engine", || task.run());
422 (incoming, outgoing)
423 }
424
425 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
427 self.incoming_tx.clone()
428 }
429
430 pub fn run(mut self) {
434 loop {
435 match self.wait_for_event() {
436 LoopEvent::EngineMessage(msg) => {
437 debug!(target: "engine::tree", %msg, "received new engine message");
438 match self.on_engine_message(msg) {
439 Ok(ops::ControlFlow::Break(())) => return,
440 Ok(ops::ControlFlow::Continue(())) => {}
441 Err(fatal) => {
442 error!(target: "engine::tree", %fatal, "insert block fatal error");
443 return
444 }
445 }
446 }
447 LoopEvent::PersistenceComplete { result, start_time } => {
448 if let Err(err) = self.on_persistence_complete(result, start_time) {
449 error!(target: "engine::tree", %err, "Persistence complete handling failed");
450 return
451 }
452 }
453 LoopEvent::Disconnected => {
454 error!(target: "engine::tree", "Channel disconnected");
455 return
456 }
457 }
458
459 if let Err(err) = self.advance_persistence() {
464 error!(target: "engine::tree", %err, "Advancing persistence failed");
465 return
466 }
467 }
468 }
469
    /// Blocks until the next event for the loop is available.
    ///
    /// If a persistence job is in flight, waits on both its completion channel and the incoming
    /// message channel; otherwise only on the incoming message channel. A closed channel on
    /// either side is reported as [`LoopEvent::Disconnected`].
    fn wait_for_event(&mut self) -> LoopEvent<T, N> {
        // Take the in-flight persistence receiver (if any) out of the state; it is only put back
        // when it was not consumed below.
        let maybe_persistence = self.persistence_state.rx.take();

        if let Some((persistence_rx, start_time, action)) = maybe_persistence {
            // Biased select: the persistence receiver is listed first.
            // NOTE(review): per the crossbeam docs `select_biased!` prefers the first ready
            // operation in listed order, i.e. persistence completion wins over new messages.
            crossbeam_channel::select_biased! {
                recv(persistence_rx) -> result => {
                    // The receiver is consumed here and intentionally not restored.
                    match result {
                        Ok(value) => LoopEvent::PersistenceComplete {
                            result: value,
                            start_time,
                        },
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
                recv(self.incoming) -> msg => {
                    // Persistence is still pending: restore the receiver for the next iteration.
                    self.persistence_state.rx = Some((persistence_rx, start_time, action));
                    match msg {
                        Ok(m) => LoopEvent::EngineMessage(m),
                        Err(_) => LoopEvent::Disconnected,
                    }
                },
            }
        } else {
            // No persistence job in flight: block on the incoming channel only.
            match self.incoming.recv() {
                Ok(m) => LoopEvent::EngineMessage(m),
                Err(_) => LoopEvent::Disconnected,
            }
        }
    }
510
511 fn on_downloaded(
517 &mut self,
518 mut blocks: Vec<SealedBlock<N::Block>>,
519 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
520 if blocks.is_empty() {
521 return Ok(None)
523 }
524
525 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
526 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
527 for block in blocks.drain(..batch) {
528 if let Some(event) = self.on_downloaded_block(block)? {
529 let needs_backfill = event.is_backfill_action();
530 self.on_tree_event(event)?;
531 if needs_backfill {
532 return Ok(None)
534 }
535 }
536 }
537
538 if !blocks.is_empty() {
540 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
541 }
542
543 Ok(None)
544 }
545
546 #[instrument(
561 level = "debug",
562 target = "engine::tree",
563 skip_all,
564 fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
565 )]
566 fn on_new_payload(
567 &mut self,
568 payload: T::ExecutionData,
569 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
570 trace!(target: "engine::tree", "invoked new payload");
571
572 let start = Instant::now();
574
575 let num_hash = payload.num_hash();
602 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
603 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
604
605 let block_hash = num_hash.hash;
606
607 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
609 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
610 return Ok(TreeOutcome::new(status));
611 }
612
613 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
615
616 let status = if self.backfill_sync_state.is_idle() {
617 self.try_insert_payload(payload)?
618 } else {
619 self.try_buffer_payload(payload)?
620 };
621
622 let mut outcome = TreeOutcome::new(status);
623 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
625 if self.state.tree_state.canonical_block_hash() != block_hash {
627 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
628 sync_target_head: block_hash,
629 }));
630 }
631 }
632
633 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
635
636 Ok(outcome)
637 }
638
639 #[instrument(level = "debug", target = "engine::tree", skip_all)]
646 fn try_insert_payload(
647 &mut self,
648 payload: T::ExecutionData,
649 ) -> Result<PayloadStatus, InsertBlockFatalError> {
650 let block_hash = payload.block_hash();
651 let num_hash = payload.num_hash();
652 let parent_hash = payload.parent_hash();
653 let mut latest_valid_hash = None;
654
655 match self.insert_payload(payload) {
656 Ok(status) => {
657 let status = match status {
658 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
659 latest_valid_hash = Some(block_hash);
660 self.try_connect_buffered_blocks(num_hash)?;
661 PayloadStatusEnum::Valid
662 }
663 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
664 latest_valid_hash = Some(block_hash);
665 PayloadStatusEnum::Valid
666 }
667 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
668 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
669 PayloadStatusEnum::Syncing
671 }
672 };
673
674 Ok(PayloadStatus::new(status, latest_valid_hash))
675 }
676 Err(error) => match error {
677 InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
678 InsertPayloadError::Payload(error) => {
679 Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
680 }
681 },
682 }
683 }
684
685 fn try_buffer_payload(
694 &mut self,
695 payload: T::ExecutionData,
696 ) -> Result<PayloadStatus, InsertBlockFatalError> {
697 let parent_hash = payload.parent_hash();
698 let num_hash = payload.num_hash();
699
700 match self.payload_validator.convert_payload_to_block(payload) {
701 Ok(block) => {
703 if let Err(error) = self.buffer_block(block) {
704 Ok(self.on_insert_block_error(error)?)
705 } else {
706 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
707 }
708 }
709 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
710 }
711 }
712
    /// Computes the chain update needed to make `new_head` the canonical head.
    ///
    /// Returns:
    /// * `Ok(None)` if `new_head` (or a block needed while walking either chain) is not found,
    /// * `Ok(Some(NewCanonicalChain::Commit { .. }))` if the new head extends the current
    ///   canonical head,
    /// * `Ok(Some(NewCanonicalChain::Reorg { .. }))` otherwise, with the new and old chain
    ///   segments ordered from the lowest to the highest block.
    ///
    /// # Errors
    ///
    /// Propagates errors of `canonical_block_by_hash`.
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // The new head must be an executed block in the in-memory tree.
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
            self.metrics.engine.executed_new_block_cache_miss.increment(1);
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        // `new_chain` is collected from the head downwards and reversed before returning.
        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        // NOTE(review): this subtraction underflows if the new head has block number 0 — confirm
        // that a block with number 0 can never be a head found in the in-memory tree.
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until its height is not above the canonical head's height.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                return Ok(None)
            }
        }

        // Reached the canonical head by walking back: the new chain is a plain extension.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
        }

        // Otherwise this is a reorg: collect the old canonical blocks down to the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is higher than the new chain pointer, walk it back to the same
        // height.
        while current_canonical_number > current_number {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
                current_canonical_number -= 1;
            } else {
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }
        }

        // Both pointers are expected to be at the same height now.
        debug_assert_eq!(current_number, current_canonical_number);

        // Step both chains back in lockstep until they meet at a common ancestor.
        while old_hash != current_hash {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None)
            }

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None)
            }
        }
        // Both segments were collected from the tip downwards; flip them into ascending order.
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }
809
810 fn update_latest_block_to_canonical_ancestor(
822 &mut self,
823 canonical_header: &SealedHeader<N::BlockHeader>,
824 ) -> ProviderResult<()> {
825 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
826 let current_head_number = self.state.tree_state.canonical_block_number();
827 let new_head_number = canonical_header.number();
828 let new_head_hash = canonical_header.hash();
829
830 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
832
833 if new_head_number < current_head_number {
835 debug!(
836 target: "engine::tree",
837 current_head = current_head_number,
838 new_head = new_head_number,
839 new_head_hash = ?new_head_hash,
840 "FCU unwind detected: reverting to canonical ancestor"
841 );
842
843 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
844 } else {
845 debug!(
846 target: "engine::tree",
847 previous_head = current_head_number,
848 new_head = new_head_number,
849 new_head_hash = ?new_head_hash,
850 "Advancing latest block to canonical ancestor"
851 );
852 self.handle_chain_advance_or_same_height(canonical_header)
853 }
854 }
855
856 fn handle_canonical_chain_unwind(
859 &self,
860 current_head_number: u64,
861 canonical_header: &SealedHeader<N::BlockHeader>,
862 ) -> ProviderResult<()> {
863 let new_head_number = canonical_header.number();
864 debug!(
865 target: "engine::tree",
866 from = current_head_number,
867 to = new_head_number,
868 "Handling unwind: collecting blocks to remove from in-memory state"
869 );
870
871 let old_blocks =
873 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
874
875 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
877 }
878
879 fn collect_blocks_for_canonical_unwind(
881 &self,
882 new_head_number: u64,
883 current_head_number: u64,
884 ) -> Vec<ExecutedBlock<N>> {
885 let mut old_blocks =
886 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
887
888 for block_num in (new_head_number + 1)..=current_head_number {
889 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
890 let executed_block = block_state.block_ref().clone();
891 old_blocks.push(executed_block);
892 debug!(
893 target: "engine::tree",
894 block_number = block_num,
895 "Collected block for removal from in-memory state"
896 );
897 }
898 }
899
900 if old_blocks.is_empty() {
901 debug!(
902 target: "engine::tree",
903 "No blocks found in memory to remove, will clear and reset state"
904 );
905 }
906
907 old_blocks
908 }
909
910 fn apply_canonical_ancestor_via_reorg(
912 &self,
913 canonical_header: &SealedHeader<N::BlockHeader>,
914 old_blocks: Vec<ExecutedBlock<N>>,
915 ) -> ProviderResult<()> {
916 let new_head_hash = canonical_header.hash();
917 let new_head_number = canonical_header.number();
918
919 match self.canonical_block_by_hash(new_head_hash)? {
921 Some(executed_block) => {
922 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
924 new: vec![executed_block],
925 old: old_blocks,
926 });
927
928 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
931
932 debug!(
933 target: "engine::tree",
934 block_number = new_head_number,
935 block_hash = ?new_head_hash,
936 "Successfully loaded canonical ancestor into memory via reorg"
937 );
938 }
939 None => {
940 warn!(
942 target: "engine::tree",
943 block_hash = ?new_head_hash,
944 "Could not find canonical ancestor block, updating header only"
945 );
946 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
947 }
948 }
949
950 Ok(())
951 }
952
953 fn handle_chain_advance_or_same_height(
955 &self,
956 canonical_header: &SealedHeader<N::BlockHeader>,
957 ) -> ProviderResult<()> {
958 self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
960
961 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
963
964 Ok(())
965 }
966
967 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
969 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
971 return Ok(());
972 }
973
974 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
976 self.canonical_in_memory_state
977 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
978
979 debug!(
980 target: "engine::tree",
981 block_number,
982 block_hash = ?block_hash,
983 "Added canonical block to in-memory state"
984 );
985 }
986
987 Ok(())
988 }
989
990 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
999 fn on_forkchoice_updated(
1000 &mut self,
1001 state: ForkchoiceState,
1002 attrs: Option<T::PayloadAttributes>,
1003 version: EngineApiMessageVersion,
1004 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1005 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1006
1007 self.record_forkchoice_metrics();
1009
1010 if let Some(early_result) = self.validate_forkchoice_state(state)? {
1012 return Ok(TreeOutcome::new(early_result));
1013 }
1014
1015 if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
1017 return Ok(result);
1018 }
1019
1020 if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1023 return Ok(result);
1024 }
1025
1026 self.handle_missing_block(state)
1028 }
1029
    /// Notifies the canonical in-memory state that a forkchoice update was received.
    fn record_forkchoice_metrics(&self) {
        self.canonical_in_memory_state.on_forkchoice_update_received();
    }
1034
1035 fn validate_forkchoice_state(
1040 &mut self,
1041 state: ForkchoiceState,
1042 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1043 if state.head_block_hash.is_zero() {
1044 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1045 }
1046
1047 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1050 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1051 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1052 }
1053
1054 if !self.backfill_sync_state.is_idle() {
1055 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1058 return Ok(Some(OnForkChoiceUpdated::syncing()));
1059 }
1060
1061 Ok(None)
1062 }
1063
1064 fn handle_canonical_head(
1070 &self,
1071 state: ForkchoiceState,
1072 attrs: &Option<T::PayloadAttributes>, version: EngineApiMessageVersion,
1074 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1075 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1090 return Ok(None);
1091 }
1092
1093 trace!(target: "engine::tree", "fcu head hash is already canonical");
1094
1095 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1097 return Ok(Some(TreeOutcome::new(outcome)));
1099 }
1100
1101 if let Some(attr) = attrs {
1103 let tip = self
1104 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1105 .ok_or_else(|| {
1106 ProviderError::HeaderNotFound(state.head_block_hash.into())
1109 })?;
1110 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1112 return Ok(Some(TreeOutcome::new(updated)));
1113 }
1114
1115 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1117 PayloadStatusEnum::Valid,
1118 Some(state.head_block_hash),
1119 )));
1120 Ok(Some(outcome))
1121 }
1122
1123 fn apply_chain_update(
1135 &mut self,
1136 state: ForkchoiceState,
1137 attrs: &Option<T::PayloadAttributes>,
1138 version: EngineApiMessageVersion,
1139 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1140 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1142 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1143
1144 if self.engine_kind.is_opstack() ||
1147 self.config.always_process_payload_attributes_on_canonical_head()
1148 {
1149 if self.config.unwind_canonical_header() {
1155 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1156 }
1157
1158 if let Some(attr) = attrs {
1159 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1160 let updated = self.process_payload_attributes(
1162 attr.clone(),
1163 &canonical_header,
1164 state,
1165 version,
1166 );
1167 return Ok(Some(TreeOutcome::new(updated)));
1168 }
1169 }
1170
1171 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1183 PayloadStatusEnum::Valid,
1184 Some(state.head_block_hash),
1185 )));
1186 return Ok(Some(outcome));
1187 }
1188
1189 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1191 let tip = chain_update.tip().clone_sealed_header();
1192 self.on_canonical_chain_update(chain_update);
1193
1194 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1196 return Ok(Some(TreeOutcome::new(outcome)));
1198 }
1199
1200 if let Some(attr) = attrs {
1201 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1203 return Ok(Some(TreeOutcome::new(updated)));
1204 }
1205
1206 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1207 PayloadStatusEnum::Valid,
1208 Some(state.head_block_hash),
1209 )));
1210 return Ok(Some(outcome));
1211 }
1212
1213 Ok(None)
1214 }
1215
1216 fn handle_missing_block(
1221 &self,
1222 state: ForkchoiceState,
1223 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1224 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1231 !state.safe_block_hash.is_zero() &&
1233 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1234 {
1235 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1236 state.safe_block_hash
1237 } else {
1238 state.head_block_hash
1239 };
1240
1241 let target = self.lowest_buffered_ancestor_or(target);
1242 trace!(target: "engine::tree", %target, "downloading missing block");
1243
1244 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1245 PayloadStatusEnum::Syncing,
1246 )))
1247 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1248 }
1249
1250 fn remove_blocks(&mut self, new_tip_num: u64) {
1253 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1254 if new_tip_num < self.persistence_state.last_persisted_block.number {
1255 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1256 let (tx, rx) = crossbeam_channel::bounded(1);
1257 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1258 self.persistence_state.start_remove(new_tip_num, rx);
1259 }
1260 }
1261
1262 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1265 if blocks_to_persist.is_empty() {
1266 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1267 return
1268 }
1269
1270 let highest_num_hash = blocks_to_persist
1272 .iter()
1273 .max_by_key(|block| block.recovered_block().number())
1274 .map(|b| b.recovered_block().num_hash())
1275 .expect("Checked non-empty persisting blocks");
1276
1277 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1278 let (tx, rx) = crossbeam_channel::bounded(1);
1279 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1280
1281 self.persistence_state.start_save(highest_num_hash, rx);
1282 }
1283
1284 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1289 if !self.persistence_state.in_progress() {
1290 if let Some(new_tip_num) = self.find_disk_reorg()? {
1291 self.remove_blocks(new_tip_num)
1292 } else if self.should_persist() {
1293 let blocks_to_persist =
1294 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1295 self.persist_blocks(blocks_to_persist);
1296 }
1297 }
1298
1299 Ok(())
1300 }
1301
1302 fn finish_termination(
1307 &mut self,
1308 pending_termination: oneshot::Sender<()>,
1309 ) -> Result<(), AdvancePersistenceError> {
1310 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1311 let result = self.persist_until_complete();
1312 let _ = pending_termination.send(());
1313 result
1314 }
1315
1316 fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1318 loop {
1319 if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
1321 let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
1322 self.on_persistence_complete(result, start_time)?;
1323 }
1324
1325 let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1326
1327 if blocks_to_persist.is_empty() {
1328 debug!(target: "engine::tree", "persistence complete, signaling termination");
1329 return Ok(())
1330 }
1331
1332 debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1333 self.persist_blocks(blocks_to_persist);
1334 }
1335 }
1336
/// Test-only, non-blocking poll of the in-flight persistence job.
///
/// Returns `Ok(true)` when a result was available and has been processed, and `Ok(false)`
/// when there is no job or the job has not finished yet (in which case the receiver is put
/// back untouched).
///
/// # Errors
///
/// Returns `AdvancePersistenceError::ChannelClosed` if the sender side has disconnected, and
/// propagates errors from `on_persistence_complete`.
#[cfg(test)]
pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
    use crossbeam_channel::TryRecvError;

    let (rx, start_time, action) = match self.persistence_state.rx.take() {
        Some(in_flight) => in_flight,
        None => return Ok(false),
    };

    match rx.try_recv() {
        Ok(persisted) => self.on_persistence_complete(persisted, start_time).map(|()| true),
        Err(TryRecvError::Empty) => {
            // Not finished yet: restore the receiver so a later poll can pick it up.
            self.persistence_state.rx = Some((rx, start_time, action));
            Ok(false)
        }
        Err(TryRecvError::Disconnected) => Err(AdvancePersistenceError::ChannelClosed),
    }
}
1361
1362 fn on_persistence_complete(
1364 &mut self,
1365 last_persisted_hash_num: Option<BlockNumHash>,
1366 start_time: Instant,
1367 ) -> Result<(), AdvancePersistenceError> {
1368 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1369
1370 let Some(BlockNumHash {
1371 hash: last_persisted_block_hash,
1372 number: last_persisted_block_number,
1373 }) = last_persisted_hash_num
1374 else {
1375 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1377 return Ok(())
1378 };
1379
1380 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1381 self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1382
1383 let min_threshold =
1387 last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
1388 let eviction_threshold =
1389 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
1390 finalized.number.min(min_threshold)
1392 } else {
1393 min_threshold
1395 };
1396 debug!(
1397 target: "engine::tree",
1398 last_persisted = last_persisted_block_number,
1399 finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
1400 eviction_threshold,
1401 "Evicting changesets below threshold"
1402 );
1403 self.changeset_cache.evict(eviction_threshold);
1404
1405 self.state.tree_state.invalidate_cached_overlay();
1407
1408 self.on_new_persisted_block()?;
1409
1410 if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
1414 rayon::spawn(move || {
1415 let _ = overlay.get();
1416 });
1417 }
1418
1419 Ok(())
1420 }
1421
/// Dispatches a single incoming engine message.
///
/// Handles orchestrator events (backfill started/finished, terminate), engine API requests
/// (insertion of an already executed block, forkchoice updates, new payloads) and downloaded
/// blocks.
///
/// Returns `ControlFlow::Break(())` only for a terminate request; every other message yields
/// `ControlFlow::Continue(())`.
///
/// # Errors
///
/// Propagates fatal errors raised while handling backfill completion, tree events and
/// downloaded blocks.
fn on_engine_message(
    &mut self,
    msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
    match msg {
        FromEngine::Event(event) => match event {
            FromOrchestrator::BackfillSyncStarted => {
                debug!(target: "engine::tree", "received backfill sync started event");
                self.backfill_sync_state = BackfillSyncState::Active;
            }
            FromOrchestrator::BackfillSyncFinished(ctrl) => {
                self.on_backfill_sync_finished(ctrl)?;
            }
            FromOrchestrator::Terminate { tx } => {
                debug!(target: "engine::tree", "received terminate request");
                // A failed termination is only logged; the loop is stopped either way.
                if let Err(err) = self.finish_termination(tx) {
                    error!(target: "engine::tree", %err, "Termination failed");
                }
                return Ok(ops::ControlFlow::Break(()))
            }
        },
        FromEngine::Request(request) => {
            match request {
                EngineApiRequest::InsertExecutedBlock(block) => {
                    let block_num_hash = block.recovered_block().num_hash();
                    // Blocks at or below the current canonical height are ignored.
                    if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                        return Ok(ops::ControlFlow::Continue(()))
                    }

                    debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                    let now = Instant::now();

                    // A block that directly extends the canonical head also becomes the
                    // pending block.
                    if self.state.tree_state.canonical_block_hash() ==
                        block.recovered_block().parent_hash()
                    {
                        debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                        self.canonical_in_memory_state.set_pending_block(block.clone());
                    }

                    self.state.tree_state.insert_executed(block.clone());
                    self.payload_validator.on_inserted_executed_block(block.clone());
                    self.metrics.engine.inserted_already_executed_blocks.increment(1);
                    self.emit_event(EngineApiEvent::BeaconConsensus(
                        ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                    ));
                }
                EngineApiRequest::Beacon(request) => {
                    match request {
                        BeaconEngineMessage::ForkchoiceUpdated {
                            state,
                            payload_attrs,
                            tx,
                            version,
                        } => {
                            let has_attrs = payload_attrs.is_some();

                            let start = Instant::now();
                            let mut output =
                                self.on_forkchoice_updated(state, payload_attrs, version);

                            if let Ok(res) = &mut output {
                                // Record the latest forkchoice state and its status.
                                self.state
                                    .forkchoice_state_tracker
                                    .set_latest(state, res.outcome.forkchoice_status());

                                self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
                                    state,
                                    res.outcome.forkchoice_status(),
                                ));

                                // The follow-up tree event is handled before the response is
                                // sent back below.
                                self.on_maybe_tree_event(res.event.take())?;
                            }

                            if let Err(ref err) = output {
                                error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
                            }

                            self.metrics.engine.forkchoice_updated.update_response_metrics(
                                start,
                                &mut self.metrics.engine.new_payload.latest_finish_at,
                                has_attrs,
                                &output,
                            );

                            // A send failure means the receiving side was dropped; it is
                            // counted and logged but not treated as an error.
                            if let Err(err) =
                                tx.send(output.map(|o| o.outcome).map_err(Into::into))
                            {
                                self.metrics
                                    .engine
                                    .failed_forkchoice_updated_response_deliveries
                                    .increment(1);
                                warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
                            }
                        }
                        BeaconEngineMessage::NewPayload { payload, tx } => {
                            let start = Instant::now();
                            // Captured up front because `payload` is moved into
                            // `on_new_payload`.
                            let gas_used = payload.gas_used();
                            let num_hash = payload.num_hash();
                            let mut output = self.on_new_payload(payload);
                            self.metrics.engine.new_payload.update_response_metrics(
                                start,
                                &mut self.metrics.engine.forkchoice_updated.latest_finish_at,
                                &output,
                                gas_used,
                            );

                            // Detach the follow-up event so the outcome can be sent first.
                            let maybe_event =
                                output.as_mut().ok().and_then(|out| out.event.take());

                            if let Err(err) =
                                tx.send(output.map(|o| o.outcome).map_err(|e| {
                                    BeaconOnNewPayloadError::Internal(Box::new(e))
                                }))
                            {
                                warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
                                self.metrics
                                    .engine
                                    .failed_new_payload_response_deliveries
                                    .increment(1);
                            }

                            // Unlike the forkchoice path, the tree event is handled after the
                            // response has been sent.
                            self.on_maybe_tree_event(maybe_event)?;
                        }
                    }
                }
            }
        }
        FromEngine::DownloadedBlocks(blocks) => {
            if let Some(event) = self.on_downloaded(blocks)? {
                self.on_tree_event(event)?;
            }
        }
    }
    Ok(ops::ControlFlow::Continue(()))
}
1568
/// Handles the completion of a backfill run described by `ctrl`.
///
/// Marks backfill as idle, resets the in-memory tree/buffer/canonical state to the block the
/// backfill reached (or unwound to), and then decides what to do next: start another backfill
/// run, request a download towards the tracked sync target, or try to connect buffered blocks
/// to the new canonical head.
///
/// Returns early with `Ok(())` when `ctrl` carries no block number, when the hash for that
/// number cannot be found, or when there is no sync target (or its finalized hash is zero).
///
/// # Errors
///
/// Propagates provider errors from the block hash lookup and fatal errors from
/// `try_connect_buffered_blocks`.
fn on_backfill_sync_finished(
    &mut self,
    ctrl: ControlFlow,
) -> Result<(), InsertBlockFatalError> {
    debug!(target: "engine::tree", "received backfill sync finished event");
    self.backfill_sync_state = BackfillSyncState::Idle;

    // On unwind, remember the bad block as invalid and use the unwind target as the height;
    // otherwise use whatever block number the control flow reports.
    let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
        warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
        self.state.invalid_headers.insert(**bad_block);

        Some(*target)
    } else {
        ctrl.block_number()
    };

    let Some(backfill_height) = backfill_height else { return Ok(()) };

    let Some(backfill_num_hash) = self
        .provider
        .block_hash(backfill_height)?
        .map(|hash| BlockNumHash { hash, number: backfill_height })
    else {
        debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
        return Ok(())
    };

    // Unwind: reset the tree state to the unwind target. Otherwise: prune the tree state up
    // to the backfilled block.
    if ctrl.is_unwind() {
        self.state.tree_state.reset(backfill_num_hash)
    } else {
        self.state.tree_state.remove_until(
            backfill_num_hash,
            self.persistence_state.last_persisted_block.hash,
            Some(backfill_num_hash),
        );
    }

    self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
    self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

    self.state.buffer.remove_old_blocks(backfill_height);
    self.canonical_in_memory_state.clear_state();

    // Point the tree state, persistence state and in-memory canonical state at the header
    // stored for the backfill height. Lookup errors and a missing header are silently skipped.
    if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
        self.state.tree_state.set_canonical_head(new_head.num_hash());
        self.persistence_state.finish(new_head.hash(), new_head.number());

        self.canonical_in_memory_state.set_canonical_head(new_head);
    }

    let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
    else {
        return Ok(())
    };
    if sync_target_state.finalized_block_hash.is_zero() {
        return Ok(())
    }
    // Number of the sync target's finalized block, if that block is currently buffered.
    let newest_finalized = self
        .state
        .buffer
        .block(&sync_target_state.finalized_block_hash)
        .map(|block| block.number());

    // If both the backfill progress and the buffered finalized block are known and another
    // backfill target is computed from them, start a new backfill run.
    if let Some(backfill_target) =
        ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
            self.backfill_sync_target(progress, finalized_number, None)
        })
    {
        self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
            backfill_target.into(),
        )));
        return Ok(())
    };

    if let Some(lowest_buffered) =
        self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
    {
        let current_head_num = self.state.tree_state.current_canonical_head.number;
        let target_head_num = lowest_buffered.number();

        // The lowest buffered ancestor of the target head is ahead of the local head:
        // download the range ending at its parent.
        if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
        {
            debug!(
                target: "engine::tree",
                %current_head_num,
                %target_head_num,
                %distance,
                "Backfill complete, downloading remaining blocks to reach FCU target"
            );

            self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
                lowest_buffered.parent_hash(),
                distance,
            )));
            return Ok(());
        }
    } else {
        // Nothing buffered for the target head: request the head block itself.
        debug!(
            target: "engine::tree",
            head_hash = %sync_target_state.head_block_hash,
            "Backfill complete but head block not buffered, requesting download"
        );
        self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
            sync_target_state.head_block_hash,
        )));
        return Ok(());
    }

    // Otherwise try to attach buffered children to the current canonical head.
    self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
}
1727
1728 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1732 if let Some(chain_update) = self.on_new_head(target)? {
1733 self.on_canonical_chain_update(chain_update);
1734 }
1735
1736 Ok(())
1737 }
1738
1739 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1741 if let Some(event) = event {
1742 self.on_tree_event(event)?;
1743 }
1744
1745 Ok(())
1746 }
1747
1748 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1752 match event {
1753 TreeEvent::TreeAction(action) => match action {
1754 TreeAction::MakeCanonical { sync_target_head } => {
1755 self.make_canonical(sync_target_head)?;
1756 }
1757 },
1758 TreeEvent::BackfillAction(action) => {
1759 self.emit_event(EngineApiEvent::BackfillAction(action));
1760 }
1761 TreeEvent::Download(action) => {
1762 self.emit_event(EngineApiEvent::Download(action));
1763 }
1764 }
1765
1766 Ok(())
1767 }
1768
1769 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1771 let event = event.into();
1772
1773 if event.is_backfill_action() {
1774 debug_assert_eq!(
1775 self.backfill_sync_state,
1776 BackfillSyncState::Idle,
1777 "backfill action should only be emitted when backfill is idle"
1778 );
1779
1780 if self.persistence_state.in_progress() {
1781 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1784 return
1785 }
1786
1787 self.backfill_sync_state = BackfillSyncState::Pending;
1788 self.metrics.engine.pipeline_runs.increment(1);
1789 debug!(target: "engine::tree", "emitting backfill action event");
1790 }
1791
1792 let _ = self.outgoing.send(event).inspect_err(
1793 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1794 );
1795 }
1796
1797 pub const fn should_persist(&self) -> bool {
1801 if !self.backfill_sync_state.is_idle() {
1802 return false
1804 }
1805
1806 let min_block = self.persistence_state.last_persisted_block.number;
1807 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1808 self.config.persistence_threshold()
1809 }
1810
1811 fn get_canonical_blocks_to_persist(
1814 &self,
1815 target: PersistTarget,
1816 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1817 debug_assert!(!self.persistence_state.in_progress());
1820
1821 let mut blocks_to_persist = Vec::new();
1822 let mut current_hash = self.state.tree_state.canonical_block_hash();
1823 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1824 let canonical_head_number = self.state.tree_state.canonical_block_number();
1825
1826 let target_number = match target {
1827 PersistTarget::Head => canonical_head_number,
1828 PersistTarget::Threshold => {
1829 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1830 }
1831 };
1832
1833 debug!(
1834 target: "engine::tree",
1835 ?current_hash,
1836 ?last_persisted_number,
1837 ?canonical_head_number,
1838 ?target_number,
1839 "Returning canonical blocks to persist"
1840 );
1841 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1842 if block.recovered_block().number() <= last_persisted_number {
1843 break;
1844 }
1845
1846 if block.recovered_block().number() <= target_number {
1847 blocks_to_persist.push(block.clone());
1848 }
1849
1850 current_hash = block.recovered_block().parent_hash();
1851 }
1852
1853 blocks_to_persist.reverse();
1855
1856 Ok(blocks_to_persist)
1857 }
1858
1859 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1867 if let Some(remove_above) = self.find_disk_reorg()? {
1870 self.remove_blocks(remove_above);
1871 return Ok(())
1872 }
1873
1874 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1875 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1876 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1877 number: self.persistence_state.last_persisted_block.number,
1878 hash: self.persistence_state.last_persisted_block.hash,
1879 });
1880 Ok(())
1881 }
1882
1883 #[instrument(level = "debug", target = "engine::tree", skip(self))]
1890 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1891 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1892 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1894 return Ok(Some(block.clone()))
1895 }
1896
1897 let (block, senders) = self
1898 .provider
1899 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1900 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1901 .split_sealed();
1902 let mut execution_output = self
1903 .provider
1904 .get_state(block.header().number())?
1905 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1906 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1907
1908 debug!(
1909 target: "engine::tree",
1910 number = ?block.number(),
1911 "computing block trie updates",
1912 );
1913 let db_provider = self.provider.database_provider_ro()?;
1914 let trie_updates = reth_trie_db::compute_block_trie_updates(
1915 &self.changeset_cache,
1916 &db_provider,
1917 block.number(),
1918 )?;
1919
1920 let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
1921 let sorted_trie_updates = Arc::new(trie_updates);
1922 let trie_data =
1924 ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
1925
1926 let execution_output = Arc::new(BlockExecutionOutput {
1927 state: execution_output.bundle,
1928 result: BlockExecutionResult {
1929 receipts: execution_output.receipts.pop().unwrap_or_default(),
1930 requests: execution_output.requests.pop().unwrap_or_default(),
1931 gas_used: block.gas_used(),
1932 blob_gas_used: block.blob_gas_used().unwrap_or_default(),
1933 },
1934 });
1935
1936 Ok(Some(ExecutedBlock::new(
1937 Arc::new(RecoveredBlock::new_sealed(block, senders)),
1938 execution_output,
1939 trie_data,
1940 )))
1941 }
1942
1943 fn sealed_header_by_hash(
1945 &self,
1946 hash: B256,
1947 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1948 let header = self.state.tree_state.sealed_header_by_hash(&hash);
1950
1951 if header.is_some() {
1952 Ok(header)
1953 } else {
1954 self.provider.sealed_header_by_hash(hash)
1955 }
1956 }
1957
1958 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1965 self.state
1966 .buffer
1967 .lowest_ancestor(&hash)
1968 .map(|block| block.parent_hash())
1969 .unwrap_or_else(|| hash)
1970 }
1971
1972 fn latest_valid_hash_for_invalid_payload(
1983 &mut self,
1984 parent_hash: B256,
1985 ) -> ProviderResult<Option<B256>> {
1986 if self.sealed_header_by_hash(parent_hash)?.is_some() {
1988 return Ok(Some(parent_hash))
1989 }
1990
1991 let mut current_hash = parent_hash;
1994 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1995 while let Some(block_with_parent) = current_block {
1996 current_hash = block_with_parent.parent;
1997 current_block = self.state.invalid_headers.get(¤t_hash);
1998
1999 if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
2002 return Ok(Some(current_hash))
2003 }
2004 }
2005 Ok(None)
2006 }
2007
2008 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
2012 if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
2015 !parent.difficulty().is_zero()
2016 {
2017 parent_hash = B256::ZERO;
2018 }
2019
2020 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
2021 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2022 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2023 })
2024 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
2025 }
2026
2027 fn is_sync_target_head(&self, block_hash: B256) -> bool {
2031 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2032 return target.head_block_hash == block_hash
2033 }
2034 false
2035 }
2036
2037 fn is_any_sync_target(&self, block_hash: B256) -> bool {
2041 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
2042 return target.contains(block_hash)
2043 }
2044 false
2045 }
2046
2047 fn check_invalid_ancestor_with_head(
2053 &mut self,
2054 check: B256,
2055 head: &SealedBlock<N::Block>,
2056 ) -> ProviderResult<Option<PayloadStatus>> {
2057 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
2059
2060 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
2061 }
2062
2063 fn on_invalid_new_payload(
2065 &mut self,
2066 head: SealedBlock<N::Block>,
2067 invalid: BlockWithParent,
2068 ) -> ProviderResult<PayloadStatus> {
2069 let status = self.prepare_invalid_response(invalid.parent)?;
2071
2072 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
2074 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
2075
2076 Ok(status)
2077 }
2078
2079 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
2091 let parent_hash = payload.parent_hash();
2092 let block_hash = payload.block_hash();
2093 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
2094 if lowest_buffered_ancestor == block_hash {
2095 lowest_buffered_ancestor = parent_hash;
2096 }
2097
2098 self.state.invalid_headers.get(&lowest_buffered_ancestor)
2100 }
2101
2102 fn handle_invalid_ancestor_payload(
2111 &mut self,
2112 payload: T::ExecutionData,
2113 invalid: BlockWithParent,
2114 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2115 let parent_hash = payload.parent_hash();
2116 let num_hash = payload.num_hash();
2117
2118 let block = match self.payload_validator.convert_payload_to_block(payload) {
2124 Ok(block) => block,
2125 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2126 };
2127
2128 Ok(self.on_invalid_new_payload(block, invalid)?)
2129 }
2130
2131 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2134 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2136
2137 match self.prepare_invalid_response(header.parent) {
2139 Ok(status) => Ok(Some(status)),
2140 Err(err) => {
2141 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2142 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2144 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2145 })))
2146 }
2147 }
2148 }
2149
2150 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2153 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2154 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2155 return Err(e)
2156 }
2157
2158 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2159 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2160 return Err(e)
2161 }
2162
2163 Ok(())
2164 }
2165
2166 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2168 fn try_connect_buffered_blocks(
2169 &mut self,
2170 parent: BlockNumHash,
2171 ) -> Result<(), InsertBlockFatalError> {
2172 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2173
2174 if blocks.is_empty() {
2175 return Ok(())
2177 }
2178
2179 let now = Instant::now();
2180 let block_count = blocks.len();
2181 for child in blocks {
2182 let child_num_hash = child.num_hash();
2183 match self.insert_block(child) {
2184 Ok(res) => {
2185 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2186 if self.is_any_sync_target(child_num_hash.hash) &&
2187 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2188 {
2189 debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2190 self.make_canonical(child_num_hash.hash)?;
2193 }
2194 }
2195 Err(err) => {
2196 if let InsertPayloadError::Block(err) = err {
2197 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2198 if let Err(fatal) = self.on_insert_block_error(err) {
2199 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2200 return Err(fatal)
2201 }
2202 }
2203 }
2204 }
2205 }
2206
2207 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2208 Ok(())
2209 }
2210
2211 fn buffer_block(
2213 &mut self,
2214 block: SealedBlock<N::Block>,
2215 ) -> Result<(), InsertBlockError<N::Block>> {
2216 if let Err(err) = self.validate_block(&block) {
2217 return Err(InsertBlockError::consensus_error(err, block))
2218 }
2219 self.state.buffer.insert_block(block);
2220 Ok(())
2221 }
2222
2223 #[inline]
2228 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2229 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2230 }
2231
2232 #[inline]
2235 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2236 if block > local_tip {
2237 Some(block - local_tip)
2238 } else {
2239 None
2240 }
2241 }
2242
/// Decides whether a backfill run should be started and, if so, towards which hash.
///
/// The distance check against the backfill threshold uses, in order of preference:
/// 1. `downloaded_block`, when its hash equals the sync target's finalized hash;
/// 2. the sync target's finalized block, when it is present in the buffer;
/// 3. `target_block_number` otherwise.
///
/// When the threshold is exceeded and a sync target exists, the target's finalized hash is
/// returned if the provider does not know it yet and it is non-zero; if it is zero, the
/// target's head hash is returned instead. In all other cases `None` is returned.
fn backfill_sync_target(
    &self,
    canonical_tip_num: u64,
    target_block_number: u64,
    downloaded_block: Option<BlockNumHash>,
) -> Option<B256> {
    let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

    let exceeds_backfill_threshold =
        match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
            // The downloaded block is the finalized block of the sync target: measure the
            // gap to it directly.
            (Some(downloaded_block), Some(state))
                if downloaded_block.hash == state.finalized_block_hash =>
            {
                self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
            }
            _ => match sync_target_state
                .as_ref()
                .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
            {
                // The finalized block is buffered: measure the gap to its number.
                Some(buffered_finalized) => {
                    self.exceeds_backfill_run_threshold(
                        canonical_tip_num,
                        buffered_finalized.number(),
                    )
                }
                // Fall back to the caller-provided target number.
                None => {
                    self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
                }
            },
        };

    if exceeds_backfill_threshold && let Some(state) = sync_target_state {
        match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
            // Lookup failure is only logged; no backfill target is returned.
            Err(err) => {
                warn!(target: "engine::tree", %err, "Failed to get finalized block header");
            }
            // The finalized header is not known to the provider.
            Ok(None) => {
                if !state.finalized_block_hash.is_zero() {
                    return Some(state.finalized_block_hash)
                }

                debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                return Some(state.head_block_hash)
            }
            // The finalized header is already known: no backfill target.
            Ok(Some(_)) => {
            }
        }
    }

    None
}
2323
2324 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2327 let mut canonical = self.state.tree_state.current_canonical_head;
2328 let mut persisted = self.persistence_state.last_persisted_block;
2329
2330 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2331 Ok(self
2332 .sealed_header_by_hash(num_hash.hash)?
2333 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2334 .parent_num_hash())
2335 };
2336
2337 while canonical.number > persisted.number {
2340 canonical = parent_num_hash(canonical)?;
2341 }
2342
2343 if canonical == persisted {
2345 return Ok(None);
2346 }
2347
2348 while persisted.number > canonical.number {
2354 persisted = parent_num_hash(persisted)?;
2355 }
2356
2357 debug_assert_eq!(persisted.number, canonical.number);
2358
2359 while persisted.hash != canonical.hash {
2361 canonical = parent_num_hash(canonical)?;
2362 persisted = parent_num_hash(persisted)?;
2363 }
2364
2365 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2366
2367 Ok(Some(persisted.number))
2368 }
2369
/// Applies a new canonical chain (a commit or a reorg) to all tracked state.
///
/// Updates the tree state's canonical head, re-inserts reorged blocks where needed, updates
/// the canonical in-memory state, records metrics, notifies canonical-state listeners and
/// finally emits a `CanonicalChainCommitted` event with the elapsed time.
fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
    trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
    let start = Instant::now();

    self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

    // Captured up front because `chain_update` is moved into `update_chain` below.
    let tip = chain_update.tip().clone_sealed_header();
    let notification = chain_update.to_chain_notification();

    if let NewCanonicalChain::Reorg { new, old } = &chain_update {
        let new_first = new.first().map(|first| first.recovered_block().num_hash());
        let old_first = old.first().map(|first| first.recovered_block().num_hash());
        trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

        self.update_reorg_metrics(old.len(), old_first);
        // Make sure every block of the new chain is tracked by the tree state.
        self.reinsert_reorged_blocks(new.clone());

        // The old chain's blocks are only re-inserted when hashed state is not in use.
        if !self.use_hashed_state {
            self.reinsert_reorged_blocks(old.clone());
        }
    }

    self.canonical_in_memory_state.update_chain(chain_update);
    self.canonical_in_memory_state.set_canonical_head(tip.clone());

    self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

    self.canonical_in_memory_state.notify_canon_state(notification);

    self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
        Box::new(tip),
        start.elapsed(),
    ));
}
2415
2416 fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
2418 if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
2419 if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
2420 first_reorged_block <= finalized.number
2421 {
2422 self.metrics.tree.reorgs.finalized.increment(1);
2423 } else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
2424 first_reorged_block <= safe.number
2425 {
2426 self.metrics.tree.reorgs.safe.increment(1);
2427 } else {
2428 self.metrics.tree.reorgs.head.increment(1);
2429 }
2430 } else {
2431 debug_unreachable!("Reorged chain doesn't have any blocks");
2432 }
2433 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2434 }
2435
2436 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2438 for block in new_chain {
2439 if self
2440 .state
2441 .tree_state
2442 .executed_block_by_hash(block.recovered_block().hash())
2443 .is_none()
2444 {
2445 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2446 self.state.tree_state.insert_executed(block);
2447 }
2448 }
2449 }
2450
2451 fn on_disconnected_downloaded_block(
2456 &self,
2457 downloaded_block: BlockNumHash,
2458 missing_parent: BlockNumHash,
2459 head: BlockNumHash,
2460 ) -> Option<TreeEvent> {
2461 if let Some(target) =
2463 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2464 {
2465 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2466 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2467 }
2468
2469 let request = if let Some(distance) =
2479 self.distance_from_local_tip(head.number, missing_parent.number)
2480 {
2481 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2482 DownloadRequest::BlockRange(missing_parent.hash, distance)
2483 } else {
2484 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2485 DownloadRequest::single_block(missing_parent.hash)
2488 };
2489
2490 Some(TreeEvent::Download(request))
2491 }
2492
2493 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2499 fn on_downloaded_block(
2500 &mut self,
2501 block: SealedBlock<N::Block>,
2502 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2503 let block_num_hash = block.num_hash();
2504 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2505 if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2506 return Ok(None)
2507 }
2508
2509 if !self.backfill_sync_state.is_idle() {
2510 return Ok(None)
2511 }
2512
2513 match self.insert_block(block) {
2515 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2516 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2519 sync_target.contains(block_num_hash.hash)
2520 {
2521 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2522
2523 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2526 sync_target_head: block_num_hash.hash,
2527 })))
2528 }
2529 trace!(target: "engine::tree", "appended downloaded block");
2530 self.try_connect_buffered_blocks(block_num_hash)?;
2531 }
2532 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2533 return Ok(self.on_disconnected_downloaded_block(
2536 block_num_hash,
2537 missing_ancestor,
2538 head,
2539 ))
2540 }
2541 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2542 trace!(target: "engine::tree", "downloaded block already executed");
2543 }
2544 Err(err) => {
2545 if let InsertPayloadError::Block(err) = err {
2546 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2547 if let Err(fatal) = self.on_insert_block_error(err) {
2548 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2549 return Err(fatal)
2550 }
2551 }
2552 }
2553 }
2554 Ok(None)
2555 }
2556
2557 fn insert_payload(
2566 &mut self,
2567 payload: T::ExecutionData,
2568 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2569 self.insert_block_or_payload(
2570 payload.block_with_parent(),
2571 payload,
2572 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2573 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2574 )
2575 }
2576
2577 fn insert_block(
2578 &mut self,
2579 block: SealedBlock<N::Block>,
2580 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2581 self.insert_block_or_payload(
2582 block.block_with_parent(),
2583 block,
2584 |validator, block, ctx| validator.validate_block(block, ctx),
2585 |_, block| Ok(block),
2586 )
2587 }
2588
2589 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2606 fn insert_block_or_payload<Input, Err>(
2607 &mut self,
2608 block_id: BlockWithParent,
2609 input: Input,
2610 execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2611 convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2612 ) -> Result<InsertPayloadOk, Err>
2613 where
2614 Err: From<InsertBlockError<N::Block>>,
2615 {
2616 let block_insert_start = Instant::now();
2617 let block_num_hash = block_id.block;
2618 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2619
2620 if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
2622 convert_to_block(self, input)?;
2623 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2624 }
2625
2626 if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
2629 match self.provider.sealed_header_by_hash(block_num_hash.hash) {
2630 Err(err) => {
2631 let block = convert_to_block(self, input)?;
2632 return Err(InsertBlockError::new(block, err.into()).into());
2633 }
2634 Ok(Some(_)) => {
2635 convert_to_block(self, input)?;
2636 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
2637 }
2638 Ok(None) => {}
2639 }
2640 }
2641
2642 match self.state_provider_builder(block_id.parent) {
2644 Err(err) => {
2645 let block = convert_to_block(self, input)?;
2646 return Err(InsertBlockError::new(block, err.into()).into());
2647 }
2648 Ok(None) => {
2649 let block = convert_to_block(self, input)?;
2650
2651 let missing_ancestor = self
2654 .state
2655 .buffer
2656 .lowest_ancestor(&block.parent_hash())
2657 .map(|block| block.parent_num_hash())
2658 .unwrap_or_else(|| block.parent_num_hash());
2659
2660 self.state.buffer.insert_block(block);
2661
2662 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2663 head: self.state.tree_state.current_canonical_head,
2664 missing_ancestor,
2665 }))
2666 }
2667 Ok(Some(_)) => {}
2668 }
2669
2670 let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2675
2676 let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2677
2678 let start = Instant::now();
2679
2680 let executed = execute(&mut self.payload_validator, input, ctx)?;
2681
2682 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2684 {
2685 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2686 self.canonical_in_memory_state.set_pending_block(executed.clone());
2687 }
2688
2689 self.state.tree_state.insert_executed(executed.clone());
2690 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2691
2692 let elapsed = start.elapsed();
2694 let engine_event = if is_fork {
2695 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2696 } else {
2697 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2698 };
2699 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2700
2701 self.metrics
2702 .engine
2703 .block_insert_total_duration
2704 .record(block_insert_start.elapsed().as_secs_f64());
2705 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2706 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2707 }
2708
2709 fn on_insert_block_error(
2715 &mut self,
2716 error: InsertBlockError<N::Block>,
2717 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2718 let (block, error) = error.split();
2719
2720 let validation_err = error.ensure_validation_error()?;
2723
2724 warn!(
2728 target: "engine::tree",
2729 invalid_hash=%block.hash(),
2730 invalid_number=block.number(),
2731 %validation_err,
2732 "Invalid block error on new payload",
2733 );
2734 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2735
2736 self.state.invalid_headers.insert(block.block_with_parent());
2738 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2739 Box::new(block),
2740 )));
2741
2742 Ok(PayloadStatus::new(
2743 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2744 latest_valid_hash,
2745 ))
2746 }
2747
2748 fn on_new_payload_error(
2750 &mut self,
2751 error: NewPayloadError,
2752 payload_num_hash: NumHash,
2753 parent_hash: B256,
2754 ) -> ProviderResult<PayloadStatus> {
2755 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2756 let latest_valid_hash =
2759 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2760 None
2764 } else {
2765 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2766 };
2767
2768 let status = PayloadStatusEnum::from(error);
2769 Ok(PayloadStatus::new(status, latest_valid_hash))
2770 }
2771
2772 pub fn find_canonical_header(
2774 &self,
2775 hash: B256,
2776 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2777 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2778
2779 if canonical.is_none() {
2780 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2781 }
2782
2783 Ok(canonical)
2784 }
2785
2786 fn update_finalized_block(
2788 &self,
2789 finalized_block_hash: B256,
2790 ) -> Result<(), OnForkChoiceUpdated> {
2791 if finalized_block_hash.is_zero() {
2792 return Ok(())
2793 }
2794
2795 match self.find_canonical_header(finalized_block_hash) {
2796 Ok(None) => {
2797 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2798 return Err(OnForkChoiceUpdated::invalid_state())
2800 }
2801 Ok(Some(finalized)) => {
2802 if Some(finalized.num_hash()) !=
2803 self.canonical_in_memory_state.get_finalized_num_hash()
2804 {
2805 let _ = self.persistence.save_finalized_block_number(finalized.number());
2808 self.canonical_in_memory_state.set_finalized(finalized.clone());
2809 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2811 }
2812 }
2813 Err(err) => {
2814 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2815 }
2816 }
2817
2818 Ok(())
2819 }
2820
2821 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2823 if safe_block_hash.is_zero() {
2824 return Ok(())
2825 }
2826
2827 match self.find_canonical_header(safe_block_hash) {
2828 Ok(None) => {
2829 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2830 return Err(OnForkChoiceUpdated::invalid_state())
2832 }
2833 Ok(Some(safe)) => {
2834 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2835 let _ = self.persistence.save_safe_block_number(safe.number());
2838 self.canonical_in_memory_state.set_safe(safe.clone());
2839 self.metrics.tree.safe_block_height.set(safe.number() as f64);
2841 }
2842 }
2843 Err(err) => {
2844 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2845 }
2846 }
2847
2848 Ok(())
2849 }
2850
2851 fn ensure_consistent_forkchoice_state(
2860 &self,
2861 state: ForkchoiceState,
2862 ) -> Result<(), OnForkChoiceUpdated> {
2863 self.update_finalized_block(state.finalized_block_hash)?;
2869
2870 self.update_safe_block(state.safe_block_hash)
2876 }
2877
2878 fn process_payload_attributes(
2883 &self,
2884 attrs: T::PayloadAttributes,
2885 head: &N::BlockHeader,
2886 state: ForkchoiceState,
2887 version: EngineApiMessageVersion,
2888 ) -> OnForkChoiceUpdated {
2889 if let Err(err) =
2890 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2891 {
2892 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2893 return OnForkChoiceUpdated::invalid_payload_attributes()
2894 }
2895
2896 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2901 state.head_block_hash,
2902 attrs,
2903 version as u8,
2904 ) {
2905 Ok(attributes) => {
2906 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2909
2910 OnForkChoiceUpdated::updated_with_pending_payload_id(
2922 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2923 pending_payload_id,
2924 )
2925 }
2926 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2927 }
2928 }
2929
2930 pub(crate) fn remove_before(
2937 &mut self,
2938 upper_bound: BlockNumHash,
2939 finalized_hash: Option<B256>,
2940 ) -> ProviderResult<()> {
2941 let num = if let Some(hash) = finalized_hash {
2944 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2945 } else {
2946 None
2947 };
2948
2949 self.state.tree_state.remove_until(
2950 upper_bound,
2951 self.persistence_state.last_persisted_block.hash,
2952 num,
2953 );
2954 Ok(())
2955 }
2956
2957 pub fn state_provider_builder(
2962 &self,
2963 hash: B256,
2964 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2965 where
2966 P: BlockReader + StateProviderFactory + StateReader + Clone,
2967 {
2968 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2969 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2970 return Ok(Some(StateProviderBuilder::new(
2972 self.provider.clone(),
2973 historical,
2974 Some(blocks),
2975 )))
2976 }
2977
2978 if let Some(header) = self.provider.header(hash)? {
2980 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2981 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2984 }
2985
2986 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2987 Ok(None)
2988 }
2989}
2990
/// Events distinguished by the engine's processing loop.
///
/// NOTE(review): the code that produces and consumes these events is outside this section;
/// the variant descriptions below are derived from names and payload types only — confirm.
#[derive(Debug)]
enum LoopEvent<T, N>
where
    N: NodePrimitives,
    T: PayloadTypes,
{
    /// A message wrapped in [`FromEngine`], carrying an [`EngineApiRequest`].
    EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
    /// Presumably signals that a persistence task has finished.
    PersistenceComplete {
        /// Optional block number/hash reported by the task — presumably the last persisted
        /// block; TODO confirm.
        result: Option<BlockNumHash>,
        /// Instant associated with the task — presumably when it was started; TODO confirm.
        start_time: Instant,
    },
    /// Presumably signals that a channel feeding the loop was disconnected.
    Disconnected,
}
3010
/// Status of a block with respect to the tree, reported inside [`InsertPayloadOk`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockStatus {
    /// Reported for blocks that were executed and added to the tree state, and for blocks
    /// that were already known.
    Valid,
    /// Reported when the parent state of the block is not available; the block was put into
    /// the buffer instead of being executed.
    Disconnected {
        /// The canonical head of the tree state at the time the block was buffered.
        head: BlockNumHash,
        /// The parent of the lowest buffered ancestor of the block, or the block's own parent
        /// if no ancestor is buffered.
        missing_ancestor: BlockNumHash,
    },
}
3028
/// Successful outcome of inserting a block or payload into the tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertPayloadOk {
    /// The block was already known (in the in-memory tree state or via the provider); nothing
    /// was inserted.
    AlreadySeen(BlockStatus),
    /// The block was newly handled: either executed and added to the tree state, or buffered
    /// as disconnected, as described by the contained [`BlockStatus`].
    Inserted(BlockStatus),
}
3040
/// Selects how far persistence should advance.
///
/// NOTE(review): the code using this enum is outside this section; the variant descriptions
/// are inferred from their names — confirm against the persistence logic.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
    /// Presumably: persist up to a configured threshold behind the head.
    Threshold,
    /// Presumably: persist all the way up to the current head.
    Head,
}