1use crate::tree::{
42 error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
43 instrumented_state::{InstrumentedStateProvider, StateProviderStats},
44 multiproof::{StateRootComputeOutcome, StateRootHandle},
45 payload_processor::PayloadProcessor,
46 precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
47 types::{InsertPayloadResult, ValidationOutput},
48 CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
49 PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
50};
51use alloy_consensus::transaction::{Either, TxHashRef};
52use alloy_eip7928::bal::DecodedBal;
53use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
54use alloy_evm::Evm;
55use alloy_primitives::{map::B256Set, B256};
56use reth_tasks::LazyHandle;
57#[cfg(feature = "trie-debug")]
58use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
59
60use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
61use reth_chain_state::{
62 CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, ExecutionTimingStats, LazyOverlay,
63};
64use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
65use reth_engine_primitives::{
66 ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
67};
68use reth_errors::{BlockExecutionError, ProviderResult};
69use reth_evm::{
70 block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
71 OnStateHook, SpecFor,
72};
73use reth_execution_cache::{CacheStats, SavedCache};
74use reth_payload_primitives::{
75 BuiltPayload, BuiltPayloadExecutedBlock, InvalidPayloadAttributesError, NewPayloadError,
76 PayloadTypes,
77};
78use reth_primitives_traits::{
79 AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
80 RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
81};
82use reth_provider::{
83 providers::{OverlayBuilder, OverlayStateProviderFactory},
84 BlockExecutionOutput, BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory,
85 DatabaseProviderROFactory, HashedPostStateProvider, ProviderError, PruneCheckpointReader,
86 StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
87 StorageChangeSetReader, StorageSettingsCache,
88};
89use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
90use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState};
91use reth_trie_db::ChangesetCache;
92use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
93use revm_primitives::{Address, KECCAK_EMPTY};
94use std::{
95 collections::HashMap,
96 panic::{self, AssertUnwindSafe},
97 sync::{
98 atomic::{AtomicUsize, Ordering},
99 mpsc::RecvTimeoutError,
100 Arc,
101 },
102 time::Duration,
103};
104use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
105
106pub use crate::tree::types::ValidationOutcome;
107
/// Lazily-computed hashed post state, produced on a background task and shared
/// via [`Arc`] once available.
type LazyHashedPostState = reth_tasks::LazyHandle<Arc<HashedPostState>>;

/// Heuristic upper bound used when deciding whether to wait for block
/// validation before proceeding: a block's reported gas usage is not expected
/// to exceed the parent's gas limit by more than this multiplier.
const MAX_EXPECTED_GAS_USAGE_MULTIPLIER: u64 = 2;

/// Sender half used to stream executed receipts (paired with their
/// transaction index) to the receipt-root computation task.
type ReceiptRootSender<N> =
    crossbeam_channel::Sender<IndexedReceipt<<N as NodePrimitives>::Receipt>>;
