1use crate::tree::{
42 error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
43 instrumented_state::{InstrumentedStateProvider, StateProviderStats},
44 payload_processor::PayloadProcessor,
45 precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
46 sparse_trie::StateRootComputeOutcome,
47 CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
48 PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
49};
50use alloy_consensus::transaction::{Either, TxHashRef};
51use alloy_eip7928::BlockAccessList;
52use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
53use alloy_evm::Evm;
54use alloy_primitives::{map::B256Set, B256};
55#[cfg(feature = "trie-debug")]
56use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
57
58use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
59use reth_chain_state::{
60 CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, ExecutionTimingStats, LazyOverlay,
61};
62use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
63use reth_engine_primitives::{
64 ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
65};
66use reth_errors::{BlockExecutionError, ProviderResult};
67use reth_evm::{
68 block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
69 OnStateHook, SpecFor,
70};
71use reth_execution_cache::CacheStats;
72use reth_payload_primitives::{
73 BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
74};
75use reth_primitives_traits::{
76 AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
77 RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
78};
79use reth_provider::{
80 providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
81 ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
82 ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
83 StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
84};
85use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
86use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState, StateRoot};
87use reth_trie_db::ChangesetCache;
88use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
89use revm_primitives::{Address, KECCAK_EMPTY};
90use std::{
91 collections::HashMap,
92 panic::{self, AssertUnwindSafe},
93 sync::{
94 atomic::{AtomicUsize, Ordering},
95 mpsc::RecvTimeoutError,
96 Arc,
97 },
98 time::Duration,
99};
100use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
101
/// Outcome of validating a payload or block.
///
/// On success, carries the executed block plus optional execution timing statistics;
/// on failure, the error type `E` (defaults to [`InsertPayloadError`] for the node's
/// block type).
pub type ValidationOutcome<N, E = InsertPayloadError<BlockTy<N>>> =
    Result<(ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>), E>;

/// Handle to a [`HashedPostState`] that is computed lazily on a background task.
type LazyHashedPostState = reth_tasks::LazyHandle<HashedPostState>;

/// Result of inserting a payload: the executed block together with optional timing
/// statistics, or an [`InsertPayloadError`].
type InsertPayloadResult<N> = Result<
    (ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>),
    InsertPayloadError<<N as NodePrimitives>::Block>,
