1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use reth_chain_state::{
17 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
18 NewCanonicalChain,
19};
20use reth_consensus::{Consensus, FullConsensus};
21use reth_engine_primitives::{
22 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
23 ForkchoiceStateTracker, OnForkChoiceUpdated,
24};
25use reth_errors::{ConsensusError, ProviderResult};
26use reth_evm::{ConfigureEvm, OnStateHook};
27use reth_payload_builder::PayloadBuilderHandle;
28use reth_payload_primitives::{
29 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
30};
31use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
32use reth_provider::{
33 BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
34 StateProviderFactory, StateReader, TransactionVariant, TrieReader,
35};
36use reth_revm::database::StateProviderDatabase;
37use reth_stages_api::ControlFlow;
38use revm::state::EvmState;
39use state::TreeState;
40use std::{
41 fmt::Debug,
42 ops,
43 sync::{
44 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
45 Arc,
46 },
47 time::Instant,
48};
49use tokio::sync::{
50 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
51 oneshot::{self, error::TryRecvError},
52};
53use tracing::*;
54
55mod block_buffer;
56mod cached_state;
57pub mod error;
58pub mod instrumented_state;
59mod invalid_headers;
60mod metrics;
61mod payload_processor;
62pub mod payload_validator;
63mod persistence_state;
64pub mod precompile_cache;
65#[cfg(test)]
66mod tests;
67#[expect(unused)]
68mod trie_updates;
69
70use crate::tree::error::AdvancePersistenceError;
71pub use block_buffer::BlockBuffer;
72pub use invalid_headers::InvalidHeaderCache;
73pub use payload_processor::*;
74pub use payload_validator::{BasicEngineValidator, EngineValidator};
75pub use persistence_state::PersistenceState;
76pub use reth_engine_primitives::TreeConfig;
77
78pub mod state;
79
80pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
90
91#[derive(Clone, Debug)]
93pub struct StateProviderBuilder<N: NodePrimitives, P> {
94 provider_factory: P,
96 historical: B256,
98 overlay: Option<Vec<ExecutedBlock<N>>>,
100}
101
102impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
103 pub const fn new(
106 provider_factory: P,
107 historical: B256,
108 overlay: Option<Vec<ExecutedBlock<N>>>,
109 ) -> Self {
110 Self { provider_factory, historical, overlay }
111 }
112}
113
114impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
115where
116 P: BlockReader + StateProviderFactory + StateReader + Clone,
117{
118 pub fn build(&self) -> ProviderResult<StateProviderBox> {
120 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
121 if let Some(overlay) = self.overlay.clone() {
122 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
123 }
124 Ok(provider)
125 }
126}
127
128#[derive(Debug)]
132pub struct EngineApiTreeState<N: NodePrimitives> {
133 tree_state: TreeState<N>,
135 forkchoice_state_tracker: ForkchoiceStateTracker,
137 buffer: BlockBuffer<N::Block>,
139 invalid_headers: InvalidHeaderCache,
142}
143
144impl<N: NodePrimitives> EngineApiTreeState<N> {
145 fn new(
146 block_buffer_limit: u32,
147 max_invalid_header_cache_length: u32,
148 canonical_block: BlockNumHash,
149 engine_kind: EngineApiKind,
150 ) -> Self {
151 Self {
152 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
153 buffer: BlockBuffer::new(block_buffer_limit),
154 tree_state: TreeState::new(canonical_block, engine_kind),
155 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
156 }
157 }
158}
159
160#[derive(Debug)]
162pub struct TreeOutcome<T> {
163 pub outcome: T,
165 pub event: Option<TreeEvent>,
167}
168
169impl<T> TreeOutcome<T> {
170 pub const fn new(outcome: T) -> Self {
172 Self { outcome, event: None }
173 }
174
175 pub fn with_event(mut self, event: TreeEvent) -> Self {
177 self.event = Some(event);
178 self
179 }
180}
181
182#[derive(Debug)]
184pub enum TreeEvent {
185 TreeAction(TreeAction),
187 BackfillAction(BackfillAction),
189 Download(DownloadRequest),
191}
192
193impl TreeEvent {
194 const fn is_backfill_action(&self) -> bool {
196 matches!(self, Self::BackfillAction(_))
197 }
198}
199
200#[derive(Debug)]
202pub enum TreeAction {
203 MakeCanonical {
205 sync_target_head: B256,
207 },
208}
209
210struct MeteredStateHook {
212 metrics: reth_evm::metrics::ExecutorMetrics,
213 inner_hook: Box<dyn OnStateHook>,
214}
215
216impl OnStateHook for MeteredStateHook {
217 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
218 let accounts = state.keys().len();
220 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
221 let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
222
223 self.metrics.accounts_loaded_histogram.record(accounts as f64);
224 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
225 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
226
227 self.inner_hook.on_state(source, state);
229 }
230}
231
232pub struct EngineApiTreeHandler<N, P, T, V, C>
237where
238 N: NodePrimitives,
239 T: PayloadTypes,
240 C: ConfigureEvm<Primitives = N> + 'static,
241{
242 provider: P,
243 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
244 payload_validator: V,
245 state: EngineApiTreeState<N>,
247 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
256 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
258 outgoing: UnboundedSender<EngineApiEvent<N>>,
260 persistence: PersistenceHandle<N>,
262 persistence_state: PersistenceState,
264 backfill_sync_state: BackfillSyncState,
266 canonical_in_memory_state: CanonicalInMemoryState<N>,
269 payload_builder: PayloadBuilderHandle<T>,
272 config: TreeConfig,
274 metrics: EngineApiMetrics,
276 engine_kind: EngineApiKind,
278 evm_config: C,
280}
281
282impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
283 for EngineApiTreeHandler<N, P, T, V, C>
284where
285 N: NodePrimitives,
286 C: Debug + ConfigureEvm<Primitives = N>,
287{
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 f.debug_struct("EngineApiTreeHandler")
290 .field("provider", &self.provider)
291 .field("consensus", &self.consensus)
292 .field("payload_validator", &self.payload_validator)
293 .field("state", &self.state)
294 .field("incoming_tx", &self.incoming_tx)
295 .field("persistence", &self.persistence)
296 .field("persistence_state", &self.persistence_state)
297 .field("backfill_sync_state", &self.backfill_sync_state)
298 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
299 .field("payload_builder", &self.payload_builder)
300 .field("config", &self.config)
301 .field("metrics", &self.metrics)
302 .field("engine_kind", &self.engine_kind)
303 .field("evm_config", &self.evm_config)
304 .finish()
305 }
306}
307
308impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
309where
310 N: NodePrimitives,
311 P: DatabaseProviderFactory
312 + BlockReader<Block = N::Block, Header = N::BlockHeader>
313 + StateProviderFactory
314 + StateReader<Receipt = N::Receipt>
315 + HashedPostStateProvider
316 + TrieReader
317 + Clone
318 + 'static,
319 <P as DatabaseProviderFactory>::Provider:
320 BlockReader<Block = N::Block, Header = N::BlockHeader>,
321 C: ConfigureEvm<Primitives = N> + 'static,
322 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
323 V: EngineValidator<T>,
324{
325 #[expect(clippy::too_many_arguments)]
327 pub fn new(
328 provider: P,
329 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
330 payload_validator: V,
331 outgoing: UnboundedSender<EngineApiEvent<N>>,
332 state: EngineApiTreeState<N>,
333 canonical_in_memory_state: CanonicalInMemoryState<N>,
334 persistence: PersistenceHandle<N>,
335 persistence_state: PersistenceState,
336 payload_builder: PayloadBuilderHandle<T>,
337 config: TreeConfig,
338 engine_kind: EngineApiKind,
339 evm_config: C,
340 ) -> Self {
341 let (incoming_tx, incoming) = std::sync::mpsc::channel();
342
343 Self {
344 provider,
345 consensus,
346 payload_validator,
347 incoming,
348 outgoing,
349 persistence,
350 persistence_state,
351 backfill_sync_state: BackfillSyncState::Idle,
352 state,
353 canonical_in_memory_state,
354 payload_builder,
355 config,
356 metrics: Default::default(),
357 incoming_tx,
358 engine_kind,
359 evm_config,
360 }
361 }
362
363 #[expect(clippy::complexity)]
369 pub fn spawn_new(
370 provider: P,
371 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
372 payload_validator: V,
373 persistence: PersistenceHandle<N>,
374 payload_builder: PayloadBuilderHandle<T>,
375 canonical_in_memory_state: CanonicalInMemoryState<N>,
376 config: TreeConfig,
377 kind: EngineApiKind,
378 evm_config: C,
379 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
380 {
381 let best_block_number = provider.best_block_number().unwrap_or(0);
382 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
383
384 let persistence_state = PersistenceState {
385 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
386 rx: None,
387 };
388
389 let (tx, outgoing) = unbounded_channel();
390 let state = EngineApiTreeState::new(
391 config.block_buffer_limit(),
392 config.max_invalid_header_cache_length(),
393 header.num_hash(),
394 kind,
395 );
396
397 let task = Self::new(
398 provider,
399 consensus,
400 payload_validator,
401 tx,
402 state,
403 canonical_in_memory_state,
404 persistence,
405 persistence_state,
406 payload_builder,
407 config,
408 kind,
409 evm_config,
410 );
411 let incoming = task.incoming_tx.clone();
412 std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
413 (incoming, outgoing)
414 }
415
416 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
418 self.incoming_tx.clone()
419 }
420
421 pub fn run(mut self) {
425 loop {
426 match self.try_recv_engine_message() {
427 Ok(Some(msg)) => {
428 debug!(target: "engine::tree", %msg, "received new engine message");
429 match self.on_engine_message(msg) {
430 Ok(ops::ControlFlow::Break(())) => return,
431 Ok(ops::ControlFlow::Continue(())) => {}
432 Err(fatal) => {
433 error!(target: "engine::tree", %fatal, "insert block fatal error");
434 return
435 }
436 }
437 }
438 Ok(None) => {
439 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
440 }
441 Err(_err) => {
442 error!(target: "engine::tree", "Engine channel disconnected");
443 return
444 }
445 }
446
447 if let Err(err) = self.advance_persistence() {
448 error!(target: "engine::tree", %err, "Advancing persistence failed");
449 return
450 }
451 }
452 }
453
454 fn on_downloaded(
460 &mut self,
461 mut blocks: Vec<SealedBlock<N::Block>>,
462 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
463 if blocks.is_empty() {
464 return Ok(None)
466 }
467
468 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
469 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
470 for block in blocks.drain(..batch) {
471 if let Some(event) = self.on_downloaded_block(block)? {
472 let needs_backfill = event.is_backfill_action();
473 self.on_tree_event(event)?;
474 if needs_backfill {
475 return Ok(None)
477 }
478 }
479 }
480
481 if !blocks.is_empty() {
483 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
484 }
485
486 Ok(None)
487 }
488
489 #[instrument(
504 level = "debug",
505 target = "engine::tree",
506 skip_all,
507 fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
508 )]
509 fn on_new_payload(
510 &mut self,
511 payload: T::ExecutionData,
512 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
513 trace!(target: "engine::tree", "invoked new payload");
514
515 let start = Instant::now();
517
518 let num_hash = payload.num_hash();
545 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
546 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
547
548 let block_hash = num_hash.hash;
549
550 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
552 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
553 return Ok(TreeOutcome::new(status));
554 }
555
556 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
558
559 let status = if self.backfill_sync_state.is_idle() {
560 self.try_insert_payload(payload)?
561 } else {
562 self.try_buffer_payload(payload)?
563 };
564
565 let mut outcome = TreeOutcome::new(status);
566 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
568 if self.state.tree_state.canonical_block_hash() != block_hash {
570 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
571 sync_target_head: block_hash,
572 }));
573 }
574 }
575
576 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
578
579 Ok(outcome)
580 }
581
582 #[instrument(level = "debug", target = "engine::tree", skip_all)]
589 fn try_insert_payload(
590 &mut self,
591 payload: T::ExecutionData,
592 ) -> Result<PayloadStatus, InsertBlockFatalError> {
593 let block_hash = payload.block_hash();
594 let num_hash = payload.num_hash();
595 let parent_hash = payload.parent_hash();
596 let mut latest_valid_hash = None;
597
598 match self.insert_payload(payload) {
599 Ok(status) => {
600 let status = match status {
601 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
602 latest_valid_hash = Some(block_hash);
603 self.try_connect_buffered_blocks(num_hash)?;
604 PayloadStatusEnum::Valid
605 }
606 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
607 latest_valid_hash = Some(block_hash);
608 PayloadStatusEnum::Valid
609 }
610 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
611 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
612 PayloadStatusEnum::Syncing
614 }
615 };
616
617 Ok(PayloadStatus::new(status, latest_valid_hash))
618 }
619 Err(error) => match error {
620 InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
621 InsertPayloadError::Payload(error) => {
622 Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
623 }
624 },
625 }
626 }
627
628 fn try_buffer_payload(
637 &mut self,
638 payload: T::ExecutionData,
639 ) -> Result<PayloadStatus, InsertBlockFatalError> {
640 let parent_hash = payload.parent_hash();
641 let num_hash = payload.num_hash();
642
643 match self.payload_validator.convert_payload_to_block(payload) {
644 Ok(block) => {
646 if let Err(error) = self.buffer_block(block) {
647 Ok(self.on_insert_block_error(error)?)
648 } else {
649 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
650 }
651 }
652 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
653 }
654 }
655
656 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
663 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
665 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
666 self.metrics.engine.executed_new_block_cache_miss.increment(1);
667 return Ok(None)
668 };
669
670 let new_head_number = new_head_block.recovered_block().number();
671 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
672
673 let mut new_chain = vec![new_head_block.clone()];
674 let mut current_hash = new_head_block.recovered_block().parent_hash();
675 let mut current_number = new_head_number - 1;
676
677 while current_number > current_canonical_number {
682 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
683 {
684 current_hash = block.recovered_block().parent_hash();
685 current_number -= 1;
686 new_chain.push(block);
687 } else {
688 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
689 return Ok(None)
692 }
693 }
694
695 if current_hash == self.state.tree_state.current_canonical_head.hash {
698 new_chain.reverse();
699
700 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
702 }
703
704 let mut old_chain = Vec::new();
706 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
707
708 while current_canonical_number > current_number {
711 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
712 old_hash = block.recovered_block().parent_hash();
713 old_chain.push(block);
714 current_canonical_number -= 1;
715 } else {
716 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
718 return Ok(None)
719 }
720 }
721
722 debug_assert_eq!(current_number, current_canonical_number);
724
725 while old_hash != current_hash {
728 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
729 old_hash = block.recovered_block().parent_hash();
730 old_chain.push(block);
731 } else {
732 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
734 return Ok(None)
735 }
736
737 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
738 {
739 current_hash = block.recovered_block().parent_hash();
740 new_chain.push(block);
741 } else {
742 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
744 return Ok(None)
745 }
746 }
747 new_chain.reverse();
748 old_chain.reverse();
749
750 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
751 }
752
753 fn update_latest_block_to_canonical_ancestor(
765 &mut self,
766 canonical_header: &SealedHeader<N::BlockHeader>,
767 ) -> ProviderResult<()> {
768 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
769 let current_head_number = self.state.tree_state.canonical_block_number();
770 let new_head_number = canonical_header.number();
771 let new_head_hash = canonical_header.hash();
772
773 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
775
776 if new_head_number < current_head_number {
778 debug!(
779 target: "engine::tree",
780 current_head = current_head_number,
781 new_head = new_head_number,
782 new_head_hash = ?new_head_hash,
783 "FCU unwind detected: reverting to canonical ancestor"
784 );
785
786 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
787 } else {
788 debug!(
789 target: "engine::tree",
790 previous_head = current_head_number,
791 new_head = new_head_number,
792 new_head_hash = ?new_head_hash,
793 "Advancing latest block to canonical ancestor"
794 );
795 self.handle_chain_advance_or_same_height(canonical_header)
796 }
797 }
798
799 fn handle_canonical_chain_unwind(
802 &self,
803 current_head_number: u64,
804 canonical_header: &SealedHeader<N::BlockHeader>,
805 ) -> ProviderResult<()> {
806 let new_head_number = canonical_header.number();
807 debug!(
808 target: "engine::tree",
809 from = current_head_number,
810 to = new_head_number,
811 "Handling unwind: collecting blocks to remove from in-memory state"
812 );
813
814 let old_blocks =
816 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
817
818 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
820 }
821
822 fn collect_blocks_for_canonical_unwind(
824 &self,
825 new_head_number: u64,
826 current_head_number: u64,
827 ) -> Vec<ExecutedBlock<N>> {
828 let mut old_blocks =
829 Vec::with_capacity((current_head_number.saturating_sub(new_head_number)) as usize);
830
831 for block_num in (new_head_number + 1)..=current_head_number {
832 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
833 let executed_block = block_state.block_ref().clone();
834 old_blocks.push(executed_block);
835 debug!(
836 target: "engine::tree",
837 block_number = block_num,
838 "Collected block for removal from in-memory state"
839 );
840 }
841 }
842
843 if old_blocks.is_empty() {
844 debug!(
845 target: "engine::tree",
846 "No blocks found in memory to remove, will clear and reset state"
847 );
848 }
849
850 old_blocks
851 }
852
853 fn apply_canonical_ancestor_via_reorg(
855 &self,
856 canonical_header: &SealedHeader<N::BlockHeader>,
857 old_blocks: Vec<ExecutedBlock<N>>,
858 ) -> ProviderResult<()> {
859 let new_head_hash = canonical_header.hash();
860 let new_head_number = canonical_header.number();
861
862 match self.canonical_block_by_hash(new_head_hash)? {
864 Some(executed_block) => {
865 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
867 new: vec![executed_block],
868 old: old_blocks,
869 });
870
871 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
874
875 debug!(
876 target: "engine::tree",
877 block_number = new_head_number,
878 block_hash = ?new_head_hash,
879 "Successfully loaded canonical ancestor into memory via reorg"
880 );
881 }
882 None => {
883 warn!(
885 target: "engine::tree",
886 block_hash = ?new_head_hash,
887 "Could not find canonical ancestor block, updating header only"
888 );
889 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
890 }
891 }
892
893 Ok(())
894 }
895
896 fn handle_chain_advance_or_same_height(
898 &self,
899 canonical_header: &SealedHeader<N::BlockHeader>,
900 ) -> ProviderResult<()> {
901 let new_head_number = canonical_header.number();
902 let new_head_hash = canonical_header.hash();
903
904 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
906
907 self.ensure_block_in_memory(new_head_number, new_head_hash)
909 }
910
911 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
913 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
915 return Ok(());
916 }
917
918 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
920 self.canonical_in_memory_state
921 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
922
923 debug!(
924 target: "engine::tree",
925 block_number,
926 block_hash = ?block_hash,
927 "Added canonical block to in-memory state"
928 );
929 }
930
931 Ok(())
932 }
933
934 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
943 fn on_forkchoice_updated(
944 &mut self,
945 state: ForkchoiceState,
946 attrs: Option<T::PayloadAttributes>,
947 version: EngineApiMessageVersion,
948 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
949 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
950
951 self.record_forkchoice_metrics();
953
954 if let Some(early_result) = self.validate_forkchoice_state(state)? {
956 return Ok(TreeOutcome::new(early_result));
957 }
958
959 if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
961 return Ok(result);
962 }
963
964 if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
967 return Ok(result);
968 }
969
970 self.handle_missing_block(state)
972 }
973
974 fn record_forkchoice_metrics(&self) {
976 self.canonical_in_memory_state.on_forkchoice_update_received();
977 }
978
979 fn validate_forkchoice_state(
984 &mut self,
985 state: ForkchoiceState,
986 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
987 if state.head_block_hash.is_zero() {
988 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
989 }
990
991 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
994 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
995 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
996 }
997
998 if !self.backfill_sync_state.is_idle() {
999 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1002 return Ok(Some(OnForkChoiceUpdated::syncing()));
1003 }
1004
1005 Ok(None)
1006 }
1007
1008 fn handle_canonical_head(
1014 &self,
1015 state: ForkchoiceState,
1016 attrs: &Option<T::PayloadAttributes>, version: EngineApiMessageVersion,
1018 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1019 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1034 return Ok(None);
1035 }
1036
1037 trace!(target: "engine::tree", "fcu head hash is already canonical");
1038
1039 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1041 return Ok(Some(TreeOutcome::new(outcome)));
1043 }
1044
1045 if let Some(attr) = attrs {
1047 let tip = self
1048 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1049 .ok_or_else(|| {
1050 ProviderError::HeaderNotFound(state.head_block_hash.into())
1053 })?;
1054 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1056 return Ok(Some(TreeOutcome::new(updated)));
1057 }
1058
1059 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1061 PayloadStatusEnum::Valid,
1062 Some(state.head_block_hash),
1063 )));
1064 Ok(Some(outcome))
1065 }
1066
1067 fn apply_chain_update(
1079 &mut self,
1080 state: ForkchoiceState,
1081 attrs: &Option<T::PayloadAttributes>,
1082 version: EngineApiMessageVersion,
1083 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1084 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1086 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1087
1088 if self.engine_kind.is_opstack() ||
1091 self.config.always_process_payload_attributes_on_canonical_head()
1092 {
1093 if self.config.unwind_canonical_header() {
1099 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1100 }
1101
1102 if let Some(attr) = attrs {
1103 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1104 let updated = self.process_payload_attributes(
1106 attr.clone(),
1107 &canonical_header,
1108 state,
1109 version,
1110 );
1111 return Ok(Some(TreeOutcome::new(updated)));
1112 }
1113 }
1114
1115 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1127 PayloadStatusEnum::Valid,
1128 Some(state.head_block_hash),
1129 )));
1130 return Ok(Some(outcome));
1131 }
1132
1133 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1135 let tip = chain_update.tip().clone_sealed_header();
1136 self.on_canonical_chain_update(chain_update);
1137
1138 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1140 return Ok(Some(TreeOutcome::new(outcome)));
1142 }
1143
1144 if let Some(attr) = attrs {
1145 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1147 return Ok(Some(TreeOutcome::new(updated)));
1148 }
1149
1150 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1151 PayloadStatusEnum::Valid,
1152 Some(state.head_block_hash),
1153 )));
1154 return Ok(Some(outcome));
1155 }
1156
1157 Ok(None)
1158 }
1159
1160 fn handle_missing_block(
1165 &self,
1166 state: ForkchoiceState,
1167 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1168 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1175 !state.safe_block_hash.is_zero() &&
1177 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1178 {
1179 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1180 state.safe_block_hash
1181 } else {
1182 state.head_block_hash
1183 };
1184
1185 let target = self.lowest_buffered_ancestor_or(target);
1186 trace!(target: "engine::tree", %target, "downloading missing block");
1187
1188 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1189 PayloadStatusEnum::Syncing,
1190 )))
1191 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1192 }
1193
1194 #[expect(clippy::type_complexity)]
1203 fn try_recv_engine_message(
1204 &self,
1205 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1206 if self.persistence_state.in_progress() {
1207 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1209 Ok(msg) => Ok(Some(msg)),
1210 Err(err) => match err {
1211 RecvTimeoutError::Timeout => Ok(None),
1212 RecvTimeoutError::Disconnected => Err(RecvError),
1213 },
1214 }
1215 } else {
1216 self.incoming.recv().map(Some)
1217 }
1218 }
1219
1220 fn remove_blocks(&mut self, new_tip_num: u64) {
1223 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1224 if new_tip_num < self.persistence_state.last_persisted_block.number {
1225 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1226 let (tx, rx) = oneshot::channel();
1227 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1228 self.persistence_state.start_remove(new_tip_num, rx);
1229 }
1230 }
1231
1232 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1235 if blocks_to_persist.is_empty() {
1236 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1237 return
1238 }
1239
1240 let highest_num_hash = blocks_to_persist
1242 .iter()
1243 .max_by_key(|block| block.recovered_block().number())
1244 .map(|b| b.recovered_block().num_hash())
1245 .expect("Checked non-empty persisting blocks");
1246
1247 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1248 let (tx, rx) = oneshot::channel();
1249 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1250
1251 self.persistence_state.start_save(highest_num_hash, rx);
1252 }
1253
1254 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1259 if self.persistence_state.in_progress() {
1260 let (mut rx, start_time, current_action) = self
1261 .persistence_state
1262 .rx
1263 .take()
1264 .expect("if a persistence task is in progress Receiver must be Some");
1265 match rx.try_recv() {
1267 Ok(last_persisted_hash_num) => {
1268 self.on_persistence_complete(last_persisted_hash_num, start_time)?;
1269 }
1270 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1271 Err(TryRecvError::Empty) => {
1272 self.persistence_state.rx = Some((rx, start_time, current_action))
1273 }
1274 }
1275 }
1276
1277 if !self.persistence_state.in_progress() {
1278 if let Some(new_tip_num) = self.find_disk_reorg()? {
1279 self.remove_blocks(new_tip_num)
1280 } else if self.should_persist() {
1281 let blocks_to_persist =
1282 self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
1283 self.persist_blocks(blocks_to_persist);
1284 }
1285 }
1286
1287 Ok(())
1288 }
1289
1290 fn finish_termination(
1295 &mut self,
1296 pending_termination: oneshot::Sender<()>,
1297 ) -> Result<(), AdvancePersistenceError> {
1298 trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
1299 let result = self.persist_until_complete();
1300 let _ = pending_termination.send(());
1301 result
1302 }
1303
1304 fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
1306 loop {
1307 if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
1309 let result = rx.blocking_recv().map_err(|_| TryRecvError::Closed)?;
1310 self.on_persistence_complete(result, start_time)?;
1311 }
1312
1313 let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
1314
1315 if blocks_to_persist.is_empty() {
1316 debug!(target: "engine::tree", "persistence complete, signaling termination");
1317 return Ok(())
1318 }
1319
1320 debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
1321 self.persist_blocks(blocks_to_persist);
1322 }
1323 }
1324
1325 fn on_persistence_complete(
1327 &mut self,
1328 last_persisted_hash_num: Option<BlockNumHash>,
1329 start_time: Instant,
1330 ) -> Result<(), AdvancePersistenceError> {
1331 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1332
1333 let Some(BlockNumHash {
1334 hash: last_persisted_block_hash,
1335 number: last_persisted_block_number,
1336 }) = last_persisted_hash_num
1337 else {
1338 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1340 return Ok(())
1341 };
1342
1343 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1344 self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
1345 self.on_new_persisted_block()?;
1346 Ok(())
1347 }
1348
1349 fn on_engine_message(
1353 &mut self,
1354 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1355 ) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
1356 match msg {
1357 FromEngine::Event(event) => match event {
1358 FromOrchestrator::BackfillSyncStarted => {
1359 debug!(target: "engine::tree", "received backfill sync started event");
1360 self.backfill_sync_state = BackfillSyncState::Active;
1361 }
1362 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1363 self.on_backfill_sync_finished(ctrl)?;
1364 }
1365 FromOrchestrator::Terminate { tx } => {
1366 debug!(target: "engine::tree", "received terminate request");
1367 if let Err(err) = self.finish_termination(tx) {
1368 error!(target: "engine::tree", %err, "Termination failed");
1369 }
1370 return Ok(ops::ControlFlow::Break(()))
1371 }
1372 },
1373 FromEngine::Request(request) => {
1374 match request {
1375 EngineApiRequest::InsertExecutedBlock(block) => {
1376 let block_num_hash = block.recovered_block().num_hash();
1377 if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1378 return Ok(ops::ControlFlow::Continue(()))
1380 }
1381
1382 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1383 let now = Instant::now();
1384
1385 if self.state.tree_state.canonical_block_hash() ==
1388 block.recovered_block().parent_hash()
1389 {
1390 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1391 self.canonical_in_memory_state.set_pending_block(block.clone());
1392 }
1393
1394 self.state.tree_state.insert_executed(block.clone());
1395 self.payload_validator.on_inserted_executed_block(block.clone());
1396 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1397 self.emit_event(EngineApiEvent::BeaconConsensus(
1398 ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1399 ));
1400 }
1401 EngineApiRequest::Beacon(request) => {
1402 match request {
1403 BeaconEngineMessage::ForkchoiceUpdated {
1404 state,
1405 payload_attrs,
1406 tx,
1407 version,
1408 } => {
1409 let has_attrs = payload_attrs.is_some();
1410
1411 let start = Instant::now();
1412 let mut output =
1413 self.on_forkchoice_updated(state, payload_attrs, version);
1414
1415 if let Ok(res) = &mut output {
1416 self.state
1418 .forkchoice_state_tracker
1419 .set_latest(state, res.outcome.forkchoice_status());
1420
1421 self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1423 state,
1424 res.outcome.forkchoice_status(),
1425 ));
1426
1427 self.on_maybe_tree_event(res.event.take())?;
1429 }
1430
1431 self.metrics.engine.forkchoice_updated.update_response_metrics(
1432 start,
1433 &mut self.metrics.engine.new_payload.latest_at,
1434 has_attrs,
1435 &output,
1436 );
1437
1438 if let Err(err) =
1439 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1440 {
1441 self.metrics
1442 .engine
1443 .failed_forkchoice_updated_response_deliveries
1444 .increment(1);
1445 error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1446 }
1447 }
1448 BeaconEngineMessage::NewPayload { payload, tx } => {
1449 let start = Instant::now();
1450 let gas_used = payload.gas_used();
1451 let num_hash = payload.num_hash();
1452 let mut output = self.on_new_payload(payload);
1453 self.metrics
1454 .engine
1455 .new_payload
1456 .update_response_metrics(start, &output, gas_used);
1457
1458 let maybe_event =
1459 output.as_mut().ok().and_then(|out| out.event.take());
1460
1461 if let Err(err) =
1463 tx.send(output.map(|o| o.outcome).map_err(|e| {
1464 BeaconOnNewPayloadError::Internal(Box::new(e))
1465 }))
1466 {
1467 error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1468 self.metrics
1469 .engine
1470 .failed_new_payload_response_deliveries
1471 .increment(1);
1472 }
1473
1474 self.on_maybe_tree_event(maybe_event)?;
1476 }
1477 }
1478 }
1479 }
1480 }
1481 FromEngine::DownloadedBlocks(blocks) => {
1482 if let Some(event) = self.on_downloaded(blocks)? {
1483 self.on_tree_event(event)?;
1484 }
1485 }
1486 }
1487 Ok(ops::ControlFlow::Continue(()))
1488 }
1489
1490 fn on_backfill_sync_finished(
1504 &mut self,
1505 ctrl: ControlFlow,
1506 ) -> Result<(), InsertBlockFatalError> {
1507 debug!(target: "engine::tree", "received backfill sync finished event");
1508 self.backfill_sync_state = BackfillSyncState::Idle;
1509
1510 let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1512 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1513 self.state.invalid_headers.insert(**bad_block);
1515
1516 Some(*target)
1518 } else {
1519 ctrl.block_number()
1521 };
1522
1523 let Some(backfill_height) = backfill_height else { return Ok(()) };
1525
1526 let Some(backfill_num_hash) = self
1532 .provider
1533 .block_hash(backfill_height)?
1534 .map(|hash| BlockNumHash { hash, number: backfill_height })
1535 else {
1536 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1537 return Ok(())
1538 };
1539
1540 if ctrl.is_unwind() {
1541 self.state.tree_state.reset(backfill_num_hash)
1544 } else {
1545 self.state.tree_state.remove_until(
1546 backfill_num_hash,
1547 self.persistence_state.last_persisted_block.hash,
1548 Some(backfill_num_hash),
1549 );
1550 }
1551
1552 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1553 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1554
1555 self.state.buffer.remove_old_blocks(backfill_height);
1557 self.canonical_in_memory_state.clear_state();
1560
1561 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1562 self.state.tree_state.set_canonical_head(new_head.num_hash());
1565 self.persistence_state.finish(new_head.hash(), new_head.number());
1566
1567 self.canonical_in_memory_state.set_canonical_head(new_head);
1569 }
1570
1571 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1574 else {
1575 return Ok(())
1576 };
1577 if sync_target_state.finalized_block_hash.is_zero() {
1578 return Ok(())
1580 }
1581 let newest_finalized = self
1583 .state
1584 .buffer
1585 .block(&sync_target_state.finalized_block_hash)
1586 .map(|block| block.number());
1587
1588 if let Some(backfill_target) =
1594 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1595 self.backfill_sync_target(progress, finalized_number, None)
1598 })
1599 {
1600 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1602 backfill_target.into(),
1603 )));
1604 return Ok(())
1605 };
1606
1607 if let Some(lowest_buffered) =
1609 self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1610 {
1611 let current_head_num = self.state.tree_state.current_canonical_head.number;
1612 let target_head_num = lowest_buffered.number();
1613
1614 if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1615 {
1616 debug!(
1618 target: "engine::tree",
1619 %current_head_num,
1620 %target_head_num,
1621 %distance,
1622 "Backfill complete, downloading remaining blocks to reach FCU target"
1623 );
1624
1625 self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1626 lowest_buffered.parent_hash(),
1627 distance,
1628 )));
1629 return Ok(());
1630 }
1631 }
1632
1633 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1635 }
1636
1637 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1641 if let Some(chain_update) = self.on_new_head(target)? {
1642 self.on_canonical_chain_update(chain_update);
1643 }
1644
1645 Ok(())
1646 }
1647
1648 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1650 if let Some(event) = event {
1651 self.on_tree_event(event)?;
1652 }
1653
1654 Ok(())
1655 }
1656
1657 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1661 match event {
1662 TreeEvent::TreeAction(action) => match action {
1663 TreeAction::MakeCanonical { sync_target_head } => {
1664 self.make_canonical(sync_target_head)?;
1665 }
1666 },
1667 TreeEvent::BackfillAction(action) => {
1668 self.emit_event(EngineApiEvent::BackfillAction(action));
1669 }
1670 TreeEvent::Download(action) => {
1671 self.emit_event(EngineApiEvent::Download(action));
1672 }
1673 }
1674
1675 Ok(())
1676 }
1677
1678 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1680 let event = event.into();
1681
1682 if event.is_backfill_action() {
1683 debug_assert_eq!(
1684 self.backfill_sync_state,
1685 BackfillSyncState::Idle,
1686 "backfill action should only be emitted when backfill is idle"
1687 );
1688
1689 if self.persistence_state.in_progress() {
1690 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1693 return
1694 }
1695
1696 self.backfill_sync_state = BackfillSyncState::Pending;
1697 self.metrics.engine.pipeline_runs.increment(1);
1698 debug!(target: "engine::tree", "emitting backfill action event");
1699 }
1700
1701 let _ = self.outgoing.send(event).inspect_err(
1702 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1703 );
1704 }
1705
1706 pub const fn should_persist(&self) -> bool {
1710 if !self.backfill_sync_state.is_idle() {
1711 return false
1713 }
1714
1715 let min_block = self.persistence_state.last_persisted_block.number;
1716 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1717 self.config.persistence_threshold()
1718 }
1719
1720 fn get_canonical_blocks_to_persist(
1723 &self,
1724 target: PersistTarget,
1725 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1726 debug_assert!(!self.persistence_state.in_progress());
1729
1730 let mut blocks_to_persist = Vec::new();
1731 let mut current_hash = self.state.tree_state.canonical_block_hash();
1732 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1733 let canonical_head_number = self.state.tree_state.canonical_block_number();
1734
1735 let target_number = match target {
1736 PersistTarget::Head => canonical_head_number,
1737 PersistTarget::Threshold => {
1738 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
1739 }
1740 };
1741
1742 debug!(
1743 target: "engine::tree",
1744 ?current_hash,
1745 ?last_persisted_number,
1746 ?canonical_head_number,
1747 ?target_number,
1748 "Returning canonical blocks to persist"
1749 );
1750 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1751 if block.recovered_block().number() <= last_persisted_number {
1752 break;
1753 }
1754
1755 if block.recovered_block().number() <= target_number {
1756 blocks_to_persist.push(block.clone());
1757 }
1758
1759 current_hash = block.recovered_block().parent_hash();
1760 }
1761
1762 blocks_to_persist.reverse();
1764
1765 Ok(blocks_to_persist)
1766 }
1767
1768 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1776 if let Some(remove_above) = self.find_disk_reorg()? {
1779 self.remove_blocks(remove_above);
1780 return Ok(())
1781 }
1782
1783 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1784 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1785 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1786 number: self.persistence_state.last_persisted_block.number,
1787 hash: self.persistence_state.last_persisted_block.hash,
1788 });
1789 Ok(())
1790 }
1791
1792 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1799 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1800 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1802 return Ok(Some(block.clone()))
1803 }
1804
1805 let (block, senders) = self
1806 .provider
1807 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1808 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1809 .split_sealed();
1810 let execution_output = self
1811 .provider
1812 .get_state(block.header().number())?
1813 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1814 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1815 let trie_updates = self.provider.get_block_trie_updates(block.number())?;
1816
1817 let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
1818 let sorted_trie_updates = Arc::new(trie_updates);
1819 let trie_data =
1821 ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
1822
1823 Ok(Some(ExecutedBlock::new(
1824 Arc::new(RecoveredBlock::new_sealed(block, senders)),
1825 Arc::new(execution_output),
1826 trie_data,
1827 )))
1828 }
1829
1830 fn sealed_header_by_hash(
1832 &self,
1833 hash: B256,
1834 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1835 let header = self.state.tree_state.sealed_header_by_hash(&hash);
1837
1838 if header.is_some() {
1839 Ok(header)
1840 } else {
1841 self.provider.sealed_header_by_hash(hash)
1842 }
1843 }
1844
1845 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1852 self.state
1853 .buffer
1854 .lowest_ancestor(&hash)
1855 .map(|block| block.parent_hash())
1856 .unwrap_or_else(|| hash)
1857 }
1858
1859 fn latest_valid_hash_for_invalid_payload(
1870 &mut self,
1871 parent_hash: B256,
1872 ) -> ProviderResult<Option<B256>> {
1873 if self.sealed_header_by_hash(parent_hash)?.is_some() {
1875 return Ok(Some(parent_hash))
1876 }
1877
1878 let mut current_hash = parent_hash;
1881 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1882 while let Some(block_with_parent) = current_block {
1883 current_hash = block_with_parent.parent;
1884 current_block = self.state.invalid_headers.get(¤t_hash);
1885
1886 if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1889 return Ok(Some(current_hash))
1890 }
1891 }
1892 Ok(None)
1893 }
1894
1895 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1899 if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1902 !parent.difficulty().is_zero()
1903 {
1904 parent_hash = B256::ZERO;
1905 }
1906
1907 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1908 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1909 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1910 })
1911 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1912 }
1913
1914 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1918 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1919 return target.head_block_hash == block_hash
1920 }
1921 false
1922 }
1923
1924 fn is_any_sync_target(&self, block_hash: B256) -> bool {
1928 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1929 return target.contains(block_hash)
1930 }
1931 false
1932 }
1933
1934 fn check_invalid_ancestor_with_head(
1940 &mut self,
1941 check: B256,
1942 head: &SealedBlock<N::Block>,
1943 ) -> ProviderResult<Option<PayloadStatus>> {
1944 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1946
1947 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1948 }
1949
1950 fn on_invalid_new_payload(
1952 &mut self,
1953 head: SealedBlock<N::Block>,
1954 invalid: BlockWithParent,
1955 ) -> ProviderResult<PayloadStatus> {
1956 let status = self.prepare_invalid_response(invalid.parent)?;
1958
1959 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1961 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1962
1963 Ok(status)
1964 }
1965
1966 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1978 let parent_hash = payload.parent_hash();
1979 let block_hash = payload.block_hash();
1980 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1981 if lowest_buffered_ancestor == block_hash {
1982 lowest_buffered_ancestor = parent_hash;
1983 }
1984
1985 self.state.invalid_headers.get(&lowest_buffered_ancestor)
1987 }
1988
1989 fn handle_invalid_ancestor_payload(
1998 &mut self,
1999 payload: T::ExecutionData,
2000 invalid: BlockWithParent,
2001 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2002 let parent_hash = payload.parent_hash();
2003 let num_hash = payload.num_hash();
2004
2005 let block = match self.payload_validator.convert_payload_to_block(payload) {
2011 Ok(block) => block,
2012 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
2013 };
2014
2015 Ok(self.on_invalid_new_payload(block, invalid)?)
2016 }
2017
2018 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2021 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2023
2024 match self.prepare_invalid_response(header.parent) {
2026 Ok(status) => Ok(Some(status)),
2027 Err(err) => {
2028 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2029 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2031 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2032 })))
2033 }
2034 }
2035 }
2036
2037 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2040 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2041 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2042 return Err(e)
2043 }
2044
2045 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2046 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2047 return Err(e)
2048 }
2049
2050 Ok(())
2051 }
2052
2053 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2055 fn try_connect_buffered_blocks(
2056 &mut self,
2057 parent: BlockNumHash,
2058 ) -> Result<(), InsertBlockFatalError> {
2059 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2060
2061 if blocks.is_empty() {
2062 return Ok(())
2064 }
2065
2066 let now = Instant::now();
2067 let block_count = blocks.len();
2068 for child in blocks {
2069 let child_num_hash = child.num_hash();
2070 match self.insert_block(child) {
2071 Ok(res) => {
2072 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2073 if self.is_any_sync_target(child_num_hash.hash) &&
2074 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2075 {
2076 debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2077 self.make_canonical(child_num_hash.hash)?;
2080 }
2081 }
2082 Err(err) => {
2083 if let InsertPayloadError::Block(err) = err {
2084 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2085 if let Err(fatal) = self.on_insert_block_error(err) {
2086 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2087 return Err(fatal)
2088 }
2089 }
2090 }
2091 }
2092 }
2093
2094 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2095 Ok(())
2096 }
2097
2098 fn buffer_block(
2100 &mut self,
2101 block: SealedBlock<N::Block>,
2102 ) -> Result<(), InsertBlockError<N::Block>> {
2103 if let Err(err) = self.validate_block(&block) {
2104 return Err(InsertBlockError::consensus_error(err, block))
2105 }
2106 self.state.buffer.insert_block(block);
2107 Ok(())
2108 }
2109
2110 #[inline]
2115 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2116 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2117 }
2118
2119 #[inline]
2122 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2123 if block > local_tip {
2124 Some(block - local_tip)
2125 } else {
2126 None
2127 }
2128 }
2129
2130 fn backfill_sync_target(
2137 &self,
2138 canonical_tip_num: u64,
2139 target_block_number: u64,
2140 downloaded_block: Option<BlockNumHash>,
2141 ) -> Option<B256> {
2142 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2143
2144 let exceeds_backfill_threshold =
2146 match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2147 (Some(downloaded_block), Some(state))
2149 if downloaded_block.hash == state.finalized_block_hash =>
2150 {
2151 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2152 }
2153 _ => match sync_target_state
2154 .as_ref()
2155 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2156 {
2157 Some(buffered_finalized) => {
2158 self.exceeds_backfill_run_threshold(
2161 canonical_tip_num,
2162 buffered_finalized.number(),
2163 )
2164 }
2165 None => {
2166 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2168 }
2169 },
2170 };
2171
2172 if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2174 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2176 Err(err) => {
2177 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2178 }
2179 Ok(None) => {
2180 if !state.finalized_block_hash.is_zero() {
2182 return Some(state.finalized_block_hash)
2185 }
2186
2187 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2200 return Some(state.head_block_hash)
2201 }
2202 Ok(Some(_)) => {
2203 }
2205 }
2206 }
2207
2208 None
2209 }
2210
2211 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2214 let mut canonical = self.state.tree_state.current_canonical_head;
2215 let mut persisted = self.persistence_state.last_persisted_block;
2216
2217 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2218 Ok(self
2219 .sealed_header_by_hash(num_hash.hash)?
2220 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2221 .parent_num_hash())
2222 };
2223
2224 while canonical.number > persisted.number {
2227 canonical = parent_num_hash(canonical)?;
2228 }
2229
2230 if canonical == persisted {
2232 return Ok(None);
2233 }
2234
2235 while persisted.number > canonical.number {
2241 persisted = parent_num_hash(persisted)?;
2242 }
2243
2244 debug_assert_eq!(persisted.number, canonical.number);
2245
2246 while persisted.hash != canonical.hash {
2248 canonical = parent_num_hash(canonical)?;
2249 persisted = parent_num_hash(persisted)?;
2250 }
2251
2252 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2253
2254 Ok(Some(persisted.number))
2255 }
2256
2257 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2261 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2262 let start = Instant::now();
2263
2264 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2266
2267 let tip = chain_update.tip().clone_sealed_header();
2268 let notification = chain_update.to_chain_notification();
2269
2270 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2272 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2273 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2274 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2275
2276 self.update_reorg_metrics(old.len());
2277 self.reinsert_reorged_blocks(new.clone());
2278 self.reinsert_reorged_blocks(old.clone());
2279 }
2280
2281 self.canonical_in_memory_state.update_chain(chain_update);
2283 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2284
2285 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2287
2288 self.canonical_in_memory_state.notify_canon_state(notification);
2290
2291 self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2293 Box::new(tip),
2294 start.elapsed(),
2295 ));
2296 }
2297
2298 fn update_reorg_metrics(&self, old_chain_length: usize) {
2300 self.metrics.tree.reorgs.increment(1);
2301 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2302 }
2303
2304 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2306 for block in new_chain {
2307 if self
2308 .state
2309 .tree_state
2310 .executed_block_by_hash(block.recovered_block().hash())
2311 .is_none()
2312 {
2313 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2314 self.state.tree_state.insert_executed(block);
2315 }
2316 }
2317 }
2318
2319 fn on_disconnected_downloaded_block(
2324 &self,
2325 downloaded_block: BlockNumHash,
2326 missing_parent: BlockNumHash,
2327 head: BlockNumHash,
2328 ) -> Option<TreeEvent> {
2329 if let Some(target) =
2331 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2332 {
2333 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2334 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2335 }
2336
2337 let request = if let Some(distance) =
2347 self.distance_from_local_tip(head.number, missing_parent.number)
2348 {
2349 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2350 DownloadRequest::BlockRange(missing_parent.hash, distance)
2351 } else {
2352 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2353 DownloadRequest::single_block(missing_parent.hash)
2356 };
2357
2358 Some(TreeEvent::Download(request))
2359 }
2360
2361 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2367 fn on_downloaded_block(
2368 &mut self,
2369 block: SealedBlock<N::Block>,
2370 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2371 let block_num_hash = block.num_hash();
2372 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2373 if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2374 return Ok(None)
2375 }
2376
2377 if !self.backfill_sync_state.is_idle() {
2378 return Ok(None)
2379 }
2380
2381 match self.insert_block(block) {
2383 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2384 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2387 sync_target.contains(block_num_hash.hash)
2388 {
2389 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2390
2391 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2394 sync_target_head: block_num_hash.hash,
2395 })))
2396 }
2397 trace!(target: "engine::tree", "appended downloaded block");
2398 self.try_connect_buffered_blocks(block_num_hash)?;
2399 }
2400 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2401 return Ok(self.on_disconnected_downloaded_block(
2404 block_num_hash,
2405 missing_ancestor,
2406 head,
2407 ))
2408 }
2409 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2410 trace!(target: "engine::tree", "downloaded block already executed");
2411 }
2412 Err(err) => {
2413 if let InsertPayloadError::Block(err) = err {
2414 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2415 if let Err(fatal) = self.on_insert_block_error(err) {
2416 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2417 return Err(fatal)
2418 }
2419 }
2420 }
2421 }
2422 Ok(None)
2423 }
2424
2425 fn insert_payload(
2434 &mut self,
2435 payload: T::ExecutionData,
2436 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2437 self.insert_block_or_payload(
2438 payload.block_with_parent(),
2439 payload,
2440 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2441 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2442 )
2443 }
2444
2445 fn insert_block(
2446 &mut self,
2447 block: SealedBlock<N::Block>,
2448 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2449 self.insert_block_or_payload(
2450 block.block_with_parent(),
2451 block,
2452 |validator, block, ctx| validator.validate_block(block, ctx),
2453 |_, block| Ok(block),
2454 )
2455 }
2456
2457 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2474 fn insert_block_or_payload<Input, Err>(
2475 &mut self,
2476 block_id: BlockWithParent,
2477 input: Input,
2478 execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2479 convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2480 ) -> Result<InsertPayloadOk, Err>
2481 where
2482 Err: From<InsertBlockError<N::Block>>,
2483 {
2484 let block_insert_start = Instant::now();
2485 let block_num_hash = block_id.block;
2486 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2487
2488 match self.sealed_header_by_hash(block_num_hash.hash) {
2489 Err(err) => {
2490 let block = convert_to_block(self, input)?;
2491 return Err(InsertBlockError::new(block, err.into()).into());
2492 }
2493 Ok(Some(_)) => {
2494 convert_to_block(self, input)?;
2497 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2498 }
2499 _ => {}
2500 };
2501
2502 match self.state_provider_builder(block_id.parent) {
2504 Err(err) => {
2505 let block = convert_to_block(self, input)?;
2506 return Err(InsertBlockError::new(block, err.into()).into());
2507 }
2508 Ok(None) => {
2509 let block = convert_to_block(self, input)?;
2510
2511 let missing_ancestor = self
2514 .state
2515 .buffer
2516 .lowest_ancestor(&block.parent_hash())
2517 .map(|block| block.parent_num_hash())
2518 .unwrap_or_else(|| block.parent_num_hash());
2519
2520 self.state.buffer.insert_block(block);
2521
2522 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2523 head: self.state.tree_state.current_canonical_head,
2524 missing_ancestor,
2525 }))
2526 }
2527 Ok(Some(_)) => {}
2528 }
2529
2530 let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
2535
2536 let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2537
2538 let start = Instant::now();
2539
2540 let executed = execute(&mut self.payload_validator, input, ctx)?;
2541
2542 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2544 {
2545 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2546 self.canonical_in_memory_state.set_pending_block(executed.clone());
2547 }
2548
2549 self.state.tree_state.insert_executed(executed.clone());
2550 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2551
2552 let elapsed = start.elapsed();
2554 let engine_event = if is_fork {
2555 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2556 } else {
2557 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2558 };
2559 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2560
2561 self.metrics
2562 .engine
2563 .block_insert_total_duration
2564 .record(block_insert_start.elapsed().as_secs_f64());
2565 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2566 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2567 }
2568
2569 fn on_insert_block_error(
2575 &mut self,
2576 error: InsertBlockError<N::Block>,
2577 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2578 let (block, error) = error.split();
2579
2580 let validation_err = error.ensure_validation_error()?;
2583
2584 warn!(
2588 target: "engine::tree",
2589 invalid_hash=%block.hash(),
2590 invalid_number=block.number(),
2591 %validation_err,
2592 "Invalid block error on new payload",
2593 );
2594 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2595
2596 self.state.invalid_headers.insert(block.block_with_parent());
2598 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2599 Box::new(block),
2600 )));
2601
2602 Ok(PayloadStatus::new(
2603 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2604 latest_valid_hash,
2605 ))
2606 }
2607
2608 fn on_new_payload_error(
2610 &mut self,
2611 error: NewPayloadError,
2612 payload_num_hash: NumHash,
2613 parent_hash: B256,
2614 ) -> ProviderResult<PayloadStatus> {
2615 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2616 let latest_valid_hash =
2619 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2620 None
2624 } else {
2625 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2626 };
2627
2628 let status = PayloadStatusEnum::from(error);
2629 Ok(PayloadStatus::new(status, latest_valid_hash))
2630 }
2631
2632 pub fn find_canonical_header(
2634 &self,
2635 hash: B256,
2636 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2637 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2638
2639 if canonical.is_none() {
2640 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2641 }
2642
2643 Ok(canonical)
2644 }
2645
2646 fn update_finalized_block(
2648 &self,
2649 finalized_block_hash: B256,
2650 ) -> Result<(), OnForkChoiceUpdated> {
2651 if finalized_block_hash.is_zero() {
2652 return Ok(())
2653 }
2654
2655 match self.find_canonical_header(finalized_block_hash) {
2656 Ok(None) => {
2657 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2658 return Err(OnForkChoiceUpdated::invalid_state())
2660 }
2661 Ok(Some(finalized)) => {
2662 if Some(finalized.num_hash()) !=
2663 self.canonical_in_memory_state.get_finalized_num_hash()
2664 {
2665 let _ = self.persistence.save_finalized_block_number(finalized.number());
2668 self.canonical_in_memory_state.set_finalized(finalized.clone());
2669 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2671 }
2672 }
2673 Err(err) => {
2674 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2675 }
2676 }
2677
2678 Ok(())
2679 }
2680
2681 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2683 if safe_block_hash.is_zero() {
2684 return Ok(())
2685 }
2686
2687 match self.find_canonical_header(safe_block_hash) {
2688 Ok(None) => {
2689 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2690 return Err(OnForkChoiceUpdated::invalid_state())
2692 }
2693 Ok(Some(safe)) => {
2694 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2695 let _ = self.persistence.save_safe_block_number(safe.number());
2698 self.canonical_in_memory_state.set_safe(safe.clone());
2699 self.metrics.tree.safe_block_height.set(safe.number() as f64);
2701 }
2702 }
2703 Err(err) => {
2704 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2705 }
2706 }
2707
2708 Ok(())
2709 }
2710
2711 fn ensure_consistent_forkchoice_state(
2720 &self,
2721 state: ForkchoiceState,
2722 ) -> Result<(), OnForkChoiceUpdated> {
2723 self.update_finalized_block(state.finalized_block_hash)?;
2729
2730 self.update_safe_block(state.safe_block_hash)
2736 }
2737
2738 fn process_payload_attributes(
2743 &self,
2744 attrs: T::PayloadAttributes,
2745 head: &N::BlockHeader,
2746 state: ForkchoiceState,
2747 version: EngineApiMessageVersion,
2748 ) -> OnForkChoiceUpdated {
2749 if let Err(err) =
2750 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2751 {
2752 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2753 return OnForkChoiceUpdated::invalid_payload_attributes()
2754 }
2755
2756 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2761 state.head_block_hash,
2762 attrs,
2763 version as u8,
2764 ) {
2765 Ok(attributes) => {
2766 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2769
2770 OnForkChoiceUpdated::updated_with_pending_payload_id(
2782 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2783 pending_payload_id,
2784 )
2785 }
2786 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2787 }
2788 }
2789
2790 pub(crate) fn remove_before(
2797 &mut self,
2798 upper_bound: BlockNumHash,
2799 finalized_hash: Option<B256>,
2800 ) -> ProviderResult<()> {
2801 let num = if let Some(hash) = finalized_hash {
2804 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2805 } else {
2806 None
2807 };
2808
2809 self.state.tree_state.remove_until(
2810 upper_bound,
2811 self.persistence_state.last_persisted_block.hash,
2812 num,
2813 );
2814 Ok(())
2815 }
2816
2817 pub fn state_provider_builder(
2822 &self,
2823 hash: B256,
2824 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2825 where
2826 P: BlockReader + StateProviderFactory + StateReader + Clone,
2827 {
2828 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2829 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2830 return Ok(Some(StateProviderBuilder::new(
2832 self.provider.clone(),
2833 historical,
2834 Some(blocks),
2835 )))
2836 }
2837
2838 if let Some(header) = self.provider.header(hash)? {
2840 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2841 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2844 }
2845
2846 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2847 Ok(None)
2848 }
2849}
2850
2851#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2857pub enum BlockStatus {
2858 Valid,
2860 Disconnected {
2862 head: BlockNumHash,
2864 missing_ancestor: BlockNumHash,
2866 },
2867}
2868
2869#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2874pub enum InsertPayloadOk {
2875 AlreadySeen(BlockStatus),
2877 Inserted(BlockStatus),
2879}
2880
2881#[derive(Debug, Clone, Copy)]
2883enum PersistTarget {
2884 Threshold,
2886 Head,
2888}