/// One-shot receiver yielding the computed receipts root and logs bloom.
type ReceiptRootReceiver = tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>;
119
/// Context handed to block validation: exclusive access to the engine tree
/// state plus a shared view of the canonical in-memory chain state.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// Mutable engine API tree state (block buffer, invalid headers, etc. —
    /// see `EngineApiTreeState`).
    state: &'a mut EngineApiTreeState<N>,
    /// Shared canonical in-memory chain state.
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}
130
131impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("TreeCtx")
134 .field("state", &"EngineApiTreeState")
135 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
136 .finish()
137 }
138}
139
impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
    /// Creates a new [`TreeCtx`] from the engine tree state and the canonical
    /// in-memory chain state.
    pub const fn new(
        state: &'a mut EngineApiTreeState<N>,
        canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
    ) -> Self {
        Self { state, canonical_in_memory_state }
    }
}
149
impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
    /// Returns a shared reference to the engine API tree state.
    pub const fn state(&self) -> &EngineApiTreeState<N> {
        &*self.state
    }

    /// Returns a mutable reference to the engine API tree state.
    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
        self.state
    }

    /// Returns the canonical in-memory state, preserving the context's `'a`
    /// lifetime (outlives any borrow of `self`).
    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
        self.canonical_in_memory_state
    }
}
166
/// Engine payload/block validator: converts payloads to blocks, executes them,
/// and computes/verifies state roots via several strategies.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Provider factory used for database access and state-provider
    /// construction.
    provider: P,
    /// Consensus rules used for header/body validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
    /// EVM configuration used to build environments and executors.
    evm_config: Evm,
    /// Engine tree configuration (feature toggles, thresholds).
    config: TreeConfig,
    /// Background payload processor (prewarming, state-root tasks, executor
    /// pool).
    payload_processor: PayloadProcessor<Evm>,
    /// Shared per-address precompile result cache.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Per-address metrics for the precompile cache, created lazily.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook invoked when an invalid block is detected (skipped in Debug
    /// output).
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Engine API metrics recorder.
    metrics: EngineApiMetrics,
    /// Payload-type-specific validator (payload → block conversion).
    validator: V,
    /// Cache of trie changesets used for overlay construction.
    changeset_cache: ChangesetCache,
    /// Task runtime used to spawn background work.
    runtime: reth_tasks::Runtime,
    /// Optional custom state-root routine; when set, it replaces the built-in
    /// strategies (see `with_custom_state_root`).
    custom_state_root: Option<CustomStateRoot<Evm::Primitives>>,
}
207
208impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
209where
210 N: NodePrimitives,
211 P: DatabaseProviderFactory<
212 Provider: BlockReader
213 + StageCheckpointReader
214 + PruneCheckpointReader
215 + ChangeSetReader
216 + StorageChangeSetReader
217 + BlockNumReader
218 + StorageSettingsCache,
219 > + BlockReader<Header = N::BlockHeader>
220 + ChangeSetReader
221 + BlockNumReader
222 + StateProviderFactory
223 + StateReader
224 + HashedPostStateProvider
225 + Clone
226 + 'static,
227 Evm: ConfigureEvm<Primitives = N> + 'static,
228{
229 #[expect(clippy::too_many_arguments)]
231 pub fn new(
232 provider: P,
233 consensus: Arc<dyn FullConsensus<N>>,
234 evm_config: Evm,
235 validator: V,
236 config: TreeConfig,
237 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
238 changeset_cache: ChangesetCache,
239 runtime: reth_tasks::Runtime,
240 ) -> Self {
241 let precompile_cache_map = PrecompileCacheMap::default();
242 let payload_processor = PayloadProcessor::new(
243 runtime.clone(),
244 evm_config.clone(),
245 &config,
246 precompile_cache_map.clone(),
247 );
248 Self {
249 provider,
250 consensus,
251 evm_config,
252 payload_processor,
253 precompile_cache_map,
254 precompile_cache_metrics: HashMap::new(),
255 config,
256 invalid_block_hook,
257 metrics: EngineApiMetrics::default(),
258 validator,
259 changeset_cache,
260 runtime,
261 custom_state_root: None,
262 }
263 }
264
265 pub fn with_custom_state_root(mut self, custom_state_root: CustomStateRoot<N>) -> Self {
267 self.custom_state_root = Some(custom_state_root);
268 self
269 }
270
271 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
273 pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
274 &self,
275 input: BlockOrPayload<T>,
276 ) -> Result<SealedBlock<N::Block>, NewPayloadError>
277 where
278 V: PayloadValidator<T, Block = N::Block>,
279 {
280 match input {
281 BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
282 BlockOrPayload::Block(block) => Ok(block),
283 }
284 }
285
286 pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
288 &self,
289 input: &BlockOrPayload<T>,
290 ) -> Result<EvmEnvFor<Evm>, Evm::Error>
291 where
292 V: PayloadValidator<T, Block = N::Block>,
293 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
294 {
295 match input {
296 BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
297 BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
298 }
299 }
300
301 pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
303 &'a self,
304 input: &'a BlockOrPayload<T>,
305 ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
306 where
307 V: PayloadValidator<T, Block = N::Block>,
308 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
309 {
310 Ok(match input {
311 BlockOrPayload::Payload(payload) => {
312 let iter = self
313 .evm_config
314 .tx_iterator_for_payload(payload)
315 .map_err(NewPayloadError::other)?;
316 Either::Left(iter)
317 }
318 BlockOrPayload::Block(block) => {
319 let txs = block.body().clone_transactions();
320 let convert = |tx: N::SignedTx| tx.try_into_recovered();
321 Either::Right((txs, convert))
322 }
323 })
324 }
325
326 pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
328 &self,
329 input: &'a BlockOrPayload<T>,
330 ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
331 where
332 V: PayloadValidator<T, Block = N::Block>,
333 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
334 {
335 match input {
336 BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
337 BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
338 }
339 }
340
    /// Validates and executes a payload or block against its parent, computes
    /// and verifies the state root, and returns the executed block.
    ///
    /// Pipeline: resolve parent header → spawn background block
    /// conversion/consensus validation → build state provider → execute
    /// (BAL-parallel or serial path) → validate post-execution → compute state
    /// root via the planned strategy (with serial fallback) → spawn deferred
    /// trie persistence.
    ///
    /// Errors are wrapped in [`InsertBlockError`] carrying the (converted)
    /// sealed block so callers can buffer or report it.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        skip_all,
        fields(
            parent = ?input.parent_hash(),
            type_name = ?input.type_name(),
        )
    )]
    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &mut self,
        input: BlockOrPayload<T>,
        mut ctx: TreeCtx<'_, N>,
    ) -> InsertPayloadResult<N>
    where
        V: PayloadValidator<T, Block = N::Block> + Clone,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        let parent_hash = input.parent_hash();

        // The parent header must be known (in-memory tree state or database);
        // otherwise the block cannot be validated and is returned as an error.
        let parent_block = match self.sealed_header_by_hash(parent_hash, ctx.state()) {
            Ok(Some(parent_block)) => parent_block,
            Ok(None) => {
                return Err(InsertBlockError::new(
                    self.convert_to_block(input)?,
                    ProviderError::HeaderNotFound(parent_hash.into()).into(),
                )
                .into())
            }
            Err(e) => {
                return Err(InsertBlockError::new(self.convert_to_block(input)?, e.into()).into())
            }
        };

        // Conversion + header/pre-execution consensus checks run on a
        // background task while we set up providers and execution.
        let validated_block = self.spawn_convert_and_validate(&input, parent_block.clone());

        // Error helper used before the block handle has been resolved: takes
        // the converted block out of the lazy handle to attach it to the error.
        macro_rules! ensure_ok {
            ($expr:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        let block = validated_block.try_into_inner().expect("sole handle")?;
                        return Err(InsertBlockError::new(block, e.into()).into())
                    }
                }
            };
        }

        // Error helper used once the recovered block is locally available.
        macro_rules! ensure_ok_post_block {
            ($expr:expr, $block:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        return Err(
                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
                        )
                    }
                }
            };
        }

        // If the claimed gas usage is implausibly high relative to the parent
        // gas limit, surface a conversion/validation error early (if any)
        // before doing expensive work.
        if input.gas_used() > parent_block.gas_limit() * MAX_EXPECTED_GAS_USAGE_MULTIPLIER {
            if validated_block.get().is_err() {
                return Err(validated_block
                    .try_into_inner()
                    .expect("sole handle")
                    .expect_err("Err result checked"))
            }
        }

        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
        let Some(provider_builder) =
            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
        else {
            return Err(InsertBlockError::new(
                validated_block.try_into_inner().expect("sole handle")?,
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };
        let mut state_provider = ensure_ok!(provider_builder.build());
        drop(_enter);

        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
            .in_scope(|| self.evm_env_for(&input))
            .map_err(NewPayloadError::other)?;

        // Decode the block access list (EIP-7928 BAL), if present.
        let decoded_bal = ensure_ok!(input
            .try_decoded_access_list()
            .map_err(|err| { Box::<dyn std::error::Error + Send + Sync>::from(err) }))
        .map(Arc::new);

        let env = ExecutionEnv {
            evm_env,
            hash: input.hash(),
            parent_hash: input.parent_hash(),
            parent_state_root: parent_block.state_root(),
            transaction_count: input.transaction_count(),
            gas_used: input.gas_used(),
            withdrawals: input.withdrawals().map(|w| w.to_vec()),
            decoded_bal,
        };

        let strategy = self.plan_state_root_computation();

        debug!(
            target: "engine::tree::payload_validator",
            ?strategy,
            "Decided which state root algorithm to run"
        );

        let txs = self.tx_iterator_for(&input)?;

        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());

        // Overlay factory layers in-memory ancestor state on top of the
        // database for trie/state reads.
        let provider_factory = self.provider.clone();
        let overlay_builder = OverlayBuilder::<N>::new(anchor_hash, self.changeset_cache.clone())
            .with_lazy_overlay(lazy_overlay);
        let overlay_factory =
            OverlayStateProviderFactory::new(provider_factory.clone(), overlay_builder.clone());

        // The BAL path executes transactions in parallel; keep a builder clone
        // around only if eligible.
        let bal_eligible = ensure_ok!(self.bal_path_eligible(env.decoded_bal.as_deref()));
        let bal_provider_builder = bal_eligible.then(|| provider_builder.clone());

        let mut handle = ensure_ok!(self.spawn_payload_processor(
            env.clone(),
            txs,
            provider_builder.clone(),
            overlay_factory.clone(),
            &strategy,
        ));

        let slow_block_enabled = self.config.slow_block_threshold().is_some();
        let cache_stats = slow_block_enabled.then(|| Arc::new(CacheStats::default()));

        // Wrap the state provider with the execution cache when available.
        if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
            state_provider = Box::new(
                CachedStateProvider::new(state_provider, caches, cache_metrics)
                    .with_cache_stats(cache_stats.clone()),
            );
        };

        // Optionally instrument the provider to collect per-block read stats.
        let state_provider_stats = if slow_block_enabled || self.config.state_provider_metrics() {
            let instrumented_state_provider =
                InstrumentedStateProvider::new(state_provider, "engine");
            let stats = slow_block_enabled.then(|| instrumented_state_provider.stats());
            state_provider = Box::new(instrumented_state_provider);
            stats
        } else {
            None
        };

        let execute_block_start = Instant::now();
        let decoded_bal = env.decoded_bal.clone();
        let (output, senders, receipt_root_rx) = if bal_eligible {
            let provider_builder =
                bal_provider_builder.expect("eligibility implies builder was cloned");
            ensure_ok!(self.execute_block_bal(
                state_provider,
                env,
                &input,
                &handle,
                provider_builder
            ))
        } else {
            ensure_ok!(self.execute_block(state_provider, env, &input, &mut handle))
        };
        let block_access_list_hash = decoded_bal.as_ref().map(|decoded_bal| decoded_bal.hash());
        let execution_duration = execute_block_start.elapsed();

        // Execution is done — prewarming no longer helps.
        handle.stop_prewarming_execution();

        let output = Arc::new(output);

        // Hand the execution output to the caching layer; the returned sender
        // (if any) is signalled once the block is known to be valid.
        let valid_block_tx = handle.terminate_caching(Some(output.clone()));

        // Hash the post-execution state on a background task; consumers await
        // it lazily.
        let hashed_state_output = output.clone();
        let hashed_state_provider = self.provider.clone();
        let hashed_state: LazyHashedPostState =
            self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
                let _span = debug_span!(
                    target: "engine::tree::payload_validator",
                    "hashed_post_state",
                )
                .entered();
                Arc::new(hashed_state_provider.hashed_post_state(&hashed_state_output.state))
            });

        let block = validated_block.try_into_inner().expect("sole handle")?;
        let block = block.with_senders(senders);

        // Wait for the receipt-root task; a dropped sender means the
        // computation aborted, which is tolerated here (None).
        let receipt_root_bloom = {
            let _enter = debug_span!(
                target: "engine::tree::payload_validator",
                "wait_receipt_root",
            )
            .entered();

            receipt_root_rx
                .blocking_recv()
                .inspect_err(|_| {
                    tracing::error!(
                        target: "engine::tree::payload_validator",
                        "Receipt root task dropped sender without result, receipt root calculation likely aborted"
                    );
                })
                .ok()
        };

        let hashed_state = ensure_ok_post_block!(
            self.validate_post_execution(
                &block,
                &parent_block,
                &output,
                &mut ctx,
                receipt_root_bloom,
                hashed_state,
                block_access_list_hash
            ),
            block
        );

        let root_time = Instant::now();
        let mut maybe_state_root = None;
        let mut state_root_task_failed = false;
        #[cfg(feature = "trie-debug")]
        let mut trie_debug_recorders = Vec::new();

        // Run the planned state-root strategy; on failure or mismatch fall
        // through with `maybe_state_root == None` to the serial fallback.
        match strategy {
            StateRootStrategy::StateRootTask => {
                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");

                let task_result = ensure_ok_post_block!(
                    self.await_state_root_with_timeout(
                        &mut handle,
                        provider_builder.clone(),
                        &hashed_state,
                    ),
                    block
                );

                match task_result {
                    Ok(StateRootComputeOutcome {
                        state_root,
                        trie_updates,
                        #[cfg(feature = "trie-debug")]
                        debug_recorders,
                    }) => {
                        let elapsed = root_time.elapsed();
                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");

                        #[cfg(feature = "trie-debug")]
                        {
                            trie_debug_recorders = debug_recorders;
                        }

                        // Optional cross-check of the task's trie updates
                        // against a serial recomputation.
                        if self.config.always_compare_trie_updates() {
                            let _has_diff = self.compare_trie_updates_with_serial(
                                provider_builder.clone(),
                                provider_factory,
                                overlay_builder,
                                &hashed_state,
                                trie_updates.as_ref().clone(),
                            );
                            #[cfg(feature = "trie-debug")]
                            if _has_diff {
                                Self::write_trie_debug_recorders(
                                    block.header().number(),
                                    &trie_debug_recorders,
                                );
                            }
                        }

                        // Only accept the task result if it matches the
                        // header; otherwise fall back to the serial path.
                        if state_root == block.header().state_root() {
                            maybe_state_root = Some((state_root, trie_updates, elapsed))
                        } else {
                            warn!(
                                target: "engine::tree::payload_validator",
                                ?state_root,
                                block_state_root = ?block.header().state_root(),
                                "State root task returned incorrect state root"
                            );
                            #[cfg(feature = "trie-debug")]
                            Self::write_trie_debug_recorders(
                                block.header().number(),
                                &trie_debug_recorders,
                            );
                            state_root_task_failed = true;
                        }
                    }
                    Err(error) => {
                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
                        state_root_task_failed = true;
                    }
                }
            }
            StateRootStrategy::Parallel => {
                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
                match self.compute_state_root_parallel(
                    provider_factory,
                    overlay_builder,
                    &hashed_state,
                ) {
                    Ok(result) => {
                        let elapsed = root_time.elapsed();
                        info!(
                            target: "engine::tree::payload_validator",
                            regular_state_root = ?result.0,
                            ?elapsed,
                            "Regular root task finished"
                        );
                        maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
                    }
                    Err(error) => {
                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
                    }
                }
            }
            // Synchronous: leave `maybe_state_root` unset so the serial
            // fallback below computes the root.
            StateRootStrategy::Synchronous => {}
            StateRootStrategy::Custom(custom) => {
                let (state_root, trie_updates) = ensure_ok_post_block!(
                    custom(CustomStateRootInput {
                        block: &block,
                        parent_block: &parent_block,
                        output: &output,
                        hashed_state: &hashed_state,
                    }),
                    block
                );
                maybe_state_root = Some((state_root, Arc::new(trie_updates), root_time.elapsed()));
            }
        }

        // Serial fallback when no strategy produced an accepted root.
        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
            maybe_state_root
        {
            maybe_state_root
        } else {
            if self.config.state_root_fallback() {
                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
            } else {
                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
            }

            let (root, updates) = ensure_ok_post_block!(
                provider_builder
                    .build()
                    .and_then(|provider| Self::compute_state_root_serial(provider, &hashed_state)),
                block
            );

            if state_root_task_failed {
                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
            }

            (root, Arc::new(updates), root_time.elapsed())
        };

        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
        self.metrics
            .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");

        // Final verification: computed root must match the block header.
        if state_root != block.header().state_root() {
            #[cfg(feature = "trie-debug")]
            Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);

            self.on_invalid_block(
                &parent_block,
                &block,
                &output,
                Some((&trie_output, state_root)),
                ctx.state_mut(),
            );
            let block_state_root = block.header().state_root();
            return Err(InsertBlockError::new(
                block.into_sealed_block(),
                ConsensusError::BodyStateRootDiff(
                    GotExpected { got: state_root, expected: block_state_root }.into(),
                )
                .into(),
            )
            .into())
        }

        // Timing stats are only collected when slow-block reporting is on.
        let timing_stats = state_provider_stats.map(|stats| {
            self.calculate_timing_stats(
                &block,
                stats,
                cache_stats,
                &output,
                execution_duration,
                root_elapsed,
            )
        });

        // Signal the caching layer that the block turned out valid.
        if let Some(valid_block_tx) = valid_block_tx {
            let _ = valid_block_tx.send(());
        }

        let changeset_provider =
            ensure_ok_post_block!(overlay_factory.database_provider_ro(), block);

        // Trie data persistence is deferred to a background task.
        let executed_block = self.spawn_deferred_trie_task(
            Arc::new(block),
            output,
            ctx.state(),
            hashed_state,
            trie_output,
            changeset_provider,
        );
        Ok(ValidationOutput::new(executed_block, timing_stats))
    }