>;
114
/// Context handed to block/payload validation.
///
/// Bundles exclusive access to the engine's tree state with a shared view of the
/// canonical in-memory chain state.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// Mutable engine API tree state, held for the duration of validation.
    state: &'a mut EngineApiTreeState<N>,
    /// Shared canonical in-memory chain state.
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}
125
126impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("TreeCtx")
129 .field("state", &"EngineApiTreeState")
130 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
131 .finish()
132 }
133}
134
135impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
136 pub const fn new(
138 state: &'a mut EngineApiTreeState<N>,
139 canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
140 ) -> Self {
141 Self { state, canonical_in_memory_state }
142 }
143}
144
impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
    /// Returns a shared reference to the engine tree state.
    pub const fn state(&self) -> &EngineApiTreeState<N> {
        // Explicitly reborrow the `&mut` field as a shared reference.
        &*self.state
    }

    /// Returns a mutable reference to the engine tree state.
    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
        self.state
    }

    /// Returns the canonical in-memory state; the returned reference lives for the full
    /// context lifetime `'a`, not just the borrow of `self`.
    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
        self.canonical_in_memory_state
    }
}
161
/// Engine validator that converts payloads to blocks, executes them, and verifies their
/// state roots.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Database / state provider factory.
    provider: P,
    /// Consensus rules used for header and block validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
    /// EVM configuration used to build environments and executors.
    evm_config: Evm,
    /// Engine tree configuration.
    config: TreeConfig,
    /// Spawns payload-related background work (conversion, caching, state root tasks).
    payload_processor: PayloadProcessor<Evm>,
    /// Shared per-address cache of precompile results.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Per-address metrics for the precompile cache, created lazily on first use.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook invoked when an invalid block is detected (skipped in `Debug` output).
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Engine API metrics sink.
    metrics: EngineApiMetrics,
    /// Payload validator implementation.
    validator: V,
    /// Cache of trie changesets used by overlay state provider factories.
    changeset_cache: ChangesetCache,
    /// Runtime used to spawn parallel state root work.
    runtime: reth_tasks::Runtime,
}
200
201impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
202where
203 N: NodePrimitives,
204 P: DatabaseProviderFactory<
205 Provider: BlockReader
206 + StageCheckpointReader
207 + PruneCheckpointReader
208 + ChangeSetReader
209 + StorageChangeSetReader
210 + BlockNumReader
211 + StorageSettingsCache,
212 > + BlockReader<Header = N::BlockHeader>
213 + ChangeSetReader
214 + BlockNumReader
215 + StateProviderFactory
216 + StateReader
217 + HashedPostStateProvider
218 + Clone
219 + 'static,
220 Evm: ConfigureEvm<Primitives = N> + 'static,
221{
222 #[allow(clippy::too_many_arguments)]
224 pub fn new(
225 provider: P,
226 consensus: Arc<dyn FullConsensus<N>>,
227 evm_config: Evm,
228 validator: V,
229 config: TreeConfig,
230 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
231 changeset_cache: ChangesetCache,
232 runtime: reth_tasks::Runtime,
233 ) -> Self {
234 let precompile_cache_map = PrecompileCacheMap::default();
235 let payload_processor = PayloadProcessor::new(
236 runtime.clone(),
237 evm_config.clone(),
238 &config,
239 precompile_cache_map.clone(),
240 );
241 Self {
242 provider,
243 consensus,
244 evm_config,
245 payload_processor,
246 precompile_cache_map,
247 precompile_cache_metrics: HashMap::new(),
248 config,
249 invalid_block_hook,
250 metrics: EngineApiMetrics::default(),
251 validator,
252 changeset_cache,
253 runtime,
254 }
255 }
256
257 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
259 pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
260 &self,
261 input: BlockOrPayload<T>,
262 ) -> Result<SealedBlock<N::Block>, NewPayloadError>
263 where
264 V: PayloadValidator<T, Block = N::Block>,
265 {
266 match input {
267 BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
268 BlockOrPayload::Block(block) => Ok(block),
269 }
270 }
271
272 pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
274 &self,
275 input: &BlockOrPayload<T>,
276 ) -> Result<EvmEnvFor<Evm>, Evm::Error>
277 where
278 V: PayloadValidator<T, Block = N::Block>,
279 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
280 {
281 match input {
282 BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
283 BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
284 }
285 }
286
287 pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
289 &'a self,
290 input: &'a BlockOrPayload<T>,
291 ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
292 where
293 V: PayloadValidator<T, Block = N::Block>,
294 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
295 {
296 Ok(match input {
297 BlockOrPayload::Payload(payload) => {
298 let iter = self
299 .evm_config
300 .tx_iterator_for_payload(payload)
301 .map_err(NewPayloadError::other)?;
302 Either::Left(iter)
303 }
304 BlockOrPayload::Block(block) => {
305 let txs = block.body().clone_transactions();
306 let convert = |tx: N::SignedTx| tx.try_into_recovered();
307 Either::Right((txs, convert))
308 }
309 })
310 }
311
312 pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
314 &self,
315 input: &'a BlockOrPayload<T>,
316 ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
317 where
318 V: PayloadValidator<T, Block = N::Block>,
319 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
320 {
321 match input {
322 BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
323 BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
324 }
325 }
326
327 fn handle_execution_error<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
332 &self,
333 input: BlockOrPayload<T>,
334 execution_err: InsertBlockErrorKind,
335 parent_block: &SealedHeader<N::BlockHeader>,
336 ) -> InsertPayloadResult<N>
337 where
338 V: PayloadValidator<T, Block = N::Block>,
339 {
340 debug!(
341 target: "engine::tree::payload_validator",
342 ?execution_err,
343 block = ?input.num_hash(),
344 "Block execution failed, checking for header validation errors"
345 );
346
347 let block = self.convert_to_block(input)?;
350
351 if let Err(consensus_err) = self.validate_block_inner(&block, None) {
353 return Err(InsertBlockError::new(block, consensus_err.into()).into())
355 }
356
357 if let Err(consensus_err) =
359 self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
360 {
361 return Err(InsertBlockError::new(block, consensus_err.into()).into())
363 }
364
365 Err(InsertBlockError::new(block, execution_err).into())
367 }
368
    /// Validates a block or payload against the current tree state: executes it, runs
    /// consensus checks, and verifies the state root.
    ///
    /// High-level flow:
    /// 1. For payload inputs, spawn payload->block conversion on a background task.
    /// 2. Build a state provider for the parent and fetch the parent header.
    /// 3. Spawn the payload processor (caching/prewarming and, depending on the chosen
    ///    strategy, a state root task).
    /// 4. Execute the block, streaming receipts to a receipt-root task.
    /// 5. Run post-execution validation, then obtain the state root via the selected
    ///    strategy, falling back to serial computation if needed.
    /// 6. Defer trie bookkeeping to a background task and return the executed block with
    ///    optional timing statistics.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        skip_all,
        fields(
            parent = ?input.parent_hash(),
            type_name = ?input.type_name(),
        )
    )]
    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &mut self,
        input: BlockOrPayload<T>,
        mut ctx: TreeCtx<'_, N>,
    ) -> InsertPayloadResult<N>
    where
        V: PayloadValidator<T, Block = N::Block> + Clone,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        let is_payload = matches!(&input, BlockOrPayload::Payload(_));
        // For payloads, start block conversion in the background so it overlaps with
        // provider setup and execution; block inputs need no conversion.
        let convert_to_block = match &input {
            BlockOrPayload::Payload(_) => {
                let payload_clone = input.clone();
                let validator = self.validator.clone();
                let handle = self.payload_processor.executor().spawn_blocking_named(
                    "payload-convert",
                    move || {
                        let BlockOrPayload::Payload(payload) = payload_clone else {
                            unreachable!()
                        };
                        validator.convert_payload_to_block(payload)
                    },
                );
                Either::Left(handle)
            }
            BlockOrPayload::Block(_) => Either::Right(()),
        };

        // Closure that resolves the conversion: joins the background task for payloads,
        // or unwraps the block variant directly.
        let convert_to_block =
            move |input: BlockOrPayload<T>| -> Result<SealedBlock<N::Block>, NewPayloadError> {
                match convert_to_block {
                    Either::Left(handle) => handle.try_into_inner().expect("sole handle"),
                    Either::Right(()) => {
                        let BlockOrPayload::Block(block) = input else { unreachable!() };
                        Ok(block)
                    }
                }
            };

        // Early-return helper for fallible steps that run before `input` has been
        // consumed: converts the input to a block so the error can carry it.
        macro_rules! ensure_ok {
            ($expr:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        let block = convert_to_block(input)?;
                        return Err(InsertBlockError::new(block, e.into()).into())
                    }
                }
            };
        }

        // Same as `ensure_ok!` but for steps after the block is available.
        macro_rules! ensure_ok_post_block {
            ($expr:expr, $block:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        return Err(
                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
                        )
                    }
                }
            };
        }

        let parent_hash = input.parent_hash();

        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
        let Some(provider_builder) =
            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
        else {
            // Parent unknown: cannot build state on top of it.
            return Err(InsertBlockError::new(
                convert_to_block(input)?,
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };
        let mut state_provider = ensure_ok!(provider_builder.build());
        drop(_enter);

        let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
        else {
            return Err(InsertBlockError::new(
                convert_to_block(input)?,
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };

        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
            .in_scope(|| self.evm_env_for(&input))
            .map_err(NewPayloadError::other)?;

        // Execution environment handed to the payload processor.
        let env = ExecutionEnv {
            evm_env,
            hash: input.hash(),
            parent_hash: input.parent_hash(),
            parent_state_root: parent_block.state_root(),
            transaction_count: input.transaction_count(),
            gas_used: input.gas_used(),
            withdrawals: input.withdrawals().map(|w| w.to_vec()),
        };

        // Decide which state root algorithm to use (task / parallel / synchronous).
        let strategy = self.plan_state_root_computation();

        debug!(
            target: "engine::tree::payload_validator",
            ?strategy,
            "Decided which state root algorithm to run"
        );

        let txs = self.tx_iterator_for(&input)?;

        let block_access_list = ensure_ok!(input
            .block_access_list()
            .transpose()
            .map_err(Box::<dyn std::error::Error + Send + Sync>::from))
        .map(Arc::new);

        // Overlay factory anchored at the parent's in-memory overlay, used by the state
        // root computations below.
        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());

        let overlay_factory =
            OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
                .with_block_hash(Some(anchor_hash))
                .with_lazy_overlay(lazy_overlay);

        let mut handle = ensure_ok!(self.spawn_payload_processor(
            env.clone(),
            txs,
            provider_builder,
            overlay_factory.clone(),
            strategy,
            block_access_list,
        ));

        // Cache stats are only collected when slow-block diagnostics are enabled.
        let slow_block_enabled = self.config.slow_block_threshold().is_some();
        let cache_stats = slow_block_enabled.then(|| Arc::new(CacheStats::default()));

        // Layer the processor's execution caches on top of the state provider.
        if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
            state_provider = Box::new(
                CachedStateProvider::new(state_provider, caches, cache_metrics)
                    .with_cache_stats(cache_stats.clone()),
            );
        };

        // Optionally instrument the provider to collect per-block access statistics.
        let state_provider_stats = if slow_block_enabled || self.config.state_provider_metrics() {
            let instrumented_state_provider =
                InstrumentedStateProvider::new(state_provider, "engine");
            let stats = slow_block_enabled.then(|| instrumented_state_provider.stats());
            state_provider = Box::new(instrumented_state_provider);
            stats
        } else {
            None
        };

        let execute_block_start = Instant::now();
        let (output, senders, receipt_root_rx) =
            match self.execute_block(state_provider, env, &input, &mut handle) {
                Ok(output) => output,
                Err(err) => return self.handle_execution_error(input, err, &parent_block),
            };
        let execution_duration = execute_block_start.elapsed();

        // Execution finished; prewarming is no longer useful.
        handle.stop_prewarming_execution();

        let output = Arc::new(output);

        // Notify the caching layer of the execution output; the returned sender is used
        // later to signal that the block turned out to be valid.
        let valid_block_tx = handle.terminate_caching(Some(output.clone()));

        // Hash the post-execution state on a background task while validation proceeds.
        let hashed_state_output = output.clone();
        let hashed_state_provider = self.provider.clone();
        let hashed_state: LazyHashedPostState =
            self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
                let _span = debug_span!(
                    target: "engine::tree::payload_validator",
                    "hashed_post_state",
                )
                .entered();
                hashed_state_provider.hashed_post_state(&hashed_state_output.state)
            });

        let block = convert_to_block(input)?;
        // For payloads the tx root was not checked during conversion; compute it on a
        // background task.
        let transaction_root = is_payload.then(|| {
            let body = block.body().clone();
            let parent_span = Span::current();
            let num_hash = block.num_hash();
            self.payload_processor.executor().spawn_blocking_named("payload-tx-root", move || {
                let _span =
                    debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
                        .entered();
                body.calculate_tx_root()
            })
        });
        let block = block.with_senders(senders);

        // Join the receipt-root task; a dropped sender means the task aborted, in which
        // case validation proceeds without the precomputed root.
        let receipt_root_bloom = {
            let _enter = debug_span!(
                target: "engine::tree::payload_validator",
                "wait_receipt_root",
            )
            .entered();

            receipt_root_rx
                .blocking_recv()
                .inspect_err(|_| {
                    tracing::error!(
                        target: "engine::tree::payload_validator",
                        "Receipt root task dropped sender without result, receipt root calculation likely aborted"
                    );
                })
                .ok()
        };
        let transaction_root = transaction_root.map(|handle| {
            let _span =
                debug_span!(target: "engine::tree::payload_validator", "wait_payload_tx_root")
                    .entered();
            handle.try_into_inner().expect("sole handle")
        });

        let hashed_state = ensure_ok_post_block!(
            self.validate_post_execution(
                &block,
                &parent_block,
                &output,
                &mut ctx,
                transaction_root,
                receipt_root_bloom,
                hashed_state,
            ),
            block
        );

        let root_time = Instant::now();
        let mut maybe_state_root = None;
        let mut state_root_task_failed = false;
        #[cfg(feature = "trie-debug")]
        let mut trie_debug_recorders = Vec::new();

        match strategy {
            StateRootStrategy::StateRootTask => {
                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");

                let task_result = ensure_ok_post_block!(
                    self.await_state_root_with_timeout(
                        &mut handle,
                        overlay_factory.clone(),
                        &hashed_state,
                    ),
                    block
                );

                match task_result {
                    Ok(StateRootComputeOutcome {
                        state_root,
                        trie_updates,
                        #[cfg(feature = "trie-debug")]
                        debug_recorders,
                    }) => {
                        let elapsed = root_time.elapsed();
                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");

                        #[cfg(feature = "trie-debug")]
                        {
                            trie_debug_recorders = debug_recorders;
                        }

                        // Optional cross-check of task trie updates against the serial
                        // computation (diagnostics only).
                        if self.config.always_compare_trie_updates() {
                            let _has_diff = self.compare_trie_updates_with_serial(
                                overlay_factory.clone(),
                                &hashed_state,
                                trie_updates.as_ref().clone(),
                            );
                            #[cfg(feature = "trie-debug")]
                            if _has_diff {
                                Self::write_trie_debug_recorders(
                                    block.header().number(),
                                    &trie_debug_recorders,
                                );
                            }
                        }

                        if state_root == block.header().state_root() {
                            maybe_state_root = Some((state_root, trie_updates, elapsed))
                        } else {
                            // Task produced a wrong root: record it and fall back below.
                            warn!(
                                target: "engine::tree::payload_validator",
                                ?state_root,
                                block_state_root = ?block.header().state_root(),
                                "State root task returned incorrect state root"
                            );
                            #[cfg(feature = "trie-debug")]
                            Self::write_trie_debug_recorders(
                                block.header().number(),
                                &trie_debug_recorders,
                            );
                            state_root_task_failed = true;
                        }
                    }
                    Err(error) => {
                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
                        state_root_task_failed = true;
                    }
                }
            }
            StateRootStrategy::Parallel => {
                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
                match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
                    Ok(result) => {
                        let elapsed = root_time.elapsed();
                        info!(
                            target: "engine::tree::payload_validator",
                            regular_state_root = ?result.0,
                            ?elapsed,
                            "Regular root task finished"
                        );
                        maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
                    }
                    Err(error) => {
                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
                    }
                }
            }
            // Synchronous strategy computes nothing here; the serial fallback below runs.
            StateRootStrategy::Synchronous => {}
        }

        // If no strategy produced a root (or the task's root was wrong), compute serially.
        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
            maybe_state_root
        {
            maybe_state_root
        } else {
            if self.config.state_root_fallback() {
                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
            } else {
                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
            }

            let (root, updates) = ensure_ok_post_block!(
                Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
                block
            );

            if state_root_task_failed {
                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
            }

            (root, Arc::new(updates), root_time.elapsed())
        };

        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
        self.metrics
            .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");

        // Final consensus check: computed root must match the header's.
        if state_root != block.header().state_root() {
            #[cfg(feature = "trie-debug")]
            Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);

            self.on_invalid_block(
                &parent_block,
                &block,
                &output,
                Some((&trie_output, state_root)),
                ctx.state_mut(),
            );
            let block_state_root = block.header().state_root();
            return Err(InsertBlockError::new(
                block.into_sealed_block(),
                ConsensusError::BodyStateRootDiff(
                    GotExpected { got: state_root, expected: block_state_root }.into(),
                )
                .into(),
            )
            .into())
        }

        // Timing stats are only assembled when the instrumented provider was active.
        let timing_stats = state_provider_stats.map(|stats| {
            self.calculate_timing_stats(
                &block,
                stats,
                cache_stats,
                &output,
                execution_duration,
                root_elapsed,
            )
        });

        // Signal the caching layer that the block was fully validated.
        if let Some(valid_block_tx) = valid_block_tx {
            let _ = valid_block_tx.send(());
        }

        let changeset_provider =
            ensure_ok_post_block!(overlay_factory.database_provider_ro(), block);

        // Remaining trie bookkeeping is deferred to a background task.
        let executed_block = self.spawn_deferred_trie_task(
            block,
            output,
            &ctx,
            hashed_state,
            trie_output,
            changeset_provider,
        );
        Ok((executed_block, timing_stats))
    }
