1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 chain::FromOrchestrator,
4 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
5 persistence::PersistenceHandle,
6 tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
7};
8use alloy_consensus::BlockHeader;
9use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
10use alloy_evm::block::StateChangeSource;
11use alloy_primitives::B256;
12use alloy_rpc_types_engine::{
13 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
14};
15use error::{InsertBlockError, InsertBlockFatalError};
16use reth_chain_state::{
17 CanonicalInMemoryState, ComputedTrieData, ExecutedBlock, MemoryOverlayStateProvider,
18 NewCanonicalChain,
19};
20use reth_consensus::{Consensus, FullConsensus};
21use reth_engine_primitives::{
22 BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
23 ForkchoiceStateTracker, OnForkChoiceUpdated,
24};
25use reth_errors::{ConsensusError, ProviderResult};
26use reth_evm::{ConfigureEvm, OnStateHook};
27use reth_payload_builder::PayloadBuilderHandle;
28use reth_payload_primitives::{
29 BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
30};
31use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
32use reth_provider::{
33 BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
34 StateProviderFactory, StateReader, TransactionVariant, TrieReader,
35};
36use reth_revm::database::StateProviderDatabase;
37use reth_stages_api::ControlFlow;
38use revm::state::EvmState;
39use state::TreeState;
40use std::{
41 fmt::Debug,
42 sync::{
43 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
44 Arc,
45 },
46 time::Instant,
47};
48use tokio::sync::{
49 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
50 oneshot::{self, error::TryRecvError},
51};
52use tracing::*;
53
54mod block_buffer;
55mod cached_state;
56pub mod error;
57pub mod instrumented_state;
58mod invalid_headers;
59mod metrics;
60mod payload_processor;
61pub mod payload_validator;
62mod persistence_state;
63pub mod precompile_cache;
64#[cfg(test)]
65mod tests;
66#[expect(unused)]
68mod trie_updates;
69
70use crate::tree::error::AdvancePersistenceError;
71pub use block_buffer::BlockBuffer;
72pub use invalid_headers::InvalidHeaderCache;
73pub use payload_processor::*;
74pub use payload_validator::{BasicEngineValidator, EngineValidator};
75pub use persistence_state::PersistenceState;
76pub use reth_engine_primitives::TreeConfig;
77
78pub mod state;
79
80pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
90
91#[derive(Clone, Debug)]
93pub struct StateProviderBuilder<N: NodePrimitives, P> {
94 provider_factory: P,
96 historical: B256,
98 overlay: Option<Vec<ExecutedBlock<N>>>,
100}
101
102impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
103 pub const fn new(
106 provider_factory: P,
107 historical: B256,
108 overlay: Option<Vec<ExecutedBlock<N>>>,
109 ) -> Self {
110 Self { provider_factory, historical, overlay }
111 }
112}
113
114impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
115where
116 P: BlockReader + StateProviderFactory + StateReader + Clone,
117{
118 pub fn build(&self) -> ProviderResult<StateProviderBox> {
120 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
121 if let Some(overlay) = self.overlay.clone() {
122 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
123 }
124 Ok(provider)
125 }
126}
127
128#[derive(Debug)]
132pub struct EngineApiTreeState<N: NodePrimitives> {
133 tree_state: TreeState<N>,
135 forkchoice_state_tracker: ForkchoiceStateTracker,
137 buffer: BlockBuffer<N::Block>,
139 invalid_headers: InvalidHeaderCache,
142}
143
144impl<N: NodePrimitives> EngineApiTreeState<N> {
145 fn new(
146 block_buffer_limit: u32,
147 max_invalid_header_cache_length: u32,
148 canonical_block: BlockNumHash,
149 engine_kind: EngineApiKind,
150 ) -> Self {
151 Self {
152 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
153 buffer: BlockBuffer::new(block_buffer_limit),
154 tree_state: TreeState::new(canonical_block, engine_kind),
155 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
156 }
157 }
158}
159
160#[derive(Debug)]
162pub struct TreeOutcome<T> {
163 pub outcome: T,
165 pub event: Option<TreeEvent>,
167}
168
169impl<T> TreeOutcome<T> {
170 pub const fn new(outcome: T) -> Self {
172 Self { outcome, event: None }
173 }
174
175 pub fn with_event(mut self, event: TreeEvent) -> Self {
177 self.event = Some(event);
178 self
179 }
180}
181
182#[derive(Debug)]
184pub enum TreeEvent {
185 TreeAction(TreeAction),
187 BackfillAction(BackfillAction),
189 Download(DownloadRequest),
191}
192
193impl TreeEvent {
194 const fn is_backfill_action(&self) -> bool {
196 matches!(self, Self::BackfillAction(_))
197 }
198}
199
200#[derive(Debug)]
202pub enum TreeAction {
203 MakeCanonical {
205 sync_target_head: B256,
207 },
208}
209
210struct MeteredStateHook {
212 metrics: reth_evm::metrics::ExecutorMetrics,
213 inner_hook: Box<dyn OnStateHook>,
214}
215
216impl OnStateHook for MeteredStateHook {
217 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
218 let accounts = state.keys().len();
220 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
221 let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
222
223 self.metrics.accounts_loaded_histogram.record(accounts as f64);
224 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
225 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
226
227 self.inner_hook.on_state(source, state);
229 }
230}
231
232pub struct EngineApiTreeHandler<N, P, T, V, C>
237where
238 N: NodePrimitives,
239 T: PayloadTypes,
240 C: ConfigureEvm<Primitives = N> + 'static,
241{
242 provider: P,
243 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
244 payload_validator: V,
245 state: EngineApiTreeState<N>,
247 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
256 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
258 outgoing: UnboundedSender<EngineApiEvent<N>>,
260 persistence: PersistenceHandle<N>,
262 persistence_state: PersistenceState,
264 backfill_sync_state: BackfillSyncState,
266 canonical_in_memory_state: CanonicalInMemoryState<N>,
269 payload_builder: PayloadBuilderHandle<T>,
272 config: TreeConfig,
274 metrics: EngineApiMetrics,
276 engine_kind: EngineApiKind,
278 evm_config: C,
280}
281
282impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
283 for EngineApiTreeHandler<N, P, T, V, C>
284where
285 N: NodePrimitives,
286 C: Debug + ConfigureEvm<Primitives = N>,
287{
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 f.debug_struct("EngineApiTreeHandler")
290 .field("provider", &self.provider)
291 .field("consensus", &self.consensus)
292 .field("payload_validator", &self.payload_validator)
293 .field("state", &self.state)
294 .field("incoming_tx", &self.incoming_tx)
295 .field("persistence", &self.persistence)
296 .field("persistence_state", &self.persistence_state)
297 .field("backfill_sync_state", &self.backfill_sync_state)
298 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
299 .field("payload_builder", &self.payload_builder)
300 .field("config", &self.config)
301 .field("metrics", &self.metrics)
302 .field("engine_kind", &self.engine_kind)
303 .field("evm_config", &self.evm_config)
304 .finish()
305 }
306}
307
308impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
309where
310 N: NodePrimitives,
311 P: DatabaseProviderFactory
312 + BlockReader<Block = N::Block, Header = N::BlockHeader>
313 + StateProviderFactory
314 + StateReader<Receipt = N::Receipt>
315 + HashedPostStateProvider
316 + TrieReader
317 + Clone
318 + 'static,
319 <P as DatabaseProviderFactory>::Provider:
320 BlockReader<Block = N::Block, Header = N::BlockHeader>,
321 C: ConfigureEvm<Primitives = N> + 'static,
322 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
323 V: EngineValidator<T>,
324{
325 #[expect(clippy::too_many_arguments)]
327 pub fn new(
328 provider: P,
329 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
330 payload_validator: V,
331 outgoing: UnboundedSender<EngineApiEvent<N>>,
332 state: EngineApiTreeState<N>,
333 canonical_in_memory_state: CanonicalInMemoryState<N>,
334 persistence: PersistenceHandle<N>,
335 persistence_state: PersistenceState,
336 payload_builder: PayloadBuilderHandle<T>,
337 config: TreeConfig,
338 engine_kind: EngineApiKind,
339 evm_config: C,
340 ) -> Self {
341 let (incoming_tx, incoming) = std::sync::mpsc::channel();
342
343 Self {
344 provider,
345 consensus,
346 payload_validator,
347 incoming,
348 outgoing,
349 persistence,
350 persistence_state,
351 backfill_sync_state: BackfillSyncState::Idle,
352 state,
353 canonical_in_memory_state,
354 payload_builder,
355 config,
356 metrics: Default::default(),
357 incoming_tx,
358 engine_kind,
359 evm_config,
360 }
361 }
362
363 #[expect(clippy::complexity)]
369 pub fn spawn_new(
370 provider: P,
371 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
372 payload_validator: V,
373 persistence: PersistenceHandle<N>,
374 payload_builder: PayloadBuilderHandle<T>,
375 canonical_in_memory_state: CanonicalInMemoryState<N>,
376 config: TreeConfig,
377 kind: EngineApiKind,
378 evm_config: C,
379 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
380 {
381 let best_block_number = provider.best_block_number().unwrap_or(0);
382 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
383
384 let persistence_state = PersistenceState {
385 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
386 rx: None,
387 };
388
389 let (tx, outgoing) = unbounded_channel();
390 let state = EngineApiTreeState::new(
391 config.block_buffer_limit(),
392 config.max_invalid_header_cache_length(),
393 header.num_hash(),
394 kind,
395 );
396
397 let task = Self::new(
398 provider,
399 consensus,
400 payload_validator,
401 tx,
402 state,
403 canonical_in_memory_state,
404 persistence,
405 persistence_state,
406 payload_builder,
407 config,
408 kind,
409 evm_config,
410 );
411 let incoming = task.incoming_tx.clone();
412 std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
413 (incoming, outgoing)
414 }
415
416 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
418 self.incoming_tx.clone()
419 }
420
421 pub fn run(mut self) {
425 loop {
426 match self.try_recv_engine_message() {
427 Ok(Some(msg)) => {
428 debug!(target: "engine::tree", %msg, "received new engine message");
429 if let Err(fatal) = self.on_engine_message(msg) {
430 error!(target: "engine::tree", %fatal, "insert block fatal error");
431 return
432 }
433 }
434 Ok(None) => {
435 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
436 }
437 Err(_err) => {
438 error!(target: "engine::tree", "Engine channel disconnected");
439 return
440 }
441 }
442
443 if let Err(err) = self.advance_persistence() {
444 error!(target: "engine::tree", %err, "Advancing persistence failed");
445 return
446 }
447 }
448 }
449
450 fn on_downloaded(
456 &mut self,
457 mut blocks: Vec<SealedBlock<N::Block>>,
458 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
459 if blocks.is_empty() {
460 return Ok(None)
462 }
463
464 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
465 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
466 for block in blocks.drain(..batch) {
467 if let Some(event) = self.on_downloaded_block(block)? {
468 let needs_backfill = event.is_backfill_action();
469 self.on_tree_event(event)?;
470 if needs_backfill {
471 return Ok(None)
473 }
474 }
475 }
476
477 if !blocks.is_empty() {
479 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
480 }
481
482 Ok(None)
483 }
484
485 #[instrument(
500 level = "debug",
501 target = "engine::tree",
502 skip_all,
503 fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
504 )]
505 fn on_new_payload(
506 &mut self,
507 payload: T::ExecutionData,
508 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
509 trace!(target: "engine::tree", "invoked new payload");
510
511 let start = Instant::now();
513
514 let num_hash = payload.num_hash();
541 let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
542 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
543
544 let block_hash = num_hash.hash;
545
546 if let Some(invalid) = self.find_invalid_ancestor(&payload) {
548 let status = self.handle_invalid_ancestor_payload(payload, invalid)?;
549 return Ok(TreeOutcome::new(status));
550 }
551
552 self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
554
555 let status = if self.backfill_sync_state.is_idle() {
556 self.try_insert_payload(payload)?
557 } else {
558 self.try_buffer_payload(payload)?
559 };
560
561 let mut outcome = TreeOutcome::new(status);
562 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
564 if self.state.tree_state.canonical_block_hash() != block_hash {
566 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
567 sync_target_head: block_hash,
568 }));
569 }
570 }
571
572 self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
574
575 Ok(outcome)
576 }
577
578 #[instrument(level = "debug", target = "engine::tree", skip_all)]
585 fn try_insert_payload(
586 &mut self,
587 payload: T::ExecutionData,
588 ) -> Result<PayloadStatus, InsertBlockFatalError> {
589 let block_hash = payload.block_hash();
590 let num_hash = payload.num_hash();
591 let parent_hash = payload.parent_hash();
592 let mut latest_valid_hash = None;
593
594 match self.insert_payload(payload) {
595 Ok(status) => {
596 let status = match status {
597 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
598 latest_valid_hash = Some(block_hash);
599 self.try_connect_buffered_blocks(num_hash)?;
600 PayloadStatusEnum::Valid
601 }
602 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
603 latest_valid_hash = Some(block_hash);
604 PayloadStatusEnum::Valid
605 }
606 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
607 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
608 PayloadStatusEnum::Syncing
610 }
611 };
612
613 Ok(PayloadStatus::new(status, latest_valid_hash))
614 }
615 Err(error) => match error {
616 InsertPayloadError::Block(error) => Ok(self.on_insert_block_error(error)?),
617 InsertPayloadError::Payload(error) => {
618 Ok(self.on_new_payload_error(error, num_hash, parent_hash)?)
619 }
620 },
621 }
622 }
623
624 fn try_buffer_payload(
633 &mut self,
634 payload: T::ExecutionData,
635 ) -> Result<PayloadStatus, InsertBlockFatalError> {
636 let parent_hash = payload.parent_hash();
637 let num_hash = payload.num_hash();
638
639 match self.payload_validator.convert_payload_to_block(payload) {
640 Ok(block) => {
642 if let Err(error) = self.buffer_block(block) {
643 Ok(self.on_insert_block_error(error)?)
644 } else {
645 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
646 }
647 }
648 Err(error) => Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
649 }
650 }
651
652 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
659 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
661 debug!(target: "engine::tree", new_head=?new_head, "New head block not found in inmemory tree state");
662 self.metrics.engine.executed_new_block_cache_miss.increment(1);
663 return Ok(None)
664 };
665
666 let new_head_number = new_head_block.recovered_block().number();
667 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
668
669 let mut new_chain = vec![new_head_block.clone()];
670 let mut current_hash = new_head_block.recovered_block().parent_hash();
671 let mut current_number = new_head_number - 1;
672
673 while current_number > current_canonical_number {
678 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
679 {
680 current_hash = block.recovered_block().parent_hash();
681 current_number -= 1;
682 new_chain.push(block);
683 } else {
684 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
685 return Ok(None)
688 }
689 }
690
691 if current_hash == self.state.tree_state.current_canonical_head.hash {
694 new_chain.reverse();
695
696 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
698 }
699
700 let mut old_chain = Vec::new();
702 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
703
704 while current_canonical_number > current_number {
707 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
708 old_hash = block.recovered_block().parent_hash();
709 old_chain.push(block);
710 current_canonical_number -= 1;
711 } else {
712 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
714 return Ok(None)
715 }
716 }
717
718 debug_assert_eq!(current_number, current_canonical_number);
720
721 while old_hash != current_hash {
724 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
725 old_hash = block.recovered_block().parent_hash();
726 old_chain.push(block);
727 } else {
728 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
730 return Ok(None)
731 }
732
733 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
734 {
735 current_hash = block.recovered_block().parent_hash();
736 new_chain.push(block);
737 } else {
738 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
740 return Ok(None)
741 }
742 }
743 new_chain.reverse();
744 old_chain.reverse();
745
746 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
747 }
748
749 fn update_latest_block_to_canonical_ancestor(
761 &mut self,
762 canonical_header: &SealedHeader<N::BlockHeader>,
763 ) -> ProviderResult<()> {
764 debug!(target: "engine::tree", head = ?canonical_header.num_hash(), "Update latest block to canonical ancestor");
765 let current_head_number = self.state.tree_state.canonical_block_number();
766 let new_head_number = canonical_header.number();
767 let new_head_hash = canonical_header.hash();
768
769 self.state.tree_state.set_canonical_head(canonical_header.num_hash());
771
772 if new_head_number < current_head_number {
774 debug!(
775 target: "engine::tree",
776 current_head = current_head_number,
777 new_head = new_head_number,
778 new_head_hash = ?new_head_hash,
779 "FCU unwind detected: reverting to canonical ancestor"
780 );
781
782 self.handle_canonical_chain_unwind(current_head_number, canonical_header)
783 } else {
784 debug!(
785 target: "engine::tree",
786 previous_head = current_head_number,
787 new_head = new_head_number,
788 new_head_hash = ?new_head_hash,
789 "Advancing latest block to canonical ancestor"
790 );
791 self.handle_chain_advance_or_same_height(canonical_header)
792 }
793 }
794
795 fn handle_canonical_chain_unwind(
798 &self,
799 current_head_number: u64,
800 canonical_header: &SealedHeader<N::BlockHeader>,
801 ) -> ProviderResult<()> {
802 let new_head_number = canonical_header.number();
803 debug!(
804 target: "engine::tree",
805 from = current_head_number,
806 to = new_head_number,
807 "Handling unwind: collecting blocks to remove from in-memory state"
808 );
809
810 let old_blocks =
812 self.collect_blocks_for_canonical_unwind(new_head_number, current_head_number);
813
814 self.apply_canonical_ancestor_via_reorg(canonical_header, old_blocks)
816 }
817
818 fn collect_blocks_for_canonical_unwind(
820 &self,
821 new_head_number: u64,
822 current_head_number: u64,
823 ) -> Vec<ExecutedBlock<N>> {
824 let mut old_blocks = Vec::new();
825
826 for block_num in (new_head_number + 1)..=current_head_number {
827 if let Some(block_state) = self.canonical_in_memory_state.state_by_number(block_num) {
828 let executed_block = block_state.block_ref().clone();
829 old_blocks.push(executed_block);
830 debug!(
831 target: "engine::tree",
832 block_number = block_num,
833 "Collected block for removal from in-memory state"
834 );
835 }
836 }
837
838 if old_blocks.is_empty() {
839 debug!(
840 target: "engine::tree",
841 "No blocks found in memory to remove, will clear and reset state"
842 );
843 }
844
845 old_blocks
846 }
847
848 fn apply_canonical_ancestor_via_reorg(
850 &self,
851 canonical_header: &SealedHeader<N::BlockHeader>,
852 old_blocks: Vec<ExecutedBlock<N>>,
853 ) -> ProviderResult<()> {
854 let new_head_hash = canonical_header.hash();
855 let new_head_number = canonical_header.number();
856
857 match self.canonical_block_by_hash(new_head_hash)? {
859 Some(executed_block) => {
860 self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
862 new: vec![executed_block],
863 old: old_blocks,
864 });
865
866 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
869
870 debug!(
871 target: "engine::tree",
872 block_number = new_head_number,
873 block_hash = ?new_head_hash,
874 "Successfully loaded canonical ancestor into memory via reorg"
875 );
876 }
877 None => {
878 warn!(
880 target: "engine::tree",
881 block_hash = ?new_head_hash,
882 "Could not find canonical ancestor block, updating header only"
883 );
884 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
885 }
886 }
887
888 Ok(())
889 }
890
891 fn handle_chain_advance_or_same_height(
893 &self,
894 canonical_header: &SealedHeader<N::BlockHeader>,
895 ) -> ProviderResult<()> {
896 let new_head_number = canonical_header.number();
897 let new_head_hash = canonical_header.hash();
898
899 self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
901
902 self.ensure_block_in_memory(new_head_number, new_head_hash)
904 }
905
906 fn ensure_block_in_memory(&self, block_number: u64, block_hash: B256) -> ProviderResult<()> {
908 if self.canonical_in_memory_state.state_by_number(block_number).is_some() {
910 return Ok(());
911 }
912
913 if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
915 self.canonical_in_memory_state
916 .update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
917
918 debug!(
919 target: "engine::tree",
920 block_number,
921 block_hash = ?block_hash,
922 "Added canonical block to in-memory state"
923 );
924 }
925
926 Ok(())
927 }
928
929 fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
939 let target_hash = target.block.hash;
940 let canonical_head = self.state.tree_state.canonical_head();
942 let mut current_hash;
943 let mut current_block = target;
944 loop {
945 if current_block.block.hash == canonical_head.hash {
946 return Ok(false)
947 }
948 if current_block.block.number <= canonical_head.number {
950 break
951 }
952 current_hash = current_block.parent;
953
954 let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
955 current_block = next_block.block_with_parent();
956 }
957
958 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
960 return Ok(false)
961 }
962
963 if self.provider.block_number(target_hash)?.is_some() {
965 return Ok(false)
966 }
967
968 Ok(true)
969 }
970
971 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
980 fn on_forkchoice_updated(
981 &mut self,
982 state: ForkchoiceState,
983 attrs: Option<T::PayloadAttributes>,
984 version: EngineApiMessageVersion,
985 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
986 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
987
988 self.record_forkchoice_metrics();
990
991 if let Some(early_result) = self.validate_forkchoice_state(state)? {
993 return Ok(TreeOutcome::new(early_result));
994 }
995
996 if let Some(result) = self.handle_canonical_head(state, &attrs, version)? {
998 return Ok(result);
999 }
1000
1001 if let Some(result) = self.apply_chain_update(state, &attrs, version)? {
1004 return Ok(result);
1005 }
1006
1007 self.handle_missing_block(state)
1009 }
1010
1011 fn record_forkchoice_metrics(&self) {
1013 self.canonical_in_memory_state.on_forkchoice_update_received();
1014 }
1015
1016 fn validate_forkchoice_state(
1021 &mut self,
1022 state: ForkchoiceState,
1023 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
1024 if state.head_block_hash.is_zero() {
1025 return Ok(Some(OnForkChoiceUpdated::invalid_state()));
1026 }
1027
1028 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
1031 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
1032 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)));
1033 }
1034
1035 if !self.backfill_sync_state.is_idle() {
1036 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
1039 return Ok(Some(OnForkChoiceUpdated::syncing()));
1040 }
1041
1042 Ok(None)
1043 }
1044
1045 fn handle_canonical_head(
1051 &self,
1052 state: ForkchoiceState,
1053 attrs: &Option<T::PayloadAttributes>, version: EngineApiMessageVersion,
1055 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1056 if self.state.tree_state.canonical_block_hash() != state.head_block_hash {
1071 return Ok(None);
1072 }
1073
1074 trace!(target: "engine::tree", "fcu head hash is already canonical");
1075
1076 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1078 return Ok(Some(TreeOutcome::new(outcome)));
1080 }
1081
1082 if let Some(attr) = attrs {
1084 let tip = self
1085 .sealed_header_by_hash(self.state.tree_state.canonical_block_hash())?
1086 .ok_or_else(|| {
1087 ProviderError::HeaderNotFound(state.head_block_hash.into())
1090 })?;
1091 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1093 return Ok(Some(TreeOutcome::new(updated)));
1094 }
1095
1096 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1098 PayloadStatusEnum::Valid,
1099 Some(state.head_block_hash),
1100 )));
1101 Ok(Some(outcome))
1102 }
1103
1104 fn apply_chain_update(
1116 &mut self,
1117 state: ForkchoiceState,
1118 attrs: &Option<T::PayloadAttributes>,
1119 version: EngineApiMessageVersion,
1120 ) -> ProviderResult<Option<TreeOutcome<OnForkChoiceUpdated>>> {
1121 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1123 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1124
1125 if self.engine_kind.is_opstack() ||
1128 self.config.always_process_payload_attributes_on_canonical_head()
1129 {
1130 if self.config.unwind_canonical_header() {
1136 self.update_latest_block_to_canonical_ancestor(&canonical_header)?;
1137 }
1138
1139 if let Some(attr) = attrs {
1140 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1141 let updated = self.process_payload_attributes(
1143 attr.clone(),
1144 &canonical_header,
1145 state,
1146 version,
1147 );
1148 return Ok(Some(TreeOutcome::new(updated)));
1149 }
1150 }
1151
1152 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1164 PayloadStatusEnum::Valid,
1165 Some(state.head_block_hash),
1166 )));
1167 return Ok(Some(outcome));
1168 }
1169
1170 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1172 let tip = chain_update.tip().clone_sealed_header();
1173 self.on_canonical_chain_update(chain_update);
1174
1175 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1177 return Ok(Some(TreeOutcome::new(outcome)));
1179 }
1180
1181 if let Some(attr) = attrs {
1182 let updated = self.process_payload_attributes(attr.clone(), &tip, state, version);
1184 return Ok(Some(TreeOutcome::new(updated)));
1185 }
1186
1187 let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1188 PayloadStatusEnum::Valid,
1189 Some(state.head_block_hash),
1190 )));
1191 return Ok(Some(outcome));
1192 }
1193
1194 Ok(None)
1195 }
1196
1197 fn handle_missing_block(
1202 &self,
1203 state: ForkchoiceState,
1204 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1205 let target = if self.state.forkchoice_state_tracker.is_empty() &&
1212 !state.safe_block_hash.is_zero() &&
1214 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1215 {
1216 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1217 state.safe_block_hash
1218 } else {
1219 state.head_block_hash
1220 };
1221
1222 let target = self.lowest_buffered_ancestor_or(target);
1223 trace!(target: "engine::tree", %target, "downloading missing block");
1224
1225 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1226 PayloadStatusEnum::Syncing,
1227 )))
1228 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1229 }
1230
1231 #[expect(clippy::type_complexity)]
1240 fn try_recv_engine_message(
1241 &self,
1242 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1243 if self.persistence_state.in_progress() {
1244 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1246 Ok(msg) => Ok(Some(msg)),
1247 Err(err) => match err {
1248 RecvTimeoutError::Timeout => Ok(None),
1249 RecvTimeoutError::Disconnected => Err(RecvError),
1250 },
1251 }
1252 } else {
1253 self.incoming.recv().map(Some)
1254 }
1255 }
1256
1257 fn remove_blocks(&mut self, new_tip_num: u64) {
1260 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1261 if new_tip_num < self.persistence_state.last_persisted_block.number {
1262 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1263 let (tx, rx) = oneshot::channel();
1264 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1265 self.persistence_state.start_remove(new_tip_num, rx);
1266 }
1267 }
1268
1269 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
1272 if blocks_to_persist.is_empty() {
1273 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1274 return
1275 }
1276
1277 let highest_num_hash = blocks_to_persist
1279 .iter()
1280 .max_by_key(|block| block.recovered_block().number())
1281 .map(|b| b.recovered_block().num_hash())
1282 .expect("Checked non-empty persisting blocks");
1283
1284 debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1285 let (tx, rx) = oneshot::channel();
1286 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1287
1288 self.persistence_state.start_save(highest_num_hash, rx);
1289 }
1290
1291 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1296 if self.persistence_state.in_progress() {
1297 let (mut rx, start_time, current_action) = self
1298 .persistence_state
1299 .rx
1300 .take()
1301 .expect("if a persistence task is in progress Receiver must be Some");
1302 match rx.try_recv() {
1304 Ok(last_persisted_hash_num) => {
1305 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1306 let Some(BlockNumHash {
1307 hash: last_persisted_block_hash,
1308 number: last_persisted_block_number,
1309 }) = last_persisted_hash_num
1310 else {
1311 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1314 return Ok(())
1315 };
1316
1317 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
1318 self.persistence_state
1319 .finish(last_persisted_block_hash, last_persisted_block_number);
1320 self.on_new_persisted_block()?;
1321 }
1322 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1323 Err(TryRecvError::Empty) => {
1324 self.persistence_state.rx = Some((rx, start_time, current_action))
1325 }
1326 }
1327 }
1328
1329 if !self.persistence_state.in_progress() {
1330 if let Some(new_tip_num) = self.find_disk_reorg()? {
1331 self.remove_blocks(new_tip_num)
1332 } else if self.should_persist() {
1333 let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
1334 self.persist_blocks(blocks_to_persist);
1335 }
1336 }
1337
1338 Ok(())
1339 }
1340
1341 fn on_engine_message(
1343 &mut self,
1344 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1345 ) -> Result<(), InsertBlockFatalError> {
1346 match msg {
1347 FromEngine::Event(event) => match event {
1348 FromOrchestrator::BackfillSyncStarted => {
1349 debug!(target: "engine::tree", "received backfill sync started event");
1350 self.backfill_sync_state = BackfillSyncState::Active;
1351 }
1352 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1353 self.on_backfill_sync_finished(ctrl)?;
1354 }
1355 },
1356 FromEngine::Request(request) => {
1357 match request {
1358 EngineApiRequest::InsertExecutedBlock(block) => {
1359 let block_num_hash = block.recovered_block().num_hash();
1360 if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1361 return Ok(())
1363 }
1364
1365 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1366 let now = Instant::now();
1367
1368 if self.state.tree_state.canonical_block_hash() ==
1371 block.recovered_block().parent_hash()
1372 {
1373 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1374 self.canonical_in_memory_state.set_pending_block(block.clone());
1375 }
1376
1377 self.state.tree_state.insert_executed(block.clone());
1378 self.payload_validator.on_inserted_executed_block(block.clone());
1379 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1380 self.emit_event(EngineApiEvent::BeaconConsensus(
1381 ConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1382 ));
1383 }
1384 EngineApiRequest::Beacon(request) => {
1385 match request {
1386 BeaconEngineMessage::ForkchoiceUpdated {
1387 state,
1388 payload_attrs,
1389 tx,
1390 version,
1391 } => {
1392 let has_attrs = payload_attrs.is_some();
1393
1394 let start = Instant::now();
1395 let mut output =
1396 self.on_forkchoice_updated(state, payload_attrs, version);
1397
1398 if let Ok(res) = &mut output {
1399 self.state
1401 .forkchoice_state_tracker
1402 .set_latest(state, res.outcome.forkchoice_status());
1403
1404 self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
1406 state,
1407 res.outcome.forkchoice_status(),
1408 ));
1409
1410 self.on_maybe_tree_event(res.event.take())?;
1412 }
1413
1414 self.metrics.engine.forkchoice_updated.update_response_metrics(
1415 start,
1416 &mut self.metrics.engine.new_payload.latest_at,
1417 has_attrs,
1418 &output,
1419 );
1420
1421 if let Err(err) =
1422 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1423 {
1424 self.metrics
1425 .engine
1426 .failed_forkchoice_updated_response_deliveries
1427 .increment(1);
1428 error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1429 }
1430 }
1431 BeaconEngineMessage::NewPayload { payload, tx } => {
1432 let start = Instant::now();
1433 let gas_used = payload.gas_used();
1434 let num_hash = payload.num_hash();
1435 let mut output = self.on_new_payload(payload);
1436 self.metrics
1437 .engine
1438 .new_payload
1439 .update_response_metrics(start, &output, gas_used);
1440
1441 let maybe_event =
1442 output.as_mut().ok().and_then(|out| out.event.take());
1443
1444 if let Err(err) =
1446 tx.send(output.map(|o| o.outcome).map_err(|e| {
1447 BeaconOnNewPayloadError::Internal(Box::new(e))
1448 }))
1449 {
1450 error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
1451 self.metrics
1452 .engine
1453 .failed_new_payload_response_deliveries
1454 .increment(1);
1455 }
1456
1457 self.on_maybe_tree_event(maybe_event)?;
1459 }
1460 }
1461 }
1462 }
1463 }
1464 FromEngine::DownloadedBlocks(blocks) => {
1465 if let Some(event) = self.on_downloaded(blocks)? {
1466 self.on_tree_event(event)?;
1467 }
1468 }
1469 }
1470 Ok(())
1471 }
1472
1473 fn on_backfill_sync_finished(
1487 &mut self,
1488 ctrl: ControlFlow,
1489 ) -> Result<(), InsertBlockFatalError> {
1490 debug!(target: "engine::tree", "received backfill sync finished event");
1491 self.backfill_sync_state = BackfillSyncState::Idle;
1492
1493 let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1495 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1496 self.state.invalid_headers.insert(**bad_block);
1498
1499 Some(*target)
1501 } else {
1502 ctrl.block_number()
1504 };
1505
1506 let Some(backfill_height) = backfill_height else { return Ok(()) };
1508
1509 let Some(backfill_num_hash) = self
1515 .provider
1516 .block_hash(backfill_height)?
1517 .map(|hash| BlockNumHash { hash, number: backfill_height })
1518 else {
1519 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1520 return Ok(())
1521 };
1522
1523 if ctrl.is_unwind() {
1524 self.state.tree_state.reset(backfill_num_hash)
1527 } else {
1528 self.state.tree_state.remove_until(
1529 backfill_num_hash,
1530 self.persistence_state.last_persisted_block.hash,
1531 Some(backfill_num_hash),
1532 );
1533 }
1534
1535 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1536 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1537
1538 self.state.buffer.remove_old_blocks(backfill_height);
1540 self.canonical_in_memory_state.clear_state();
1543
1544 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1545 self.state.tree_state.set_canonical_head(new_head.num_hash());
1548 self.persistence_state.finish(new_head.hash(), new_head.number());
1549
1550 self.canonical_in_memory_state.set_canonical_head(new_head);
1552 }
1553
1554 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1557 else {
1558 return Ok(())
1559 };
1560 if sync_target_state.finalized_block_hash.is_zero() {
1561 return Ok(())
1563 }
1564 let newest_finalized = self
1566 .state
1567 .buffer
1568 .block(&sync_target_state.finalized_block_hash)
1569 .map(|block| block.number());
1570
1571 if let Some(backfill_target) =
1577 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1578 self.backfill_sync_target(progress, finalized_number, None)
1581 })
1582 {
1583 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1585 backfill_target.into(),
1586 )));
1587 return Ok(())
1588 };
1589
1590 if let Some(lowest_buffered) =
1592 self.state.buffer.lowest_ancestor(&sync_target_state.head_block_hash)
1593 {
1594 let current_head_num = self.state.tree_state.current_canonical_head.number;
1595 let target_head_num = lowest_buffered.number();
1596
1597 if let Some(distance) = self.distance_from_local_tip(current_head_num, target_head_num)
1598 {
1599 debug!(
1601 target: "engine::tree",
1602 %current_head_num,
1603 %target_head_num,
1604 %distance,
1605 "Backfill complete, downloading remaining blocks to reach FCU target"
1606 );
1607
1608 self.emit_event(EngineApiEvent::Download(DownloadRequest::BlockRange(
1609 lowest_buffered.parent_hash(),
1610 distance,
1611 )));
1612 return Ok(());
1613 }
1614 }
1615
1616 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1618 }
1619
1620 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1624 if let Some(chain_update) = self.on_new_head(target)? {
1625 self.on_canonical_chain_update(chain_update);
1626 }
1627
1628 Ok(())
1629 }
1630
1631 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1633 if let Some(event) = event {
1634 self.on_tree_event(event)?;
1635 }
1636
1637 Ok(())
1638 }
1639
1640 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1644 match event {
1645 TreeEvent::TreeAction(action) => match action {
1646 TreeAction::MakeCanonical { sync_target_head } => {
1647 self.make_canonical(sync_target_head)?;
1648 }
1649 },
1650 TreeEvent::BackfillAction(action) => {
1651 self.emit_event(EngineApiEvent::BackfillAction(action));
1652 }
1653 TreeEvent::Download(action) => {
1654 self.emit_event(EngineApiEvent::Download(action));
1655 }
1656 }
1657
1658 Ok(())
1659 }
1660
1661 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1663 let event = event.into();
1664
1665 if event.is_backfill_action() {
1666 debug_assert_eq!(
1667 self.backfill_sync_state,
1668 BackfillSyncState::Idle,
1669 "backfill action should only be emitted when backfill is idle"
1670 );
1671
1672 if self.persistence_state.in_progress() {
1673 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1676 return
1677 }
1678
1679 self.backfill_sync_state = BackfillSyncState::Pending;
1680 self.metrics.engine.pipeline_runs.increment(1);
1681 debug!(target: "engine::tree", "emitting backfill action event");
1682 }
1683
1684 let _ = self.outgoing.send(event).inspect_err(
1685 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1686 );
1687 }
1688
1689 pub const fn should_persist(&self) -> bool {
1693 if !self.backfill_sync_state.is_idle() {
1694 return false
1696 }
1697
1698 let min_block = self.persistence_state.last_persisted_block.number;
1699 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1700 self.config.persistence_threshold()
1701 }
1702
1703 fn get_canonical_blocks_to_persist(
1707 &self,
1708 ) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
1709 debug_assert!(!self.persistence_state.in_progress());
1712
1713 let mut blocks_to_persist = Vec::new();
1714 let mut current_hash = self.state.tree_state.canonical_block_hash();
1715 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1716 let canonical_head_number = self.state.tree_state.canonical_block_number();
1717
1718 let target_number =
1720 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1721
1722 debug!(
1723 target: "engine::tree",
1724 ?current_hash,
1725 ?last_persisted_number,
1726 ?canonical_head_number,
1727 ?target_number,
1728 "Returning canonical blocks to persist"
1729 );
1730 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1731 if block.recovered_block().number() <= last_persisted_number {
1732 break;
1733 }
1734
1735 if block.recovered_block().number() <= target_number {
1736 blocks_to_persist.push(block.clone());
1737 }
1738
1739 current_hash = block.recovered_block().parent_hash();
1740 }
1741
1742 blocks_to_persist.reverse();
1744
1745 Ok(blocks_to_persist)
1746 }
1747
1748 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1756 if let Some(remove_above) = self.find_disk_reorg()? {
1759 self.remove_blocks(remove_above);
1760 return Ok(())
1761 }
1762
1763 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1764 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1765 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1766 number: self.persistence_state.last_persisted_block.number,
1767 hash: self.persistence_state.last_persisted_block.hash,
1768 });
1769 Ok(())
1770 }
1771
1772 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1779 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1780 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
1782 return Ok(Some(block.clone()))
1783 }
1784
1785 let (block, senders) = self
1786 .provider
1787 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1788 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1789 .split_sealed();
1790 let execution_output = self
1791 .provider
1792 .get_state(block.header().number())?
1793 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1794 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1795 let trie_updates = self.provider.get_block_trie_updates(block.number())?;
1796
1797 let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
1798 let sorted_trie_updates = Arc::new(trie_updates);
1799 let trie_data =
1801 ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
1802
1803 Ok(Some(ExecutedBlock::new(
1804 Arc::new(RecoveredBlock::new_sealed(block, senders)),
1805 Arc::new(execution_output),
1806 trie_data,
1807 )))
1808 }
1809
1810 fn sealed_header_by_hash(
1812 &self,
1813 hash: B256,
1814 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1815 let header = self.state.tree_state.sealed_header_by_hash(&hash);
1817
1818 if header.is_some() {
1819 Ok(header)
1820 } else {
1821 self.provider.sealed_header_by_hash(hash)
1822 }
1823 }
1824
1825 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1832 self.state
1833 .buffer
1834 .lowest_ancestor(&hash)
1835 .map(|block| block.parent_hash())
1836 .unwrap_or_else(|| hash)
1837 }
1838
1839 fn latest_valid_hash_for_invalid_payload(
1850 &mut self,
1851 parent_hash: B256,
1852 ) -> ProviderResult<Option<B256>> {
1853 if self.sealed_header_by_hash(parent_hash)?.is_some() {
1855 return Ok(Some(parent_hash))
1856 }
1857
1858 let mut current_hash = parent_hash;
1861 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1862 while let Some(block_with_parent) = current_block {
1863 current_hash = block_with_parent.parent;
1864 current_block = self.state.invalid_headers.get(¤t_hash);
1865
1866 if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
1869 return Ok(Some(current_hash))
1870 }
1871 }
1872 Ok(None)
1873 }
1874
1875 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1879 if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
1882 !parent.difficulty().is_zero()
1883 {
1884 parent_hash = B256::ZERO;
1885 }
1886
1887 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1888 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1889 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1890 })
1891 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1892 }
1893
1894 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1898 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1899 return target.head_block_hash == block_hash
1900 }
1901 false
1902 }
1903
1904 fn is_any_sync_target(&self, block_hash: B256) -> bool {
1908 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1909 return target.contains(block_hash)
1910 }
1911 false
1912 }
1913
1914 fn check_invalid_ancestor_with_head(
1920 &mut self,
1921 check: B256,
1922 head: &SealedBlock<N::Block>,
1923 ) -> ProviderResult<Option<PayloadStatus>> {
1924 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1926
1927 Ok(Some(self.on_invalid_new_payload(head.clone(), header)?))
1928 }
1929
1930 fn on_invalid_new_payload(
1932 &mut self,
1933 head: SealedBlock<N::Block>,
1934 invalid: BlockWithParent,
1935 ) -> ProviderResult<PayloadStatus> {
1936 let status = self.prepare_invalid_response(invalid.parent)?;
1938
1939 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), invalid);
1941 self.emit_event(ConsensusEngineEvent::InvalidBlock(Box::new(head)));
1942
1943 Ok(status)
1944 }
1945
1946 fn find_invalid_ancestor(&mut self, payload: &T::ExecutionData) -> Option<BlockWithParent> {
1958 let parent_hash = payload.parent_hash();
1959 let block_hash = payload.block_hash();
1960 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
1961 if lowest_buffered_ancestor == block_hash {
1962 lowest_buffered_ancestor = parent_hash;
1963 }
1964
1965 self.state.invalid_headers.get(&lowest_buffered_ancestor)
1967 }
1968
1969 fn handle_invalid_ancestor_payload(
1978 &mut self,
1979 payload: T::ExecutionData,
1980 invalid: BlockWithParent,
1981 ) -> Result<PayloadStatus, InsertBlockFatalError> {
1982 let parent_hash = payload.parent_hash();
1983 let num_hash = payload.num_hash();
1984
1985 let block = match self.payload_validator.convert_payload_to_block(payload) {
1991 Ok(block) => block,
1992 Err(error) => return Ok(self.on_new_payload_error(error, num_hash, parent_hash)?),
1993 };
1994
1995 Ok(self.on_invalid_new_payload(block, invalid)?)
1996 }
1997
1998 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
2001 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
2003
2004 match self.prepare_invalid_response(header.parent) {
2006 Ok(status) => Ok(Some(status)),
2007 Err(err) => {
2008 debug!(target: "engine::tree", %err, "Failed to prepare invalid response for ancestor check");
2009 Ok(Some(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
2011 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
2012 })))
2013 }
2014 }
2015 }
2016
2017 fn validate_block(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
2020 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
2021 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
2022 return Err(e)
2023 }
2024
2025 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
2026 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
2027 return Err(e)
2028 }
2029
2030 Ok(())
2031 }
2032
2033 #[instrument(level = "debug", target = "engine::tree", skip(self))]
2035 fn try_connect_buffered_blocks(
2036 &mut self,
2037 parent: BlockNumHash,
2038 ) -> Result<(), InsertBlockFatalError> {
2039 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
2040
2041 if blocks.is_empty() {
2042 return Ok(())
2044 }
2045
2046 let now = Instant::now();
2047 let block_count = blocks.len();
2048 for child in blocks {
2049 let child_num_hash = child.num_hash();
2050 match self.insert_block(child) {
2051 Ok(res) => {
2052 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
2053 if self.is_any_sync_target(child_num_hash.hash) &&
2054 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
2055 {
2056 debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
2057 self.make_canonical(child_num_hash.hash)?;
2060 }
2061 }
2062 Err(err) => {
2063 if let InsertPayloadError::Block(err) = err {
2064 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
2065 if let Err(fatal) = self.on_insert_block_error(err) {
2066 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
2067 return Err(fatal)
2068 }
2069 }
2070 }
2071 }
2072 }
2073
2074 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
2075 Ok(())
2076 }
2077
2078 fn buffer_block(
2080 &mut self,
2081 block: SealedBlock<N::Block>,
2082 ) -> Result<(), InsertBlockError<N::Block>> {
2083 if let Err(err) = self.validate_block(&block) {
2084 return Err(InsertBlockError::consensus_error(err, block))
2085 }
2086 self.state.buffer.insert_block(block);
2087 Ok(())
2088 }
2089
2090 #[inline]
2095 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
2096 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
2097 }
2098
2099 #[inline]
2102 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
2103 if block > local_tip {
2104 Some(block - local_tip)
2105 } else {
2106 None
2107 }
2108 }
2109
2110 fn backfill_sync_target(
2117 &self,
2118 canonical_tip_num: u64,
2119 target_block_number: u64,
2120 downloaded_block: Option<BlockNumHash>,
2121 ) -> Option<B256> {
2122 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
2123
2124 let exceeds_backfill_threshold =
2126 match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
2127 (Some(downloaded_block), Some(state))
2129 if downloaded_block.hash == state.finalized_block_hash =>
2130 {
2131 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
2132 }
2133 _ => match sync_target_state
2134 .as_ref()
2135 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
2136 {
2137 Some(buffered_finalized) => {
2138 self.exceeds_backfill_run_threshold(
2141 canonical_tip_num,
2142 buffered_finalized.number(),
2143 )
2144 }
2145 None => {
2146 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
2148 }
2149 },
2150 };
2151
2152 if exceeds_backfill_threshold && let Some(state) = sync_target_state {
2154 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
2156 Err(err) => {
2157 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
2158 }
2159 Ok(None) => {
2160 if !state.finalized_block_hash.is_zero() {
2162 return Some(state.finalized_block_hash)
2165 }
2166
2167 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2180 return Some(state.head_block_hash)
2181 }
2182 Ok(Some(_)) => {
2183 }
2185 }
2186 }
2187
2188 None
2189 }
2190
2191 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
2194 let mut canonical = self.state.tree_state.current_canonical_head;
2195 let mut persisted = self.persistence_state.last_persisted_block;
2196
2197 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
2198 Ok(self
2199 .sealed_header_by_hash(num_hash.hash)?
2200 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
2201 .parent_num_hash())
2202 };
2203
2204 while canonical.number > persisted.number {
2207 canonical = parent_num_hash(canonical)?;
2208 }
2209
2210 if canonical == persisted {
2212 return Ok(None);
2213 }
2214
2215 while persisted.number > canonical.number {
2221 persisted = parent_num_hash(persisted)?;
2222 }
2223
2224 debug_assert_eq!(persisted.number, canonical.number);
2225
2226 while persisted.hash != canonical.hash {
2228 canonical = parent_num_hash(canonical)?;
2229 persisted = parent_num_hash(persisted)?;
2230 }
2231
2232 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
2233
2234 Ok(Some(persisted.number))
2235 }
2236
2237 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2241 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2242 let start = Instant::now();
2243
2244 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2246
2247 let tip = chain_update.tip().clone_sealed_header();
2248 let notification = chain_update.to_chain_notification();
2249
2250 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2252 let new_first = new.first().map(|first| first.recovered_block().num_hash());
2253 let old_first = old.first().map(|first| first.recovered_block().num_hash());
2254 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2255
2256 self.update_reorg_metrics(old.len());
2257 self.reinsert_reorged_blocks(new.clone());
2258 self.reinsert_reorged_blocks(old.clone());
2259 }
2260
2261 self.canonical_in_memory_state.update_chain(chain_update);
2263 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2264
2265 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2267
2268 self.canonical_in_memory_state.notify_canon_state(notification);
2270
2271 self.emit_event(ConsensusEngineEvent::CanonicalChainCommitted(
2273 Box::new(tip),
2274 start.elapsed(),
2275 ));
2276 }
2277
2278 fn update_reorg_metrics(&self, old_chain_length: usize) {
2280 self.metrics.tree.reorgs.increment(1);
2281 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2282 }
2283
2284 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2286 for block in new_chain {
2287 if self
2288 .state
2289 .tree_state
2290 .executed_block_by_hash(block.recovered_block().hash())
2291 .is_none()
2292 {
2293 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
2294 self.state.tree_state.insert_executed(block);
2295 }
2296 }
2297 }
2298
2299 fn on_disconnected_downloaded_block(
2304 &self,
2305 downloaded_block: BlockNumHash,
2306 missing_parent: BlockNumHash,
2307 head: BlockNumHash,
2308 ) -> Option<TreeEvent> {
2309 if let Some(target) =
2311 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2312 {
2313 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2314 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2315 }
2316
2317 let request = if let Some(distance) =
2327 self.distance_from_local_tip(head.number, missing_parent.number)
2328 {
2329 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2330 DownloadRequest::BlockRange(missing_parent.hash, distance)
2331 } else {
2332 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2333 DownloadRequest::single_block(missing_parent.hash)
2336 };
2337
2338 Some(TreeEvent::Download(request))
2339 }
2340
2341 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
2347 fn on_downloaded_block(
2348 &mut self,
2349 block: SealedBlock<N::Block>,
2350 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2351 let block_num_hash = block.num_hash();
2352 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2353 if self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?.is_some() {
2354 return Ok(None)
2355 }
2356
2357 if !self.backfill_sync_state.is_idle() {
2358 return Ok(None)
2359 }
2360
2361 match self.insert_block(block) {
2363 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2364 if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
2367 sync_target.contains(block_num_hash.hash)
2368 {
2369 debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
2370
2371 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2374 sync_target_head: block_num_hash.hash,
2375 })))
2376 }
2377 trace!(target: "engine::tree", "appended downloaded block");
2378 self.try_connect_buffered_blocks(block_num_hash)?;
2379 }
2380 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2381 return Ok(self.on_disconnected_downloaded_block(
2384 block_num_hash,
2385 missing_ancestor,
2386 head,
2387 ))
2388 }
2389 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2390 trace!(target: "engine::tree", "downloaded block already executed");
2391 }
2392 Err(err) => {
2393 if let InsertPayloadError::Block(err) = err {
2394 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2395 if let Err(fatal) = self.on_insert_block_error(err) {
2396 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2397 return Err(fatal)
2398 }
2399 }
2400 }
2401 }
2402 Ok(None)
2403 }
2404
2405 fn insert_payload(
2414 &mut self,
2415 payload: T::ExecutionData,
2416 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2417 self.insert_block_or_payload(
2418 payload.block_with_parent(),
2419 payload,
2420 |validator, payload, ctx| validator.validate_payload(payload, ctx),
2421 |this, payload| Ok(this.payload_validator.convert_payload_to_block(payload)?),
2422 )
2423 }
2424
2425 fn insert_block(
2426 &mut self,
2427 block: SealedBlock<N::Block>,
2428 ) -> Result<InsertPayloadOk, InsertPayloadError<N::Block>> {
2429 self.insert_block_or_payload(
2430 block.block_with_parent(),
2431 block,
2432 |validator, block, ctx| validator.validate_block(block, ctx),
2433 |_, block| Ok(block),
2434 )
2435 }
2436
2437 #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
2454 fn insert_block_or_payload<Input, Err>(
2455 &mut self,
2456 block_id: BlockWithParent,
2457 input: Input,
2458 execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
2459 convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
2460 ) -> Result<InsertPayloadOk, Err>
2461 where
2462 Err: From<InsertBlockError<N::Block>>,
2463 {
2464 let block_insert_start = Instant::now();
2465 let block_num_hash = block_id.block;
2466 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
2467
2468 match self.sealed_header_by_hash(block_num_hash.hash) {
2469 Err(err) => {
2470 let block = convert_to_block(self, input)?;
2471 return Err(InsertBlockError::new(block, err.into()).into());
2472 }
2473 Ok(Some(_)) => {
2474 convert_to_block(self, input)?;
2477 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2478 }
2479 _ => {}
2480 };
2481
2482 match self.state_provider_builder(block_id.parent) {
2484 Err(err) => {
2485 let block = convert_to_block(self, input)?;
2486 return Err(InsertBlockError::new(block, err.into()).into());
2487 }
2488 Ok(None) => {
2489 let block = convert_to_block(self, input)?;
2490
2491 let missing_ancestor = self
2494 .state
2495 .buffer
2496 .lowest_ancestor(&block.parent_hash())
2497 .map(|block| block.parent_num_hash())
2498 .unwrap_or_else(|| block.parent_num_hash());
2499
2500 self.state.buffer.insert_block(block);
2501
2502 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2503 head: self.state.tree_state.current_canonical_head,
2504 missing_ancestor,
2505 }))
2506 }
2507 Ok(Some(_)) => {}
2508 }
2509
2510 let is_fork = match self.is_fork(block_id) {
2512 Err(err) => {
2513 let block = convert_to_block(self, input)?;
2514 return Err(InsertBlockError::new(block, err.into()).into());
2515 }
2516 Ok(is_fork) => is_fork,
2517 };
2518
2519 let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
2520
2521 let start = Instant::now();
2522
2523 let executed = execute(&mut self.payload_validator, input, ctx)?;
2524
2525 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2527 {
2528 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2529 self.canonical_in_memory_state.set_pending_block(executed.clone());
2530 }
2531
2532 self.state.tree_state.insert_executed(executed.clone());
2533 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2534
2535 let elapsed = start.elapsed();
2537 let engine_event = if is_fork {
2538 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2539 } else {
2540 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2541 };
2542 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2543
2544 self.metrics
2545 .engine
2546 .block_insert_total_duration
2547 .record(block_insert_start.elapsed().as_secs_f64());
2548 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2549 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2550 }
2551
2552 fn on_insert_block_error(
2558 &mut self,
2559 error: InsertBlockError<N::Block>,
2560 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2561 let (block, error) = error.split();
2562
2563 let validation_err = error.ensure_validation_error()?;
2566
2567 warn!(
2571 target: "engine::tree",
2572 invalid_hash=%block.hash(),
2573 invalid_number=block.number(),
2574 %validation_err,
2575 "Invalid block error on new payload",
2576 );
2577 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2578
2579 self.state.invalid_headers.insert(block.block_with_parent());
2581 self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(
2582 Box::new(block),
2583 )));
2584
2585 Ok(PayloadStatus::new(
2586 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2587 latest_valid_hash,
2588 ))
2589 }
2590
2591 fn on_new_payload_error(
2593 &mut self,
2594 error: NewPayloadError,
2595 payload_num_hash: NumHash,
2596 parent_hash: B256,
2597 ) -> ProviderResult<PayloadStatus> {
2598 error!(target: "engine::tree", payload=?payload_num_hash, %error, "Invalid payload");
2599 let latest_valid_hash =
2602 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
2603 None
2607 } else {
2608 self.latest_valid_hash_for_invalid_payload(parent_hash)?
2609 };
2610
2611 let status = PayloadStatusEnum::from(error);
2612 Ok(PayloadStatus::new(status, latest_valid_hash))
2613 }
2614
2615 pub fn find_canonical_header(
2617 &self,
2618 hash: B256,
2619 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2620 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2621
2622 if canonical.is_none() {
2623 canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
2624 }
2625
2626 Ok(canonical)
2627 }
2628
2629 fn update_finalized_block(
2631 &self,
2632 finalized_block_hash: B256,
2633 ) -> Result<(), OnForkChoiceUpdated> {
2634 if finalized_block_hash.is_zero() {
2635 return Ok(())
2636 }
2637
2638 match self.find_canonical_header(finalized_block_hash) {
2639 Ok(None) => {
2640 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2641 return Err(OnForkChoiceUpdated::invalid_state())
2643 }
2644 Ok(Some(finalized)) => {
2645 if Some(finalized.num_hash()) !=
2646 self.canonical_in_memory_state.get_finalized_num_hash()
2647 {
2648 let _ = self.persistence.save_finalized_block_number(finalized.number());
2651 self.canonical_in_memory_state.set_finalized(finalized.clone());
2652 self.metrics.tree.finalized_block_height.set(finalized.number() as f64);
2654 }
2655 }
2656 Err(err) => {
2657 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2658 }
2659 }
2660
2661 Ok(())
2662 }
2663
2664 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2666 if safe_block_hash.is_zero() {
2667 return Ok(())
2668 }
2669
2670 match self.find_canonical_header(safe_block_hash) {
2671 Ok(None) => {
2672 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2673 return Err(OnForkChoiceUpdated::invalid_state())
2675 }
2676 Ok(Some(safe)) => {
2677 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2678 let _ = self.persistence.save_safe_block_number(safe.number());
2681 self.canonical_in_memory_state.set_safe(safe.clone());
2682 self.metrics.tree.safe_block_height.set(safe.number() as f64);
2684 }
2685 }
2686 Err(err) => {
2687 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2688 }
2689 }
2690
2691 Ok(())
2692 }
2693
2694 fn ensure_consistent_forkchoice_state(
2703 &self,
2704 state: ForkchoiceState,
2705 ) -> Result<(), OnForkChoiceUpdated> {
2706 self.update_finalized_block(state.finalized_block_hash)?;
2712
2713 self.update_safe_block(state.safe_block_hash)
2719 }
2720
2721 fn process_payload_attributes(
2726 &self,
2727 attrs: T::PayloadAttributes,
2728 head: &N::BlockHeader,
2729 state: ForkchoiceState,
2730 version: EngineApiMessageVersion,
2731 ) -> OnForkChoiceUpdated {
2732 if let Err(err) =
2733 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2734 {
2735 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2736 return OnForkChoiceUpdated::invalid_payload_attributes()
2737 }
2738
2739 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2744 state.head_block_hash,
2745 attrs,
2746 version as u8,
2747 ) {
2748 Ok(attributes) => {
2749 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2752
2753 OnForkChoiceUpdated::updated_with_pending_payload_id(
2765 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2766 pending_payload_id,
2767 )
2768 }
2769 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2770 }
2771 }
2772
2773 pub(crate) fn remove_before(
2780 &mut self,
2781 upper_bound: BlockNumHash,
2782 finalized_hash: Option<B256>,
2783 ) -> ProviderResult<()> {
2784 let num = if let Some(hash) = finalized_hash {
2787 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2788 } else {
2789 None
2790 };
2791
2792 self.state.tree_state.remove_until(
2793 upper_bound,
2794 self.persistence_state.last_persisted_block.hash,
2795 num,
2796 );
2797 Ok(())
2798 }
2799
2800 pub fn state_provider_builder(
2805 &self,
2806 hash: B256,
2807 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2808 where
2809 P: BlockReader + StateProviderFactory + StateReader + Clone,
2810 {
2811 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2812 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2813 return Ok(Some(StateProviderBuilder::new(
2815 self.provider.clone(),
2816 historical,
2817 Some(blocks),
2818 )))
2819 }
2820
2821 if let Some(header) = self.provider.header(hash)? {
2823 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2824 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2827 }
2828
2829 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2830 Ok(None)
2831 }
2832}
2833
2834#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2840pub enum BlockStatus {
2841 Valid,
2843 Disconnected {
2845 head: BlockNumHash,
2847 missing_ancestor: BlockNumHash,
2849 },
2850}
2851
2852#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2857pub enum InsertPayloadOk {
2858 AlreadySeen(BlockStatus),
2860 Inserted(BlockStatus),
2862}