814
    /// Spawns a blocking task that converts the input into a sealed block and
    /// runs the stateless consensus checks (standalone header, header against
    /// parent, pre-execution body checks), returning a lazy handle to the
    /// result.
    ///
    /// The current tracing span is propagated into the spawned task.
    #[expect(clippy::type_complexity)]
    pub fn spawn_convert_and_validate<T>(
        &self,
        input: &BlockOrPayload<T>,
        parent: SealedHeader<N::BlockHeader>,
    ) -> LazyHandle<Result<SealedBlock<N::Block>, InsertPayloadError<N::Block>>>
    where
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        V: PayloadValidator<T, Block = N::Block> + Clone,
    {
        // Clone everything the task needs so it is fully owned ('static).
        let input = input.clone();
        let validator = self.validator.clone();
        let consensus = self.consensus.clone();
        let parent_span = Span::current();
        self.payload_processor.executor().spawn_blocking_named("payload-convert", move || {
            let _span = debug_span!(
                target: "engine::tree::payload_validator",
                parent: parent_span,
                "convert_and_validate",
            )
            .entered();
            let block = match input {
                BlockOrPayload::Block(block) => block,
                BlockOrPayload::Payload(payload) => {
                    validator.convert_payload_to_block(payload)?
                }
            };

            if let Err(e) = consensus.validate_header(block.sealed_header()) {
                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
                return Err(InsertBlockError::consensus_error(e, block).into())
            }

            let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
            if let Err(e) = consensus.validate_header_against_parent(block.sealed_header(), &parent)
            {
                warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
                return Err(InsertBlockError::consensus_error(e, block).into())
            }
            drop(_enter);

            // Pre-execution body checks; `None` skips the tx-root argument.
            if let Err(e) =
                consensus.validate_block_pre_execution_with_tx_root(&block, None)
            {
                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
                return Err(InsertBlockError::consensus_error(e, block).into())
            }

            Ok(block)
        })
    }