838
839 fn sealed_header_by_hash(
841 &self,
842 hash: B256,
843 state: &EngineApiTreeState<N>,
844 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
845 let header = state.tree_state.sealed_header_by_hash(&hash);
847
848 if header.is_some() {
849 Ok(header)
850 } else {
851 self.provider.sealed_header_by_hash(hash)
852 }
853 }
854
855 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
858 fn validate_block_inner(
859 &self,
860 block: &SealedBlock<N::Block>,
861 transaction_root: Option<B256>,
862 ) -> Result<(), ConsensusError> {
863 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
864 error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
865 return Err(e)
866 }
867
868 if let Err(e) =
869 self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
870 {
871 error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
872 return Err(e)
873 }
874
875 Ok(())
876 }
877
    /// Executes the block/payload on top of the given state provider.
    ///
    /// Returns the execution output, the recovered sender addresses (in transaction
    /// order), and a oneshot receiver for the receipt root + logs bloom produced by a
    /// background task that is fed receipts as execution progresses.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::type_complexity)]
    fn execute_block<S, Err, T>(
        &mut self,
        state_provider: S,
        env: ExecutionEnv<Evm>,
        input: &BlockOrPayload<T>,
        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
    ) -> Result<
        (
            BlockExecutionOutput<N::Receipt>,
            Vec<Address>,
            tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
        ),
        InsertBlockErrorKind,
    >
    where
        S: StateProvider + Send,
        Err: core::error::Error + Send + Sync + 'static,
        V: PayloadValidator<T, Block = N::Block>,
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        debug!(target: "engine::tree::payload_validator", "Executing block");

        // Wrap the state provider in a revm `State` that records bundle-state updates.
        let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
            State::builder()
                .with_database(StateProviderDatabase::new(state_provider))
                .with_bundle_update()
                .build()
        });

        // Build the EVM and the block executor from the configured environment/context.
        let (spec_id, mut executor) = {
            let _span = debug_span!(target: "engine::tree", "create_evm").entered();
            let spec_id = *env.evm_env.spec_id();
            let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
            let ctx = self
                .execution_ctx_for(input)
                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
            let executor = self.evm_config.create_executor(evm, ctx);
            (spec_id, executor)
        };

        // Wrap cacheable precompiles with the shared cache, creating per-address metrics
        // lazily on first use.
        if !self.config.precompile_cache_disabled() {
            let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
            executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
                |address, precompile| {
                    let metrics = self
                        .precompile_cache_metrics
                        .entry(*address)
                        .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
                        .clone();
                    CachedPrecompile::wrap(
                        precompile,
                        self.precompile_cache_map.cache_for_address(*address),
                        spec_id,
                        Some(metrics),
                    )
                },
            );
        }

        // Spawn the receipt-root task; receipts are streamed to it over an unbounded
        // channel while transactions execute, and the result arrives on `result_rx`.
        let receipts_len = input.transaction_count();
        let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
        let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
        self.payload_processor
            .executor()
            .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));

        let transaction_count = input.transaction_count();
        let executed_tx_index = Arc::clone(handle.executed_tx_index());
        // Attach the processor's state hook (if any) so it observes state transitions.
        let executor = executor.with_state_hook(
            handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
        );

        let execution_start = Instant::now();

        let (executor, senders) = self.execute_transactions(
            executor,
            transaction_count,
            handle.iter_transactions(),
            &receipt_tx,
            &executed_tx_index,
        )?;
        // Dropping the sender closes the channel so the receipt-root task can finish.
        drop(receipt_tx);

        let post_exec_start = Instant::now();
        let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
            .in_scope(|| executor.finish())
            .map(|(evm, result)| (evm.into_db(), result))?;
        self.metrics.record_post_execution(post_exec_start.elapsed());

        debug_span!(target: "engine::tree", "merge_transitions")
            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));

        let output = BlockExecutionOutput { result, state: db.take_bundle() };

        let execution_duration = execution_start.elapsed();
        self.metrics.record_block_execution(&output, execution_duration);
        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");

        Ok((output, senders, result_rx))
    }