869
870 fn sealed_header_by_hash(
872 &self,
873 hash: B256,
874 state: &EngineApiTreeState<N>,
875 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
876 let header = state.tree_state.sealed_header_by_hash(&hash);
878
879 if header.is_some() {
880 Ok(header)
881 } else {
882 self.provider.sealed_header_by_hash(hash)
883 }
884 }
885
    /// Executes the block serially against the given state provider and
    /// returns the execution output, recovered senders, and the receiver for
    /// the asynchronously computed receipts root.
    ///
    /// Receipts are streamed to a background receipt-root task as transactions
    /// execute; prewarming/caching state hooks from `handle` are wired into
    /// the executor.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::type_complexity)]
    fn execute_block<S, Err, T>(
        &mut self,
        state_provider: S,
        env: ExecutionEnv<Evm>,
        input: &BlockOrPayload<T>,
        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
    ) -> Result<
        (BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootReceiver),
        InsertBlockErrorKind,
    >
    where
        S: StateProvider + Send,
        Err: core::error::Error + Send + Sync + 'static,
        V: PayloadValidator<T, Block = N::Block>,
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        debug!(target: "engine::tree::payload_validator", "Executing block");

        // A present BAL must not claim more items than the gas limit allows.
        if let Some(decoded_bal) = &env.decoded_bal {
            decoded_bal
                .as_bal()
                .validate_gas_limit(input.gas_limit())
                .map_err(|e| {
                    debug!(target: "engine::tree::payload_validator", "BAL is invalid since it contains more items than the gas limit allows");
                    InsertBlockErrorKind::Consensus(ConsensusError::from(e))
                })?
        }

        let has_bal = env.decoded_bal.is_some();
        // Revm `State` wrapper: bundle updates enabled; BAL builder only when
        // a BAL is present (so execution can be cross-checked against it).
        let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
            State::builder()
                .with_database(StateProviderDatabase::new(state_provider))
                .with_bundle_update()
                .with_bal_builder_if(has_bal)
                .build()
        });

        let (spec_id, mut executor) = {
            let _span = debug_span!(target: "engine::tree", "create_evm").entered();
            let spec_id = *env.evm_env.spec_id();
            let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
            let ctx = self
                .execution_ctx_for(input)
                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
            let executor = self.evm_config.create_executor(evm, ctx);
            (spec_id, executor)
        };

        // Wrap cacheable precompiles with the shared cache, creating
        // per-address metrics lazily.
        if !self.config.precompile_cache_disabled() {
            let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
            executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
                |address, precompile| {
                    let metrics = self
                        .precompile_cache_metrics
                        .entry(*address)
                        .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
                        .clone();
                    CachedPrecompile::wrap(
                        precompile,
                        self.precompile_cache_map.cache_for_address(*address),
                        spec_id,
                        Some(metrics),
                    )
                },
            );
        }

        let transaction_count = input.transaction_count();
        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(transaction_count);
        let executed_tx_index = Arc::clone(handle.executed_tx_index());
        let executor = executor.with_state_hook(
            handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
        );

        let execution_start = Instant::now();

        let (executor, senders) = self.execute_transactions(
            executor,
            transaction_count,
            handle.iter_transactions(),
            &receipt_tx,
            &executed_tx_index,
            has_bal,
        )?;
        // Dropping the sender lets the receipt-root task finish once all
        // receipts are consumed.
        drop(receipt_tx);

        let post_exec_start = Instant::now();
        let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
            .in_scope(|| executor.finish())
            .map(|(evm, result)| (evm.into_db(), result))?;
        self.metrics.record_post_execution(post_exec_start.elapsed());

        // Cross-check the recorded access list against the supplied BAL.
        if let Some(decoded_bal) = &env.decoded_bal {
            crate::tree::payload_processor::bal::validate_bal(&mut db, decoded_bal)?;
        }

        debug_span!(target: "engine::tree", "merge_transitions")
            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));

        let output = BlockExecutionOutput { result, state: db.take_bundle() };

        let execution_duration = execution_start.elapsed();
        self.metrics.record_block_execution(&output, execution_duration);
        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");

        Ok((output, senders, result_rx))
    }
1010
1011 fn bal_path_eligible(&self, bal: Option<&DecodedBal>) -> Result<bool, InsertBlockErrorKind> {
1020 let has_bal = bal.is_some();
1021 let parallel_execution = has_bal &&
1022 !self.config.disable_state_cache() &&
1023 !self.config.disable_bal_parallel_execution();
1024 if parallel_execution && self.config.disable_bal_parallel_state_root() {
1025 return Err(InsertBlockErrorKind::Other(
1026 "disabling parallel state root is impossible when parallel execution is enabled"
1027 .into(),
1028 ));
1029 }
1030
1031 Ok(parallel_execution)
1032 }
1033
    /// Executes the block via the parallel BAL path and returns the execution
    /// output, recovered senders, and the receipts-root receiver.
    ///
    /// The passed `_state_provider` is unused: this path builds fresh
    /// providers (wrapped in the prewarm cache) per worker via
    /// `provider_builder`, since parallel execution needs multiple databases.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::type_complexity)]
    fn execute_block_bal<S, Tx, Err, BalP, T>(
        &self,
        _state_provider: S,
        env: ExecutionEnv<Evm>,
        input: &BlockOrPayload<T>,
        handle: &PayloadHandle<Tx, Err, N::Receipt>,
        provider_builder: StateProviderBuilder<N, BalP>,
    ) -> Result<
        (BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootReceiver),
        InsertBlockErrorKind,
    >
    where
        S: StateProvider + Send,
        Tx: ExecutableTxFor<Evm> + Send,
        Err: core::error::Error + Send + Sync + 'static,
        BalP: BlockReader + StateProviderFactory + StateReader + Clone + Send + Sync + 'static,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        V: PayloadValidator<T, Block = N::Block>,
    {
        debug!(target: "engine::tree::payload_validator", "Executing block via BAL path");

        // This path requires the execution cache; eligibility checks upstream
        // should guarantee it, but fail gracefully if absent.
        let cache = handle.caches().ok_or_else(|| {
            InsertBlockErrorKind::Other("BAL execute path: no execution cache available".into())
        })?;
        let cache_metrics = handle.cache_metrics().unwrap_or_default();
        let saved_cache = SavedCache::new(env.parent_hash, cache);

        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(env.transaction_count);
        let input_bal = env.decoded_bal.ok_or_else(|| {
            InsertBlockErrorKind::Other("BAL execute path: no decoded BAL available".into())
        })?;

        // Factory closure: each call builds a fresh cached DB on top of a new
        // state provider, for use by a parallel execution worker.
        let make_db = move || {
            let provider = provider_builder
                .build()
                .map_err(crate::tree::payload_processor::bal::BalExecutionError::Provider)?;
            Ok(StateProviderDatabase::new(CachedStateProvider::new_prewarm(
                provider,
                saved_cache.cache().clone(),
                cache_metrics.clone(),
            )))
        };
        let execution_start = Instant::now();
        let ctx =
            self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
        let (output, senders) = crate::tree::payload_processor::bal::execute_block(
            &self.runtime,
            &self.evm_config,
            &make_db,
            input_bal,
            env.evm_env,
            ctx,
            env.transaction_count,
            handle.clone_transaction_receiver(),
            receipt_tx,
        )?;
        let execution_duration = execution_start.elapsed();

        self.metrics.record_block_execution(&output, execution_duration);
        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
        debug!(
            target: "engine::tree::payload_validator",
            elapsed = ?execution_duration,
            "Executed block via BAL path",
        );

        Ok((output, senders, result_rx))
    }