995
    /// Executes all transactions through the block executor, collecting sender addresses
    /// and streaming each produced receipt to the receipt-root task.
    ///
    /// `executed_tx_index` is updated after each transaction so other components can
    /// observe execution progress. Returns the executor (for finalization) and the
    /// senders in transaction order.
    fn execute_transactions<E, Tx, InnerTx, Err>(
        &self,
        mut executor: E,
        transaction_count: usize,
        transactions: impl Iterator<Item = Result<Tx, Err>>,
        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
        executed_tx_index: &AtomicUsize,
    ) -> Result<(E, Vec<Address>), BlockExecutionError>
    where
        E: BlockExecutor<Receipt = N::Receipt>,
        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
        InnerTx: TxHashRef,
        Err: core::error::Error + Send + Sync + 'static,
    {
        let mut senders = Vec::with_capacity(transaction_count);

        let pre_exec_start = Instant::now();
        debug_span!(target: "engine::tree", "pre_execution")
            .in_scope(|| executor.apply_pre_execution_changes())?;
        self.metrics.record_pre_execution(pre_exec_start.elapsed());

        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
        let mut transactions = transactions.into_iter();
        // Tracks how many receipts have already been forwarded to the receipt-root task.
        let mut last_sent_len = 0usize;
        loop {
            // Time spent blocked on the iterator (e.g. waiting for the next transaction
            // to become available) is recorded separately from execution time.
            let wait_start = Instant::now();
            let Some(tx_result) = transactions.next() else { break };
            self.metrics.record_transaction_wait(wait_start.elapsed());

            let tx = tx_result.map_err(BlockExecutionError::other)?;
            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);

            senders.push(tx_signer);

            let _enter = debug_span!(
                target: "engine::tree",
                "execute tx",
                tx_index = senders.len() - 1,
            )
            .entered();
            trace!(target: "engine::tree", "Executing transaction");

            let tx_start = Instant::now();
            executor.execute_transaction(tx)?;
            self.metrics.record_transaction_execution(tx_start.elapsed());

            // Publish progress for observers (senders.len() == executed tx count).
            executed_tx_index.store(senders.len(), Ordering::Relaxed);

            // Forward the newest receipt (if one was produced) to the receipt-root task.
            // Send failures are ignored: the task may have already terminated.
            let current_len = executor.receipts().len();
            if current_len > last_sent_len {
                last_sent_len = current_len;
                if let Some(receipt) = executor.receipts().last() {
                    let tx_index = current_len - 1;
                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
                }
            }
        }
        drop(exec_span);

        Ok((executor, senders))
    }