1114
1115 fn spawn_receipt_root_task(
1116 &self,
1117 receipts_len: usize,
1118 ) -> (ReceiptRootSender<N>, ReceiptRootReceiver) {
1119 let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
1121 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
1122 let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
1123 self.payload_processor
1124 .executor()
1125 .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
1126
1127 (receipt_tx, result_rx)
1128 }
1129
    /// Executes all block transactions on the given executor, streaming each
    /// produced receipt to `receipt_tx` as soon as it is available.
    ///
    /// Returns the executor (so the caller can finish post-execution work) and
    /// the recovered sender addresses in transaction order. Fails on the first
    /// transaction recovery or execution error.
    ///
    /// `executed_tx_index` is updated after each executed transaction so other
    /// tasks can observe execution progress. When `has_bal` is set, the BAL
    /// index on the database is bumped before the first transaction and after
    /// every executed transaction.
    fn execute_transactions<'a, E, Tx, InnerTx, Err, DB>(
        &self,
        mut executor: E,
        transaction_count: usize,
        transactions: impl Iterator<Item = Result<Tx, Err>>,
        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
        executed_tx_index: &AtomicUsize,
        has_bal: bool,
    ) -> Result<(E, Vec<Address>), BlockExecutionError>
    where
        E: BlockExecutor<Receipt = N::Receipt, Evm: alloy_evm::Evm<DB = &'a mut State<DB>>>,
        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
        InnerTx: TxHashRef,
        DB: revm::Database + 'a,
        Err: core::error::Error + Send + Sync + 'static,
    {
        let mut senders = Vec::with_capacity(transaction_count);

        // Apply system/pre-execution changes before any transaction runs.
        let pre_exec_start = Instant::now();
        debug_span!(target: "engine::tree", "pre_execution")
            .in_scope(|| executor.apply_pre_execution_changes())?;
        self.metrics.record_pre_execution(pre_exec_start.elapsed());

        if has_bal {
            // Advance past the pre-execution slot so tx 0 reads its own BAL entry.
            executor.evm_mut().db_mut().bump_bal_index();
        }

        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
        let mut transactions = transactions.into_iter();
        // Tracks how many receipts have already been forwarded, so each receipt
        // is sent exactly once even if the executor batches them.
        let mut last_sent_len = 0usize;
        loop {
            // The iterator may block on an upstream channel; record the wait separately
            // from the actual execution time.
            let wait_start = Instant::now();
            let Some(tx_result) = transactions.next() else { break };
            self.metrics.record_transaction_wait(wait_start.elapsed());

            let tx = tx_result.map_err(BlockExecutionError::other)?;
            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);

            senders.push(tx_signer);

            let _enter = debug_span!(
                target: "engine::tree",
                "execute tx",
                tx_index = senders.len() - 1,
            )
            .entered();
            trace!(target: "engine::tree", "Executing transaction");

            let tx_start = Instant::now();
            executor.execute_transaction(tx)?;
            self.metrics.record_transaction_execution(tx_start.elapsed());

            // Publish progress: number of transactions fully executed so far.
            executed_tx_index.store(senders.len(), Ordering::Relaxed);

            // Forward the newly produced receipt (if any) to the receipt root task.
            let current_len = executor.receipts().len();
            if current_len > last_sent_len {
                last_sent_len = current_len;
                if let Some(receipt) = executor.receipts().last() {
                    let tx_index = current_len - 1;
                    // Send failures are ignored: the receiver may have been dropped.
                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
                }
            }
            if has_bal {
                // Move to the next transaction's BAL entry.
                executor.evm_mut().db_mut().bump_bal_index();
            }
        }

        drop(exec_span);

        Ok((executor, senders))
    }
1222
1223 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1234 fn compute_state_root_parallel(
1235 &self,
1236 provider_factory: P,
1237 overlay_builder: OverlayBuilder<N>,
1238 hashed_state: &LazyHashedPostState,
1239 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
1240 let hashed_state = hashed_state.get();
1241 let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1245 let overlay_factory = OverlayStateProviderFactory::new(
1246 provider_factory,
1247 overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted()),
1248 );
1249 ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
1250 .incremental_root_with_updates()
1251 }
1252
1253 fn compute_state_root_serial(
1259 state_provider: StateProviderBox,
1260 hashed_state: &LazyHashedPostState,
1261 ) -> ProviderResult<(B256, TrieUpdates)> {
1262 state_provider.state_root_with_updates(hashed_state.get().as_ref().clone())
1263 }
1264
    /// Waits for the state root task result, racing a sequential fallback
    /// computation if the configured timeout elapses first.
    ///
    /// The outer `Result` is for provider failures only; the inner `Result`
    /// carries either the computed state root outcome or a parallel state
    /// root error.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        name = "await_state_root",
        skip_all
    )]
    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
        &self,
        handle: &mut PayloadHandle<Tx, Err, R>,
        state_provider_builder: StateProviderBuilder<N, P>,
        hashed_state: &LazyHashedPostState,
    ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
        // No timeout configured: block on the task result directly.
        let Some(timeout) = self.config.state_root_task_timeout() else {
            return Ok(handle.state_root());
        };

        let task_rx = handle.take_state_root_rx();

        match task_rx.recv_timeout(timeout) {
            Ok(result) => Ok(result),
            Err(RecvTimeoutError::Disconnected) => {
                // Task ended without sending a result; surface as an inner error.
                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
            }
            Err(RecvTimeoutError::Timeout) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    ?timeout,
                    "State root task timed out, spawning sequential fallback"
                );
                self.metrics.block_validation.state_root_task_timeout_total.increment(1);

                // Spawn a serial computation on a blocking thread; whichever of the
                // task and the fallback finishes first wins the race below.
                let (seq_tx, seq_rx) =
                    std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();

                let seq_hashed_state = hashed_state.clone();
                self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
                    let result = state_provider_builder.build().and_then(|provider| {
                        Self::compute_state_root_serial(provider, &seq_hashed_state)
                    });
                    let _ = seq_tx.send(result);
                });

                // Poll the original task in short intervals so the fallback channel
                // can be checked between waits.
                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

                loop {
                    match task_rx.recv_timeout(POLL_INTERVAL) {
                        Ok(result) => {
                            debug!(
                                target: "engine::tree::payload_validator",
                                source = "task",
                                "State root timeout race won"
                            );
                            return Ok(result);
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            // The task died; the sequential fallback is the only hope left,
                            // so block on it. Both failing is a hard provider error.
                            debug!(
                                target: "engine::tree::payload_validator",
                                "State root task dropped, waiting for sequential fallback"
                            );
                            let result = seq_rx.recv().map_err(|_| {
                                ProviderError::other(std::io::Error::other(
                                    "both state root computations failed",
                                ))
                            })?;
                            let (state_root, trie_updates) = result?;
                            return Ok(Ok(StateRootComputeOutcome {
                                state_root,
                                trie_updates: Arc::new(trie_updates),
                                #[cfg(feature = "trie-debug")]
                                debug_recorders: Vec::new(),
                            }));
                        }
                        Err(RecvTimeoutError::Timeout) => {}
                    }

                    // Non-blocking check: did the sequential fallback finish meanwhile?
                    if let Ok(result) = seq_rx.try_recv() {
                        debug!(
                            target: "engine::tree::payload_validator",
                            source = "sequential",
                            "State root timeout race won"
                        );
                        let (state_root, trie_updates) = result?;
                        return Ok(Ok(StateRootComputeOutcome {
                            state_root,
                            trie_updates: Arc::new(trie_updates),
                            #[cfg(feature = "trie-debug")]
                            debug_recorders: Vec::new(),
                        }));
                    }
                }
            }
        }
    }
1371
1372 fn compare_trie_updates_with_serial(
1379 &self,
1380 state_provider_builder: StateProviderBuilder<N, P>,
1381 provider_factory: P,
1382 overlay_builder: OverlayBuilder<N>,
1383 hashed_state: &LazyHashedPostState,
1384 task_trie_updates: TrieUpdates,
1385 ) -> bool {
1386 debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1387
1388 match state_provider_builder
1389 .build()
1390 .and_then(|provider| Self::compute_state_root_serial(provider, hashed_state))
1391 {
1392 Ok((serial_root, serial_trie_updates)) => {
1393 debug!(
1394 target: "engine::tree::payload_validator",
1395 ?serial_root,
1396 "Serial state root computation finished for comparison"
1397 );
1398
1399 let overlay_factory =
1401 OverlayStateProviderFactory::new(provider_factory, overlay_builder);
1402 match overlay_factory.database_provider_ro() {
1403 Ok(provider) => {
1404 match super::trie_updates::compare_trie_updates(
1405 &provider,
1406 task_trie_updates,
1407 serial_trie_updates,
1408 ) {
1409 Ok(has_diff) => return has_diff,
1410 Err(err) => {
1411 warn!(
1412 target: "engine::tree::payload_validator",
1413 %err,
1414 "Error comparing trie updates"
1415 );
1416 return true;
1417 }
1418 }
1419 }
1420 Err(err) => {
1421 warn!(
1422 target: "engine::tree::payload_validator",
1423 %err,
1424 "Failed to get database provider for trie update comparison"
1425 );
1426 }
1427 }
1428 }
1429 Err(err) => {
1430 warn!(
1431 target: "engine::tree::payload_validator",
1432 %err,
1433 "Failed to compute serial state root for comparison"
1434 );
1435 }
1436 }
1437 false
1438 }
1439
1440 #[cfg(feature = "trie-debug")]
1445 fn write_trie_debug_recorders(
1446 block_number: u64,
1447 recorders: &[(Option<B256>, TrieDebugRecorder)],
1448 ) {
1449 let path = format!("trie_debug_block_{block_number}.json");
1450 match serde_json::to_string_pretty(recorders) {
1451 Ok(json) => match std::fs::write(&path, json) {
1452 Ok(()) => {
1453 warn!(
1454 target: "engine::tree::payload_validator",
1455 %path,
1456 "Wrote trie debug recorders to file"
1457 );
1458 }
1459 Err(err) => {
1460 warn!(
1461 target: "engine::tree::payload_validator",
1462 %err,
1463 %path,
1464 "Failed to write trie debug recorders"
1465 );
1466 }
1467 },
1468 Err(err) => {
1469 warn!(
1470 target: "engine::tree::payload_validator",
1471 %err,
1472 "Failed to serialize trie debug recorders"
1473 );
1474 }
1475 }
1476 }
1477
    /// Runs post-execution validation: consensus checks on the execution
    /// output, then validator checks against the hashed post-state.
    ///
    /// On any validation failure the invalid-block hook is invoked before the
    /// error is returned. On success, returns the (now resolved) lazy hashed
    /// post-state for further use by the caller.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::too_many_arguments)]
    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        block: &RecoveredBlock<N::Block>,
        parent_block: &SealedHeader<N::BlockHeader>,
        output: &BlockExecutionOutput<N::Receipt>,
        ctx: &mut TreeCtx<'_, N>,
        receipt_root_bloom: Option<ReceiptRootBloom>,
        hashed_state: LazyHashedPostState,
        block_access_list_hash: Option<B256>,
    ) -> Result<LazyHashedPostState, InsertBlockErrorKind>
    where
        V: PayloadValidator<T, Block = N::Block>,
    {
        let start = Instant::now();

        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");

        // Consensus-level post-execution checks (receipts, gas, optional BAL hash).
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
                .entered();
        if let Err(err) = self.consensus.validate_block_post_execution(
            block,
            output,
            receipt_root_bloom,
            block_access_list_hash,
        ) {
            // Record the block as invalid before propagating the consensus error.
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }
        drop(_enter);

        // Resolving the lazy hashed state may block; time it under its own span.
        let hashed_state_ref =
            debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
                .in_scope(|| hashed_state.get());

        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
        if let Err(err) =
            self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, block)
        {
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }

        self.metrics
            .block_validation
            .post_execution_validation_duration
            .record(start.elapsed().as_secs_f64());

        Ok(hashed_state)
    }
1547
    /// Spawns the payload processor for block execution according to the
    /// chosen state root strategy.
    ///
    /// With [`StateRootStrategy::StateRootTask`] the processor is spawned with
    /// the overlay factory so the background state root task runs alongside
    /// execution; all other strategies use the cache-exclusive spawn path and
    /// compute the state root elsewhere.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        skip_all,
        fields(?strategy)
    )]
    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        txs: T,
        provider_builder: StateProviderBuilder<N, P>,
        overlay_factory: OverlayStateProviderFactory<P, N>,
        strategy: &StateRootStrategy<N>,
    ) -> Result<
        PayloadHandle<
            impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
            impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
            N::Receipt,
        >,
        InsertBlockErrorKind,
    > {
        match strategy {
            StateRootStrategy::StateRootTask => {
                let spawn_start = Instant::now();

                let handle = self.payload_processor.spawn(
                    env,
                    txs,
                    provider_builder,
                    overlay_factory,
                    &self.config,
                );

                // Record how long it took to spawn the full processor (incl. task setup).
                self.metrics
                    .block_validation
                    .spawn_payload_processor
                    .record(spawn_start.elapsed().as_secs_f64());

                Ok(handle)
            }
            StateRootStrategy::Parallel |
            StateRootStrategy::Synchronous |
            StateRootStrategy::Custom(_) => {
                let start = Instant::now();
                // No background state root task: spawn with exclusive cache access only.
                let handle =
                    self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);

                self.metrics
                    .block_validation
                    .spawn_payload_processor
                    .record(start.elapsed().as_secs_f64());

                Ok(handle)
            }
        }
    }
1622
1623 fn state_provider_builder(
1628 &self,
1629 hash: B256,
1630 state: &EngineApiTreeState<N>,
1631 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1632 if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1633 debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1634 return Ok(Some(StateProviderBuilder::new(
1636 self.provider.clone(),
1637 historical,
1638 Some(blocks),
1639 )))
1640 }
1641
1642 if let Some(header) = self.provider.header(hash)? {
1644 debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1645 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1648 }
1649
1650 debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1651 Ok(None)
1652 }
1653
1654 fn plan_state_root_computation(&self) -> StateRootStrategy<N> {
1659 if let Some(custom_state_root) = &self.custom_state_root {
1660 StateRootStrategy::Custom(custom_state_root.clone())
1661 } else if self.config.state_root_fallback() {
1662 StateRootStrategy::Synchronous
1663 } else if self.config.use_state_root_task() {
1664 StateRootStrategy::StateRootTask
1665 } else {
1666 StateRootStrategy::Parallel
1667 }
1668 }
1669
1670 fn on_invalid_block(
1672 &self,
1673 parent_header: &SealedHeader<N::BlockHeader>,
1674 block: &RecoveredBlock<N::Block>,
1675 output: &BlockExecutionOutput<N::Receipt>,
1676 trie_updates: Option<(&TrieUpdates, B256)>,
1677 state: &mut EngineApiTreeState<N>,
1678 ) {
1679 if state.invalid_headers.get(&block.hash()).is_some() {
1680 return
1682 }
1683 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1684 }
1685
1686 fn get_parent_lazy_overlay(
1696 parent_hash: B256,
1697 state: &EngineApiTreeState<N>,
1698 ) -> (Option<LazyOverlay<N>>, B256) {
1699 let (anchor_hash, blocks) =
1701 state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
1702
1703 if blocks.is_empty() {
1704 debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
1705 return (None, anchor_hash);
1706 }
1707
1708 if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
1710 debug!(
1711 target: "engine::tree::payload_validator",
1712 %parent_hash,
1713 %anchor_hash,
1714 "Using cached canonical overlay"
1715 );
1716 return (Some(cached.overlay.clone()), cached.anchor_hash);
1717 }
1718
1719 debug!(
1720 target: "engine::tree::payload_validator",
1721 %anchor_hash,
1722 num_blocks = blocks.len(),
1723 "Creating lazy overlay for in-memory blocks"
1724 );
1725
1726 (Some(LazyOverlay::new(blocks)), anchor_hash)
1727 }
1728
    /// Spawns a background task that computes the block's deferred trie data
    /// (merged hashed state, trie updates, anchored overlay input) and the
    /// trie changesets, then returns the [`ExecutedBlock`] holding a handle to
    /// that deferred data.
    ///
    /// The task also resolves the pending changeset-cache entry registered for
    /// this block. If the task panics, the deferred data is left pending and a
    /// fallback computation happens when the trie data is first accessed.
    fn spawn_deferred_trie_task(
        &self,
        block: Arc<RecoveredBlock<N::Block>>,
        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
        state: &EngineApiTreeState<N>,
        hashed_state: LazyHashedPostState,
        trie_output: Arc<TrieUpdates>,
        changeset_provider: impl TrieCursorFactory + Send + 'static,
    ) -> ExecutedBlock<N> {
        // In-memory ancestors between this block's parent and the on-disk anchor.
        let (anchor_hash, overlay_blocks) = state
            .tree_state
            .blocks_by_hash(block.parent_hash())
            .unwrap_or_else(|| (block.parent_hash(), Vec::new()));

        // Collected oldest-first (reversed) so the overlay applies in chain order.
        let ancestors: Vec<DeferredTrieData> =
            overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();

        // Take ownership of the hashed state, cloning only if it is still shared.
        let hashed_state = match hashed_state.try_into_inner() {
            Ok(state) => state,
            Err(handle) => handle.get().clone(),
        };
        let deferred_trie_data =
            DeferredTrieData::pending(hashed_state, trie_output, anchor_hash, ancestors);
        let deferred_handle_task = deferred_trie_data.clone();
        let block_validation_metrics = self.metrics.block_validation.clone();

        let block_hash = block.hash();
        let block_number = block.number();

        // Register a pending changeset-cache slot that the task resolves later.
        let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);

        let compute_trie_input_task = move || {
            let _span = debug_span!(
                target: "engine::tree::payload_validator",
                "compute_trie_input_task",
                block_number
            )
            .entered();

            // Catch panics so a failed task degrades to the lazy fallback path
            // instead of poisoning anything.
            let result = panic::catch_unwind(AssertUnwindSafe(|| {
                let compute_start = Instant::now();
                let computed = deferred_handle_task.wait_cloned();
                block_validation_metrics
                    .deferred_trie_compute_duration
                    .record(compute_start.elapsed().as_secs_f64());

                // Size metrics for the computed trie data.
                block_validation_metrics
                    .hashed_post_state_size
                    .record(computed.hashed_state.total_len() as f64);
                block_validation_metrics
                    .trie_updates_sorted_size
                    .record(computed.trie_updates.total_len() as f64);
                if let Some(anchored) = &computed.anchored_trie_input {
                    block_validation_metrics
                        .anchored_overlay_trie_updates_size
                        .record(anchored.trie_input.nodes.total_len() as f64);
                    block_validation_metrics
                        .anchored_overlay_hashed_state_size
                        .record(anchored.trie_input.state.total_len() as f64);
                }

                let changeset_start = Instant::now();

                // Compute and cache trie changesets; failure here is non-fatal.
                match reth_trie::changesets::compute_trie_changesets(
                    &changeset_provider,
                    &computed.trie_updates,
                ) {
                    Ok(changesets) => {
                        debug!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            elapsed = ?changeset_start.elapsed(),
                            "Computed and caching changesets"
                        );

                        pending_changeset_guard.resolve(block_number, Arc::new(changesets));
                    }
                    Err(e) => {
                        warn!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            ?e,
                            "Failed to compute changesets in deferred trie task"
                        );
                    }
                }
            }));

            if result.is_err() {
                error!(
                    target: "engine::tree::payload_validator",
                    "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
                );
            }
        };

        self.payload_processor
            .executor()
            .spawn_blocking_named("trie-input", compute_trie_input_task);

        ExecutedBlock::with_deferred_trie_data(block, execution_outcome, deferred_trie_data)
    }