1076
1077 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1088 fn compute_state_root_parallel(
1089 &self,
1090 overlay_factory: OverlayStateProviderFactory<P>,
1091 hashed_state: &LazyHashedPostState,
1092 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
1093 let hashed_state = hashed_state.get();
1094 let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1098 let overlay_factory =
1099 overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1100 ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
1101 .incremental_root_with_updates()
1102 }
1103
1104 fn compute_state_root_serial(
1110 overlay_factory: OverlayStateProviderFactory<P>,
1111 hashed_state: &LazyHashedPostState,
1112 ) -> ProviderResult<(B256, TrieUpdates)> {
1113 let hashed_state = hashed_state.get();
1114 let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1118 let overlay_factory =
1119 overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1120
1121 let provider = overlay_factory.database_provider_ro()?;
1122
1123 Ok(StateRoot::new(&provider, &provider)
1124 .with_prefix_sets(prefix_sets)
1125 .root_with_updates()?)
1126 }
1127
    /// Waits for the state root task's result, racing it against a sequential fallback
    /// once the configured timeout elapses.
    ///
    /// Without a configured timeout this simply blocks on the task result. With one, a
    /// serial computation is spawned after the timeout and whichever finishes first wins;
    /// the task channel is polled in short intervals so neither side is starved.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        name = "await_state_root",
        skip_all
    )]
    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
        &self,
        handle: &mut PayloadHandle<Tx, Err, R>,
        overlay_factory: OverlayStateProviderFactory<P>,
        hashed_state: &LazyHashedPostState,
    ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
        // No timeout configured: block until the task delivers its result.
        let Some(timeout) = self.config.state_root_task_timeout() else {
            return Ok(handle.state_root());
        };

        let task_rx = handle.take_state_root_rx();

        match task_rx.recv_timeout(timeout) {
            Ok(result) => Ok(result),
            // Task ended without sending: surface as a state root error.
            Err(RecvTimeoutError::Disconnected) => {
                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
            }
            Err(RecvTimeoutError::Timeout) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    ?timeout,
                    "State root task timed out, spawning sequential fallback"
                );
                self.metrics.block_validation.state_root_task_timeout_total.increment(1);

                let (seq_tx, seq_rx) =
                    std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();

                // Spawn the serial fallback; its result is delivered over `seq_rx`.
                let seq_overlay = overlay_factory;
                let seq_hashed_state = hashed_state.clone();
                self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
                    let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
                    let _ = seq_tx.send(result);
                });

                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

                // Race: poll the task channel in short slices, checking the fallback
                // channel between polls.
                loop {
                    match task_rx.recv_timeout(POLL_INTERVAL) {
                        Ok(result) => {
                            debug!(
                                target: "engine::tree::payload_validator",
                                source = "task",
                                "State root timeout race won"
                            );
                            return Ok(result);
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            // Task died mid-race: the fallback is now the only hope, so
                            // block on it; if it also fails, both computations are lost.
                            debug!(
                                target: "engine::tree::payload_validator",
                                "State root task dropped, waiting for sequential fallback"
                            );
                            let result = seq_rx.recv().map_err(|_| {
                                ProviderError::other(std::io::Error::other(
                                    "both state root computations failed",
                                ))
                            })?;
                            let (state_root, trie_updates) = result?;
                            return Ok(Ok(StateRootComputeOutcome {
                                state_root,
                                trie_updates: Arc::new(trie_updates),
                                #[cfg(feature = "trie-debug")]
                                debug_recorders: Vec::new(),
                            }));
                        }
                        // Poll slice expired with the task still running; fall through to
                        // check the fallback channel.
                        Err(RecvTimeoutError::Timeout) => {}
                    }

                    if let Ok(result) = seq_rx.try_recv() {
                        debug!(
                            target: "engine::tree::payload_validator",
                            source = "sequential",
                            "State root timeout race won"
                        );
                        let (state_root, trie_updates) = result?;
                        return Ok(Ok(StateRootComputeOutcome {
                            state_root,
                            trie_updates: Arc::new(trie_updates),
                            #[cfg(feature = "trie-debug")]
                            debug_recorders: Vec::new(),
                        }));
                    }
                }
            }
        }
    }