1864
    /// Aggregates per-block execution statistics from provider fetch counters,
    /// cache hit/miss counters, and the execution output's bundle state.
    fn calculate_timing_stats(
        &self,
        block: &RecoveredBlock<N::Block>,
        provider_stats: Arc<StateProviderStats>,
        cache_stats: Option<Arc<CacheStats>>,
        output: &BlockExecutionOutput<N::Receipt>,
        execution_duration: Duration,
        state_hash_duration: Duration,
    ) -> Box<ExecutionTimingStats> {
        // Read-side counters recorded by the instrumented state provider.
        let accounts_read = provider_stats.total_account_fetches();
        let storage_read = provider_stats.total_storage_fetches();
        let code_read = provider_stats.total_code_fetches();
        let code_bytes_read = provider_stats.total_code_fetched_bytes();

        // Write-side stats derived from the bundle state in the execution output.
        let accounts_changed = output.state.state.len();
        let accounts_deleted =
            output.state.state.values().filter(|acc| acc.was_destroyed()).count();
        let storage_slots_changed =
            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
        // A slot counts as deleted when it became zero from a non-zero prior value.
        let storage_slots_deleted = output
            .state
            .state
            .values()
            .flat_map(|account| account.storage.values())
            .filter(|slot| {
                slot.present_value.is_zero() && !slot.previous_or_original_value.is_zero()
            })
            .count();

        // An account is a fresh deployment when it has code now but had none
        // (or did not exist) before execution.
        let is_new_deployment = |acc: &BundleAccount| -> bool {
            let has_code_now = acc.info.as_ref().is_some_and(|info| info.code_hash != KECCAK_EMPTY);
            let had_no_code_before = acc
                .original_info
                .as_ref()
                .map(|info| info.code_hash == KECCAK_EMPTY)
                .unwrap_or(true);
            has_code_now && had_no_code_before
        };

        let bytecodes_changed =
            output.state.state.values().filter(|acc| is_new_deployment(acc)).count();

        // Deduplicate code hashes so identical bytecode deployed by multiple
        // accounts is counted once for the bytes-written metric.
        let unique_new_code_hashes: B256Set = output
            .state
            .state
            .values()
            .filter(|acc| is_new_deployment(acc))
            .filter_map(|acc| acc.info.as_ref().map(|info| info.code_hash))
            .collect();
        let code_bytes_written: usize = unique_new_code_hashes
            .iter()
            .filter_map(|hash| {
                output.state.contracts.get(hash).map(|bytecode| bytecode.original_bytes().len())
            })
            .sum();

        let state_read_duration = provider_stats.total_account_fetch_latency() +
            provider_stats.total_storage_fetch_latency() +
            provider_stats.total_code_fetch_latency();

        // EIP-7702 delegation churn: new delegation designations set, and
        // accounts whose prior code was a delegation but is now empty.
        let eip7702_delegations_set =
            output.state.contracts.values().filter(|bytecode| bytecode.is_eip7702()).count();
        let eip7702_delegations_cleared = output
            .state
            .state
            .values()
            .filter(|acc| {
                let original_was_eip7702 = acc
                    .original_info
                    .as_ref()
                    .and_then(|info| info.code.as_ref())
                    .map(|bytecode| bytecode.is_eip7702())
                    .unwrap_or(false);

                let code_now_empty =
                    acc.info.as_ref().map(|info| info.code_hash == KECCAK_EMPTY).unwrap_or(false);

                original_was_eip7702 && code_now_empty
            })
            .count();

        // Cache counters default to zero when no cache stats were collected.
        let (account_cache_hits, account_cache_misses) = cache_stats
            .as_ref()
            .map(|s| (s.account_hits(), s.account_misses()))
            .unwrap_or_default();
        let (storage_cache_hits, storage_cache_misses) = cache_stats
            .as_ref()
            .map(|s| (s.storage_hits(), s.storage_misses()))
            .unwrap_or_default();
        let (code_cache_hits, code_cache_misses) =
            cache_stats.as_ref().map(|s| (s.code_hits(), s.code_misses())).unwrap_or_default();

        Box::new(ExecutionTimingStats {
            block_number: block.number(),
            block_hash: block.hash(),
            gas_used: output.result.gas_used,
            tx_count: block.transaction_count(),
            execution_duration,
            state_read_duration,
            state_hash_duration,
            accounts_read,
            storage_read,
            code_read,
            code_bytes_read,
            accounts_changed,
            accounts_deleted,
            storage_slots_changed,
            storage_slots_deleted,
            bytecodes_changed,
            code_bytes_written,
            eip7702_delegations_set,
            eip7702_delegations_cleared,
            account_cache_hits,
            account_cache_misses,
            storage_cache_hits,
            storage_cache_misses,
            code_cache_hits,
            code_cache_misses,
        })
    }