1233
1234 fn compare_trie_updates_with_serial(
1241 &self,
1242 overlay_factory: OverlayStateProviderFactory<P>,
1243 hashed_state: &LazyHashedPostState,
1244 task_trie_updates: TrieUpdates,
1245 ) -> bool {
1246 debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1247
1248 match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
1249 Ok((serial_root, serial_trie_updates)) => {
1250 debug!(
1251 target: "engine::tree::payload_validator",
1252 ?serial_root,
1253 "Serial state root computation finished for comparison"
1254 );
1255
1256 match overlay_factory.database_provider_ro() {
1258 Ok(provider) => {
1259 match super::trie_updates::compare_trie_updates(
1260 &provider,
1261 task_trie_updates,
1262 serial_trie_updates,
1263 ) {
1264 Ok(has_diff) => return has_diff,
1265 Err(err) => {
1266 warn!(
1267 target: "engine::tree::payload_validator",
1268 %err,
1269 "Error comparing trie updates"
1270 );
1271 return true;
1272 }
1273 }
1274 }
1275 Err(err) => {
1276 warn!(
1277 target: "engine::tree::payload_validator",
1278 %err,
1279 "Failed to get database provider for trie update comparison"
1280 );
1281 }
1282 }
1283 }
1284 Err(err) => {
1285 warn!(
1286 target: "engine::tree::payload_validator",
1287 %err,
1288 "Failed to compute serial state root for comparison"
1289 );
1290 }
1291 }
1292 false
1293 }
1294
1295 #[cfg(feature = "trie-debug")]
1300 fn write_trie_debug_recorders(
1301 block_number: u64,
1302 recorders: &[(Option<B256>, TrieDebugRecorder)],
1303 ) {
1304 let path = format!("trie_debug_block_{block_number}.json");
1305 match serde_json::to_string_pretty(recorders) {
1306 Ok(json) => match std::fs::write(&path, json) {
1307 Ok(()) => {
1308 warn!(
1309 target: "engine::tree::payload_validator",
1310 %path,
1311 "Wrote trie debug recorders to file"
1312 );
1313 }
1314 Err(err) => {
1315 warn!(
1316 target: "engine::tree::payload_validator",
1317 %err,
1318 %path,
1319 "Failed to write trie debug recorders"
1320 );
1321 }
1322 },
1323 Err(err) => {
1324 warn!(
1325 target: "engine::tree::payload_validator",
1326 %err,
1327 "Failed to serialize trie debug recorders"
1328 );
1329 }
1330 }
1331 }
1332
    /// Runs all post-execution validation for an executed block.
    ///
    /// In order:
    /// 1. block-level validation (including the optionally precomputed
    ///    `transaction_root`) via `validate_block_inner`,
    /// 2. header validation against the parent header,
    /// 3. consensus post-execution checks against the execution `output`,
    /// 4. validator-specific checks against the (materialized) hashed state.
    ///
    /// For failures in steps 3 and 4 the invalid-block hook is fired before
    /// the error is returned. On success the hashed post state is handed back
    /// to the caller for further use.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::too_many_arguments)]
    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        block: &RecoveredBlock<N::Block>,
        parent_block: &SealedHeader<N::BlockHeader>,
        output: &BlockExecutionOutput<N::Receipt>,
        ctx: &mut TreeCtx<'_, N>,
        transaction_root: Option<B256>,
        receipt_root_bloom: Option<ReceiptRootBloom>,
        hashed_state: LazyHashedPostState,
    ) -> Result<LazyHashedPostState, InsertBlockErrorKind>
    where
        V: PayloadValidator<T, Block = N::Block>,
    {
        let start = Instant::now();

        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
        if let Err(e) = self.validate_block_inner(block, transaction_root) {
            return Err(e.into())
        }

        // Span guards (`_enter` + explicit `drop`) scope each validation phase
        // for tracing without introducing extra blocks.
        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
        if let Err(e) =
            self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
        {
            warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
            return Err(e.into())
        }
        drop(_enter);

        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
                .entered();
        if let Err(err) =
            self.consensus.validate_block_post_execution(block, output, receipt_root_bloom)
        {
            // Consensus rejected the executed block: notify the hook before bailing.
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }
        drop(_enter);

        // Resolving the lazy hashed state may block until it is computed.
        let hashed_state_ref =
            debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
                .in_scope(|| hashed_state.get());

        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
        if let Err(err) =
            self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, block)
        {
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }

        self.metrics
            .block_validation
            .post_execution_validation_duration
            .record(start.elapsed().as_secs_f64());

        Ok(hashed_state)
    }
1413
1414 #[allow(clippy::too_many_arguments)]
1430 #[instrument(
1431 level = "debug",
1432 target = "engine::tree::payload_validator",
1433 skip_all,
1434 fields(?strategy)
1435 )]
1436 fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
1437 &mut self,
1438 env: ExecutionEnv<Evm>,
1439 txs: T,
1440 provider_builder: StateProviderBuilder<N, P>,
1441 overlay_factory: OverlayStateProviderFactory<P>,
1442 strategy: StateRootStrategy,
1443 block_access_list: Option<Arc<BlockAccessList>>,
1444 ) -> Result<
1445 PayloadHandle<
1446 impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
1447 impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
1448 N::Receipt,
1449 >,
1450 InsertBlockErrorKind,
1451 > {
1452 match strategy {
1453 StateRootStrategy::StateRootTask => {
1454 let spawn_start = Instant::now();
1455
1456 let handle = self.payload_processor.spawn(
1458 env,
1459 txs,
1460 provider_builder,
1461 overlay_factory,
1462 &self.config,
1463 block_access_list,
1464 );
1465
1466 self.metrics
1468 .block_validation
1469 .spawn_payload_processor
1470 .record(spawn_start.elapsed().as_secs_f64());
1471
1472 Ok(handle)
1473 }
1474 StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
1475 let start = Instant::now();
1476 let handle = self.payload_processor.spawn_cache_exclusive(
1477 env,
1478 txs,
1479 provider_builder,
1480 block_access_list,
1481 );
1482
1483 self.metrics
1485 .block_validation
1486 .spawn_payload_processor
1487 .record(start.elapsed().as_secs_f64());
1488
1489 Ok(handle)
1490 }
1491 }
1492 }
1493
1494 fn state_provider_builder(
1499 &self,
1500 hash: B256,
1501 state: &EngineApiTreeState<N>,
1502 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1503 if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1504 debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1505 return Ok(Some(StateProviderBuilder::new(
1507 self.provider.clone(),
1508 historical,
1509 Some(blocks),
1510 )))
1511 }
1512
1513 if let Some(header) = self.provider.header(hash)? {
1515 debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1516 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1519 }
1520
1521 debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1522 Ok(None)
1523 }
1524
1525 const fn plan_state_root_computation(&self) -> StateRootStrategy {
1530 if self.config.state_root_fallback() {
1531 StateRootStrategy::Synchronous
1532 } else if self.config.use_state_root_task() {
1533 StateRootStrategy::StateRootTask
1534 } else {
1535 StateRootStrategy::Parallel
1536 }
1537 }
1538
1539 fn on_invalid_block(
1541 &self,
1542 parent_header: &SealedHeader<N::BlockHeader>,
1543 block: &RecoveredBlock<N::Block>,
1544 output: &BlockExecutionOutput<N::Receipt>,
1545 trie_updates: Option<(&TrieUpdates, B256)>,
1546 state: &mut EngineApiTreeState<N>,
1547 ) {
1548 if state.invalid_headers.get(&block.hash()).is_some() {
1549 return
1551 }
1552 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1553 }
1554
1555 fn get_parent_lazy_overlay(
1565 parent_hash: B256,
1566 state: &EngineApiTreeState<N>,
1567 ) -> (Option<LazyOverlay>, B256) {
1568 let (anchor_hash, blocks) =
1570 state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
1571
1572 if blocks.is_empty() {
1573 debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
1574 return (None, anchor_hash);
1575 }
1576
1577 if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
1579 debug!(
1580 target: "engine::tree::payload_validator",
1581 %parent_hash,
1582 %anchor_hash,
1583 "Using cached canonical overlay"
1584 );
1585 return (Some(cached.overlay.clone()), cached.anchor_hash);
1586 }
1587
1588 debug!(
1589 target: "engine::tree::payload_validator",
1590 %anchor_hash,
1591 num_blocks = blocks.len(),
1592 "Creating lazy overlay for in-memory blocks"
1593 );
1594
1595 let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
1597
1598 (Some(LazyOverlay::new(anchor_hash, handles)), anchor_hash)
1599 }
1600
    /// Spawns a background task that finalizes the block's trie data (sorted
    /// hashed state, trie updates, and trie changesets) and returns an
    /// [`ExecutedBlock`] whose trie data resolves lazily via the deferred
    /// handle.
    fn spawn_deferred_trie_task(
        &self,
        block: RecoveredBlock<N::Block>,
        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
        ctx: &TreeCtx<'_, N>,
        hashed_state: LazyHashedPostState,
        trie_output: Arc<TrieUpdates>,
        changeset_provider: impl TrieCursorFactory + Send + 'static,
    ) -> ExecutedBlock<N> {
        // In-memory ancestors between this block's parent and the on-disk anchor.
        let (anchor_hash, overlay_blocks) = ctx
            .state()
            .tree_state
            .blocks_by_hash(block.parent_hash())
            .unwrap_or_else(|| (block.parent_hash(), Vec::new()));

        // Reversed ancestor handles — NOTE(review): the required ordering is
        // inferred from the `.rev()`; confirm against `DeferredTrieData::pending`.
        let ancestors: Vec<DeferredTrieData> =
            overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();

        // Materialize the hashed state, cloning only when exclusive ownership
        // cannot be taken from the lazy wrapper.
        let hashed_state = match hashed_state.try_into_inner() {
            Ok(state) => Arc::new(state),
            Err(handle) => Arc::new(handle.get().clone()),
        };
        let deferred_trie_data =
            DeferredTrieData::pending(hashed_state, trie_output, anchor_hash, ancestors);
        let deferred_handle_task = deferred_trie_data.clone();
        let block_validation_metrics = self.metrics.block_validation.clone();

        let block_hash = block.hash();
        let block_number = block.number();

        // Register the pending changeset entry before spawning so readers can
        // wait on it while the task runs.
        let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);

        let compute_trie_input_task = move || {
            let _span = debug_span!(
                target: "engine::tree::payload_validator",
                "compute_trie_input_task",
                block_number
            )
            .entered();

            // Catch panics so a failure here degrades to the fallback
            // computation path instead of taking down the executor thread.
            let result = panic::catch_unwind(AssertUnwindSafe(|| {
                let compute_start = Instant::now();
                let computed = deferred_handle_task.wait_cloned();
                block_validation_metrics
                    .deferred_trie_compute_duration
                    .record(compute_start.elapsed().as_secs_f64());

                // Size metrics for the computed trie data.
                block_validation_metrics
                    .hashed_post_state_size
                    .record(computed.hashed_state.total_len() as f64);
                block_validation_metrics
                    .trie_updates_sorted_size
                    .record(computed.trie_updates.total_len() as f64);
                if let Some(anchored) = &computed.anchored_trie_input {
                    block_validation_metrics
                        .anchored_overlay_trie_updates_size
                        .record(anchored.trie_input.nodes.total_len() as f64);
                    block_validation_metrics
                        .anchored_overlay_hashed_state_size
                        .record(anchored.trie_input.state.total_len() as f64);
                }

                let changeset_start = Instant::now();

                // Derive trie changesets from the computed updates and publish
                // them through the pending-changeset guard; failure is logged
                // but not fatal.
                match reth_trie::changesets::compute_trie_changesets(
                    &changeset_provider,
                    &computed.trie_updates,
                ) {
                    Ok(changesets) => {
                        debug!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            elapsed = ?changeset_start.elapsed(),
                            "Computed and caching changesets"
                        );

                        pending_changeset_guard.resolve(block_number, Arc::new(changesets));
                    }
                    Err(e) => {
                        warn!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            ?e,
                            "Failed to compute changesets in deferred trie task"
                        );
                    }
                }
            }));

            if result.is_err() {
                error!(
                    target: "engine::tree::payload_validator",
                    "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
                );
            }
        };

        self.payload_processor
            .executor()
            .spawn_blocking_named("trie-input", compute_trie_input_task);

        ExecutedBlock::with_deferred_trie_data(
            Arc::new(block),
            execution_outcome,
            deferred_trie_data,
        )
    }
1741
    /// Aggregates execution and state-access statistics for a validated block
    /// into an [`ExecutionTimingStats`] record.
    ///
    /// Read counts/latencies come from the instrumented state provider,
    /// change counts are derived from the execution output's bundle state,
    /// and cache hit/miss counts from the optional cache stats.
    fn calculate_timing_stats(
        &self,
        block: &RecoveredBlock<N::Block>,
        provider_stats: Arc<StateProviderStats>,
        cache_stats: Option<Arc<CacheStats>>,
        output: &BlockExecutionOutput<N::Receipt>,
        execution_duration: Duration,
        state_hash_duration: Duration,
    ) -> Box<ExecutionTimingStats> {
        // Raw read counters from the instrumented provider.
        let accounts_read = provider_stats.total_account_fetches();
        let storage_read = provider_stats.total_storage_fetches();
        let code_read = provider_stats.total_code_fetches();
        let code_bytes_read = provider_stats.total_code_fetched_bytes();

        // Write-side counters derived from the bundle state.
        let accounts_changed = output.state.state.len();
        let accounts_deleted =
            output.state.state.values().filter(|acc| acc.was_destroyed()).count();
        let storage_slots_changed =
            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
        // A slot counts as deleted when it transitioned from non-zero to zero.
        let storage_slots_deleted = output
            .state
            .state
            .values()
            .flat_map(|account| account.storage.values())
            .filter(|slot| {
                slot.present_value.is_zero() && !slot.previous_or_original_value.is_zero()
            })
            .count();

        // An account is a "new deployment" when it has code now but had none
        // before (no original info, or original code hash was empty).
        let is_new_deployment = |acc: &BundleAccount| -> bool {
            let has_code_now = acc.info.as_ref().is_some_and(|info| info.code_hash != KECCAK_EMPTY);
            let had_no_code_before = acc
                .original_info
                .as_ref()
                .map(|info| info.code_hash == KECCAK_EMPTY)
                .unwrap_or(true);
            has_code_now && had_no_code_before
        };

        let bytecodes_changed =
            output.state.state.values().filter(|acc| is_new_deployment(acc)).count();

        // Deduplicate code hashes so identical bytecode deployed by several
        // accounts is only counted once for byte-size accounting.
        let unique_new_code_hashes: B256Set = output
            .state
            .state
            .values()
            .filter(|acc| is_new_deployment(acc))
            .filter_map(|acc| acc.info.as_ref().map(|info| info.code_hash))
            .collect();
        let code_bytes_written: usize = unique_new_code_hashes
            .iter()
            .filter_map(|hash| {
                output.state.contracts.get(hash).map(|bytecode| bytecode.original_bytes().len())
            })
            .sum();

        let state_read_duration = provider_stats.total_account_fetch_latency() +
            provider_stats.total_storage_fetch_latency() +
            provider_stats.total_code_fetch_latency();

        // EIP-7702 delegation designations observed among the block's contract
        // code entries.
        let eip7702_delegations_set =
            output.state.contracts.values().filter(|bytecode| bytecode.is_eip7702()).count();
        // Accounts whose original code was a 7702 delegation but whose code
        // hash is now empty, i.e. the delegation was cleared.
        let eip7702_delegations_cleared = output
            .state
            .state
            .values()
            .filter(|acc| {
                let original_was_eip7702 = acc
                    .original_info
                    .as_ref()
                    .and_then(|info| info.code.as_ref())
                    .map(|bytecode| bytecode.is_eip7702())
                    .unwrap_or(false);

                let code_now_empty =
                    acc.info.as_ref().map(|info| info.code_hash == KECCAK_EMPTY).unwrap_or(false);

                original_was_eip7702 && code_now_empty
            })
            .count();

        // Cache statistics default to zero when no cache was in use.
        let (account_cache_hits, account_cache_misses) = cache_stats
            .as_ref()
            .map(|s| (s.account_hits(), s.account_misses()))
            .unwrap_or_default();
        let (storage_cache_hits, storage_cache_misses) = cache_stats
            .as_ref()
            .map(|s| (s.storage_hits(), s.storage_misses()))
            .unwrap_or_default();
        let (code_cache_hits, code_cache_misses) =
            cache_stats.as_ref().map(|s| (s.code_hits(), s.code_misses())).unwrap_or_default();

        Box::new(ExecutionTimingStats {
            block_number: block.number(),
            block_hash: block.hash(),
            gas_used: output.result.gas_used,
            tx_count: block.transaction_count(),
            execution_duration,
            state_read_duration,
            state_hash_duration,
            accounts_read,
            storage_read,
            code_read,
            code_bytes_read,
            accounts_changed,
            accounts_deleted,
            storage_slots_changed,
            storage_slots_deleted,
            bytecodes_changed,
            code_bytes_written,
            eip7702_delegations_set,
            eip7702_delegations_cleared,
            account_cache_hits,
            account_cache_misses,
            storage_cache_hits,
            storage_cache_misses,
            code_cache_hits,
            code_cache_misses,
        })
    }