1999}
2000
/// Strategy for computing the state root of a validated block.
///
/// Selected by `plan_state_root_computation` based on configuration and any
/// custom state root override.
#[derive(derive_more::Debug, Clone)]
enum StateRootStrategy<N: NodePrimitives> {
    /// Compute via the background state root task spawned with the payload
    /// processor.
    StateRootTask,
    /// Compute with the parallel state root implementation.
    Parallel,
    /// Compute synchronously (used when the fallback flag is set).
    Synchronous,
    /// Delegate to a caller-supplied custom state root function.
    Custom(#[debug(skip)] CustomStateRoot<N>),
}
2013
/// Validator for engine API payloads and blocks.
///
/// Implementations validate incoming payloads/blocks against the engine tree
/// state and produce validation outcomes.
pub trait EngineValidator<
    Types: PayloadTypes,
    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
>: Send + Sync + 'static
{
    /// Validates the given payload attributes against the parent header they
    /// would build on.
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError>;

    /// Converts an execution payload into a sealed block.
    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;

    /// Validates an execution payload within the given tree context.
    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Validates an already sealed block within the given tree context.
    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Called when a locally built executed block is inserted; returns the
    /// tree-level [`ExecutedBlock`] representation.
    fn on_inserted_executed_block(
        &self,
        block: BuiltPayloadExecutedBlock<N>,
        state: &EngineApiTreeState<N>,
    ) -> ProviderResult<ExecutedBlock<N>>;

    /// Returns a saved execution cache for the given block hash, if available.
    fn cache_for(&self, _block_hash: B256) -> Option<SavedCache>;

    /// Returns a handle to a state root computation anchored at the given
    /// parent, if one can be spawned.
    fn sparse_trie_handle_for(
        &self,
        parent_hash: B256,
        parent_state_root: B256,
        state: &EngineApiTreeState<N>,
    ) -> Option<StateRootHandle>;
}
2085
impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
where
    P: DatabaseProviderFactory<
            Provider: BlockReader
                          + StageCheckpointReader
                          + PruneCheckpointReader
                          + ChangeSetReader
                          + StorageChangeSetReader
                          + BlockNumReader
                          + StorageSettingsCache,
        > + BlockReader<Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader
        + ChangeSetReader
        + BlockNumReader
        + HashedPostStateProvider
        + Clone
        + 'static,
    N: NodePrimitives,
    V: PayloadValidator<Types, Block = N::Block> + Clone,
    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
{
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError> {
        // Delegate to the inner payload validator.
        self.validator.validate_payload_attributes_against_header(attr, header)
    }

    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
        let block = self.validator.convert_payload_to_block(payload)?;
        Ok(block)
    }

    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N> {
        // Payloads and blocks share the same validation path.
        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
    }

    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N> {
        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
    }

    fn on_inserted_executed_block(
        &self,
        block: BuiltPayloadExecutedBlock<N>,
        state: &EngineApiTreeState<N>,
    ) -> ProviderResult<ExecutedBlock<N>> {
        // Notify the payload processor about the inserted block's state.
        self.payload_processor.on_inserted_executed_block(
            block.recovered_block.block_with_parent(),
            &block.execution_output.state,
        );

        // Build an overlay anchored at the parent so changesets can be computed
        // against the correct ancestor state.
        let (lazy_overlay, anchor_hash) =
            Self::get_parent_lazy_overlay(block.recovered_block.parent_hash(), state);
        let overlay_factory = OverlayStateProviderFactory::new(
            self.provider.clone(),
            OverlayBuilder::<N>::new(anchor_hash, self.changeset_cache.clone())
                .with_lazy_overlay(lazy_overlay),
        );
        let changeset_provider = overlay_factory.database_provider_ro()?;

        // The hashed state is already computed for built payloads, so wrap it as ready.
        Ok(self.spawn_deferred_trie_task(
            block.recovered_block,
            block.execution_output,
            state,
            LazyHashedPostState::ready(block.hashed_state),
            block.trie_updates,
            changeset_provider,
        ))
    }

    fn cache_for(&self, block_hash: B256) -> Option<SavedCache> {
        Some(self.payload_processor.cache_for(block_hash))
    }

    fn sparse_trie_handle_for(
        &self,
        parent_hash: B256,
        parent_state_root: B256,
        state: &EngineApiTreeState<N>,
    ) -> Option<StateRootHandle> {
        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, state);
        let overlay_factory = OverlayStateProviderFactory::new(
            self.provider.clone(),
            OverlayBuilder::<N>::new(anchor_hash, self.changeset_cache.clone())
                .with_lazy_overlay(lazy_overlay),
        );

        Some(self.payload_processor.spawn_state_root(
            overlay_factory,
            parent_state_root,
            false,
            &self.config,
        ))
    }
}
2196
impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Delegates cache readiness waiting to the payload processor.
    fn wait_for_caches(&self) -> CacheWaitDurations {
        self.payload_processor.wait_for_caches()
    }
}
2205
/// Either an engine API execution payload or an already sealed block.
///
/// Lets payload and block validation share a single entry point.
#[derive(Debug, Clone)]
pub enum BlockOrPayload<T: PayloadTypes> {
    /// An engine API execution payload not yet converted into a block.
    Payload(T::ExecutionData),
    /// A sealed block.
    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
}
2214
2215impl<T: PayloadTypes> BlockOrPayload<T> {
2216 pub fn hash(&self) -> B256 {
2218 match self {
2219 Self::Payload(payload) => payload.block_hash(),
2220 Self::Block(block) => block.hash(),
2221 }
2222 }
2223
2224 pub fn num_hash(&self) -> NumHash {
2226 match self {
2227 Self::Payload(payload) => payload.num_hash(),
2228 Self::Block(block) => block.num_hash(),
2229 }
2230 }
2231
2232 pub fn parent_hash(&self) -> B256 {
2234 match self {
2235 Self::Payload(payload) => payload.parent_hash(),
2236 Self::Block(block) => block.parent_hash(),
2237 }
2238 }
2239
2240 pub fn block_with_parent(&self) -> BlockWithParent {
2242 match self {
2243 Self::Payload(payload) => payload.block_with_parent(),
2244 Self::Block(block) => block.block_with_parent(),
2245 }
2246 }
2247
2248 pub const fn type_name(&self) -> &'static str {
2250 match self {
2251 Self::Payload(_) => "payload",
2252 Self::Block(_) => "block",
2253 }
2254 }
2255
2256 pub const fn is_payload(&self) -> bool {
2258 matches!(self, Self::Payload(_))
2259 }
2260
2261 pub const fn is_block(&self) -> bool {
2263 matches!(self, Self::Block(_))
2264 }
2265
2266 pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
2268 match self {
2269 Self::Payload(payload) => payload
2270 .block_access_list()
2271 .map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
2272 .transpose(),
2273 Self::Block(_) => Ok(None),
2274 }
2275 }
2276
2277 pub fn transaction_count(&self) -> usize
2279 where
2280 T::ExecutionData: ExecutionPayload,
2281 {
2282 match self {
2283 Self::Payload(payload) => payload.transaction_count(),
2284 Self::Block(block) => block.transaction_count(),
2285 }
2286 }
2287
2288 pub fn withdrawals(&self) -> Option<&[Withdrawal]>
2290 where
2291 T::ExecutionData: ExecutionPayload,
2292 {
2293 match self {
2294 Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
2295 Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
2296 }
2297 }
2298
2299 pub fn gas_used(&self) -> u64
2301 where
2302 T::ExecutionData: ExecutionPayload,
2303 {
2304 match self {
2305 Self::Payload(payload) => payload.gas_used(),
2306 Self::Block(block) => block.gas_used(),
2307 }
2308 }
2309
2310 pub fn gas_limit(&self) -> u64
2312 where
2313 T::ExecutionData: ExecutionPayload,
2314 {
2315 match self {
2316 Self::Payload(payload) => payload.gas_limit(),
2317 Self::Block(block) => block.gas_limit(),
2318 }
2319 }
2320}
2321
/// Input passed to a [`CustomStateRoot`] function.
#[derive(Debug, Clone)]
pub struct CustomStateRootInput<'a, N: NodePrimitives> {
    /// The block whose state root is being computed.
    pub block: &'a SealedBlock<N::Block>,
    /// The parent block's sealed header.
    pub parent_block: &'a SealedHeader<N::BlockHeader>,
    /// The execution output of the block.
    pub output: &'a BlockExecutionOutput<N::Receipt>,
    /// The (lazily computed) hashed post-state of the block.
    pub hashed_state: &'a LazyHashedPostState,
}
2334
/// Caller-supplied function that computes a block's state root and trie
/// updates, used by [`StateRootStrategy::Custom`].
pub type CustomStateRoot<N> = Arc<
    dyn Fn(CustomStateRootInput<'_, N>) -> ProviderResult<(B256, TrieUpdates)>
        + Send
        + Sync
        + 'static,
>;