1876}
1877
/// Strategy used to compute the state root of an executed block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StateRootStrategy {
    /// Compute on the dedicated background state root task.
    StateRootTask,
    /// Compute using the parallel state root implementation.
    Parallel,
    /// Compute synchronously (fallback path).
    Synchronous,
}
1888
/// Validation logic the engine requires of a payload/block validator.
///
/// `N` defaults to the primitives type of the configured built payload.
pub trait EngineValidator<
    Types: PayloadTypes,
    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
>: Send + Sync + 'static
{
    /// Validates the given payload attributes against the header of the block
    /// the attributes would build on.
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError>;

    /// Converts execution payload data into a sealed block.
    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;

    /// Validates an execution payload within the given tree context.
    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Validates an already sealed block within the given tree context.
    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Hook invoked after an executed block has been inserted into the tree.
    fn on_inserted_executed_block(&self, block: ExecutedBlock<N>);
}
1945
impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
where
    P: DatabaseProviderFactory<
        Provider: BlockReader
            + StageCheckpointReader
            + PruneCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader
            + StorageSettingsCache,
    > + BlockReader<Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader
        + ChangeSetReader
        + BlockNumReader
        + HashedPostStateProvider
        + Clone
        + 'static,
    N: NodePrimitives,
    V: PayloadValidator<Types, Block = N::Block> + Clone,
    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
{
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError> {
        // Pure delegation to the inner payload validator.
        self.validator.validate_payload_attributes_against_header(attr, header)
    }

    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
        // `?` may perform an error conversion into `NewPayloadError` here.
        let block = self.validator.convert_payload_to_block(payload)?;
        Ok(block)
    }

    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N> {
        // Payloads and blocks share the same validation entry point.
        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
    }

    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N> {
        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
    }

    fn on_inserted_executed_block(&self, block: ExecutedBlock<N>) {
        // Forward insertion notifications to the payload processor with the
        // block descriptor and its bundle state.
        self.payload_processor.on_inserted_executed_block(
            block.recovered_block.block_with_parent(),
            &block.execution_output.state,
        );
    }
}
2008
impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    fn wait_for_caches(&self) -> CacheWaitDurations {
        // Delegate cache readiness waiting to the payload processor.
        self.payload_processor.wait_for_caches()
    }
}
2017
/// Either a raw execution payload or an already sealed block — the two input
/// forms the engine validator can receive.
#[derive(Debug, Clone)]
pub enum BlockOrPayload<T: PayloadTypes> {
    /// Execution payload data as received over the engine API.
    Payload(T::ExecutionData),
    /// A sealed block.
    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
}
2026
2027impl<T: PayloadTypes> BlockOrPayload<T> {
2028 pub fn hash(&self) -> B256 {
2030 match self {
2031 Self::Payload(payload) => payload.block_hash(),
2032 Self::Block(block) => block.hash(),
2033 }
2034 }
2035
2036 pub fn num_hash(&self) -> NumHash {
2038 match self {
2039 Self::Payload(payload) => payload.num_hash(),
2040 Self::Block(block) => block.num_hash(),
2041 }
2042 }
2043
2044 pub fn parent_hash(&self) -> B256 {
2046 match self {
2047 Self::Payload(payload) => payload.parent_hash(),
2048 Self::Block(block) => block.parent_hash(),
2049 }
2050 }
2051
2052 pub fn block_with_parent(&self) -> BlockWithParent {
2054 match self {
2055 Self::Payload(payload) => payload.block_with_parent(),
2056 Self::Block(block) => block.block_with_parent(),
2057 }
2058 }
2059
2060 pub const fn type_name(&self) -> &'static str {
2062 match self {
2063 Self::Payload(_) => "payload",
2064 Self::Block(_) => "block",
2065 }
2066 }
2067
2068 pub const fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
2070 None
2072 }
2073
2074 pub fn transaction_count(&self) -> usize
2076 where
2077 T::ExecutionData: ExecutionPayload,
2078 {
2079 match self {
2080 Self::Payload(payload) => payload.transaction_count(),
2081 Self::Block(block) => block.transaction_count(),
2082 }
2083 }
2084
2085 pub fn withdrawals(&self) -> Option<&[Withdrawal]>
2087 where
2088 T::ExecutionData: ExecutionPayload,
2089 {
2090 match self {
2091 Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
2092 Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
2093 }
2094 }
2095
2096 pub fn gas_used(&self) -> u64
2098 where
2099 T::ExecutionData: ExecutionPayload,
2100 {
2101 match self {
2102 Self::Payload(payload) => payload.gas_used(),
2103 Self::Block(block) => block.gas_used(),
2104 }
2105 }
2106}