1use crate::tree::{
4 cached_state::CachedStateProvider,
5 error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
6 instrumented_state::InstrumentedStateProvider,
7 payload_processor::PayloadProcessor,
8 precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
9 sparse_trie::StateRootComputeOutcome,
10 CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
11 StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
12};
13use alloy_consensus::transaction::{Either, TxHashRef};
14use alloy_eip7928::BlockAccessList;
15use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
16use alloy_evm::Evm;
17use alloy_primitives::B256;
18#[cfg(feature = "trie-debug")]
19use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
20
21use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
22use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
23use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
24use reth_engine_primitives::{
25 ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
26};
27use reth_errors::{BlockExecutionError, ProviderResult};
28use reth_evm::{
29 block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
30 OnStateHook, SpecFor,
31};
32use reth_payload_primitives::{
33 BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
34};
35use reth_primitives_traits::{
36 AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
37 RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
38};
39use reth_provider::{
40 providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
41 ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
42 ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
43 StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
44};
45use reth_revm::db::{states::bundle_state::BundleRetention, State};
46use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState, StateRoot};
47use reth_trie_db::ChangesetCache;
48use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
49use revm_primitives::Address;
50use std::{
51 collections::HashMap,
52 panic::{self, AssertUnwindSafe},
53 sync::{
54 atomic::{AtomicUsize, Ordering},
55 mpsc::RecvTimeoutError,
56 Arc,
57 },
58};
59use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
60
61type LazyHashedPostState = reth_tasks::LazyHandle<HashedPostState>;
63
64pub struct TreeCtx<'a, N: NodePrimitives> {
69 state: &'a mut EngineApiTreeState<N>,
71 canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
73}
74
75impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("TreeCtx")
78 .field("state", &"EngineApiTreeState")
79 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
80 .finish()
81 }
82}
83
84impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
85 pub const fn new(
87 state: &'a mut EngineApiTreeState<N>,
88 canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
89 ) -> Self {
90 Self { state, canonical_in_memory_state }
91 }
92
93 pub const fn state(&self) -> &EngineApiTreeState<N> {
95 &*self.state
96 }
97
98 pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
100 self.state
101 }
102
103 pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
105 self.canonical_in_memory_state
106 }
107}
108
109#[derive(derive_more::Debug)]
117pub struct BasicEngineValidator<P, Evm, V>
118where
119 Evm: ConfigureEvm,
120{
121 provider: P,
123 consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
125 evm_config: Evm,
127 config: TreeConfig,
129 payload_processor: PayloadProcessor<Evm>,
131 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
133 precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
135 #[debug(skip)]
137 invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
138 metrics: EngineApiMetrics,
140 validator: V,
142 changeset_cache: ChangesetCache,
144 runtime: reth_tasks::Runtime,
146}
147
148impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
149where
150 N: NodePrimitives,
151 P: DatabaseProviderFactory<
152 Provider: BlockReader
153 + StageCheckpointReader
154 + PruneCheckpointReader
155 + ChangeSetReader
156 + StorageChangeSetReader
157 + BlockNumReader
158 + StorageSettingsCache,
159 > + BlockReader<Header = N::BlockHeader>
160 + ChangeSetReader
161 + BlockNumReader
162 + StateProviderFactory
163 + StateReader
164 + HashedPostStateProvider
165 + Clone
166 + 'static,
167 Evm: ConfigureEvm<Primitives = N> + 'static,
168{
169 #[allow(clippy::too_many_arguments)]
171 pub fn new(
172 provider: P,
173 consensus: Arc<dyn FullConsensus<N>>,
174 evm_config: Evm,
175 validator: V,
176 config: TreeConfig,
177 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
178 changeset_cache: ChangesetCache,
179 runtime: reth_tasks::Runtime,
180 ) -> Self {
181 let precompile_cache_map = PrecompileCacheMap::default();
182 let payload_processor = PayloadProcessor::new(
183 runtime.clone(),
184 evm_config.clone(),
185 &config,
186 precompile_cache_map.clone(),
187 );
188 Self {
189 provider,
190 consensus,
191 evm_config,
192 payload_processor,
193 precompile_cache_map,
194 precompile_cache_metrics: HashMap::new(),
195 config,
196 invalid_block_hook,
197 metrics: EngineApiMetrics::default(),
198 validator,
199 changeset_cache,
200 runtime,
201 }
202 }
203
204 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
206 pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
207 &self,
208 input: BlockOrPayload<T>,
209 ) -> Result<SealedBlock<N::Block>, NewPayloadError>
210 where
211 V: PayloadValidator<T, Block = N::Block>,
212 {
213 match input {
214 BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
215 BlockOrPayload::Block(block) => Ok(block),
216 }
217 }
218
219 pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
221 &self,
222 input: &BlockOrPayload<T>,
223 ) -> Result<EvmEnvFor<Evm>, Evm::Error>
224 where
225 V: PayloadValidator<T, Block = N::Block>,
226 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
227 {
228 match input {
229 BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
230 BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
231 }
232 }
233
234 pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
236 &'a self,
237 input: &'a BlockOrPayload<T>,
238 ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
239 where
240 V: PayloadValidator<T, Block = N::Block>,
241 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
242 {
243 Ok(match input {
244 BlockOrPayload::Payload(payload) => {
245 let iter = self
246 .evm_config
247 .tx_iterator_for_payload(payload)
248 .map_err(NewPayloadError::other)?;
249 Either::Left(iter)
250 }
251 BlockOrPayload::Block(block) => {
252 let txs = block.body().clone_transactions();
253 let convert = |tx: N::SignedTx| tx.try_into_recovered();
254 Either::Right((txs, convert))
255 }
256 })
257 }
258
259 pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
261 &self,
262 input: &'a BlockOrPayload<T>,
263 ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
264 where
265 V: PayloadValidator<T, Block = N::Block>,
266 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
267 {
268 match input {
269 BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
270 BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
271 }
272 }
273
274 fn handle_execution_error<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
279 &self,
280 input: BlockOrPayload<T>,
281 execution_err: InsertBlockErrorKind,
282 parent_block: &SealedHeader<N::BlockHeader>,
283 ) -> Result<ExecutedBlock<N>, InsertPayloadError<N::Block>>
284 where
285 V: PayloadValidator<T, Block = N::Block>,
286 {
287 debug!(
288 target: "engine::tree::payload_validator",
289 ?execution_err,
290 block = ?input.num_hash(),
291 "Block execution failed, checking for header validation errors"
292 );
293
294 let block = self.convert_to_block(input)?;
297
298 if let Err(consensus_err) = self.validate_block_inner(&block, None) {
300 return Err(InsertBlockError::new(block, consensus_err.into()).into())
302 }
303
304 if let Err(consensus_err) =
306 self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
307 {
308 return Err(InsertBlockError::new(block, consensus_err.into()).into())
310 }
311
312 Err(InsertBlockError::new(block, execution_err).into())
314 }
315
316 #[instrument(
324 level = "debug",
325 target = "engine::tree::payload_validator",
326 skip_all,
327 fields(
328 parent = ?input.parent_hash(),
329 type_name = ?input.type_name(),
330 )
331 )]
332 pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
333 &mut self,
334 input: BlockOrPayload<T>,
335 mut ctx: TreeCtx<'_, N>,
336 ) -> ValidationOutcome<N, InsertPayloadError<N::Block>>
337 where
338 V: PayloadValidator<T, Block = N::Block> + Clone,
339 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
340 {
341 let is_payload = matches!(&input, BlockOrPayload::Payload(_));
345 let convert_to_block = match &input {
346 BlockOrPayload::Payload(_) => {
347 let payload_clone = input.clone();
348 let validator = self.validator.clone();
349 let handle = self.payload_processor.executor().spawn_blocking_named(
350 "payload-convert",
351 move || {
352 let BlockOrPayload::Payload(payload) = payload_clone else {
353 unreachable!()
354 };
355 validator.convert_payload_to_block(payload)
356 },
357 );
358 Either::Left(handle)
359 }
360 BlockOrPayload::Block(_) => Either::Right(()),
361 };
362
363 let convert_to_block =
366 move |input: BlockOrPayload<T>| -> Result<SealedBlock<N::Block>, NewPayloadError> {
367 match convert_to_block {
368 Either::Left(handle) => handle.try_into_inner().expect("sole handle"),
369 Either::Right(()) => {
370 let BlockOrPayload::Block(block) = input else { unreachable!() };
371 Ok(block)
372 }
373 }
374 };
375
376 macro_rules! ensure_ok {
379 ($expr:expr) => {
380 match $expr {
381 Ok(val) => val,
382 Err(e) => {
383 let block = convert_to_block(input)?;
384 return Err(InsertBlockError::new(block, e.into()).into())
385 }
386 }
387 };
388 }
389
390 macro_rules! ensure_ok_post_block {
392 ($expr:expr, $block:expr) => {
393 match $expr {
394 Ok(val) => val,
395 Err(e) => {
396 return Err(
397 InsertBlockError::new($block.into_sealed_block(), e.into()).into()
398 )
399 }
400 }
401 };
402 }
403
404 let parent_hash = input.parent_hash();
405
406 trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
407 let _enter =
408 debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
409 let Some(provider_builder) =
410 ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
411 else {
412 return Err(InsertBlockError::new(
414 convert_to_block(input)?,
415 ProviderError::HeaderNotFound(parent_hash.into()).into(),
416 )
417 .into())
418 };
419 let mut state_provider = ensure_ok!(provider_builder.build());
420 drop(_enter);
421
422 let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
425 else {
426 return Err(InsertBlockError::new(
427 convert_to_block(input)?,
428 ProviderError::HeaderNotFound(parent_hash.into()).into(),
429 )
430 .into())
431 };
432
433 let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
434 .in_scope(|| self.evm_env_for(&input))
435 .map_err(NewPayloadError::other)?;
436
437 let env = ExecutionEnv {
438 evm_env,
439 hash: input.hash(),
440 parent_hash: input.parent_hash(),
441 parent_state_root: parent_block.state_root(),
442 transaction_count: input.transaction_count(),
443 gas_used: input.gas_used(),
444 withdrawals: input.withdrawals().map(|w| w.to_vec()),
445 };
446
447 let strategy = self.plan_state_root_computation();
449
450 debug!(
451 target: "engine::tree::payload_validator",
452 ?strategy,
453 "Decided which state root algorithm to run"
454 );
455
456 let txs = self.tx_iterator_for(&input)?;
458
459 let block_access_list = ensure_ok!(input
461 .block_access_list()
462 .transpose()
463 .map_err(Box::<dyn std::error::Error + Send + Sync>::from))
465 .map(Arc::new);
466
467 let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
470
471 let overlay_factory =
474 OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
475 .with_block_hash(Some(anchor_hash))
476 .with_lazy_overlay(lazy_overlay);
477
478 let mut handle = ensure_ok!(self.spawn_payload_processor(
480 env.clone(),
481 txs,
482 provider_builder,
483 overlay_factory.clone(),
484 strategy,
485 block_access_list,
486 ));
487
488 if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
491 state_provider =
492 Box::new(CachedStateProvider::new(state_provider, caches, cache_metrics));
493 };
494
495 if self.config.state_provider_metrics() {
496 state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine"));
497 }
498
499 let (output, senders, receipt_root_rx) =
503 match self.execute_block(state_provider, env, &input, &mut handle) {
504 Ok(output) => output,
505 Err(err) => return self.handle_execution_error(input, err, &parent_block),
506 };
507
508 handle.stop_prewarming_execution();
510
511 let output = Arc::new(output);
515
516 let valid_block_tx = handle.terminate_caching(Some(output.clone()));
519
520 let hashed_state_output = output.clone();
524 let hashed_state_provider = self.provider.clone();
525 let hashed_state: LazyHashedPostState =
526 self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
527 let _span = debug_span!(
528 target: "engine::tree::payload_validator",
529 "hashed_post_state",
530 )
531 .entered();
532 hashed_state_provider.hashed_post_state(&hashed_state_output.state)
533 });
534
535 let block = convert_to_block(input)?;
536 let transaction_root = is_payload.then(|| {
537 let block = block.clone();
538 let parent_span = Span::current();
539 let num_hash = block.num_hash();
540 self.payload_processor.executor().spawn_blocking_named("payload-tx-root", move || {
541 let _span =
542 debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
543 .entered();
544 block.body().calculate_tx_root()
545 })
546 });
547 let block = block.with_senders(senders);
548
549 let receipt_root_bloom = {
551 let _enter = debug_span!(
552 target: "engine::tree::payload_validator",
553 "wait_receipt_root",
554 )
555 .entered();
556
557 receipt_root_rx
558 .blocking_recv()
559 .inspect_err(|_| {
560 tracing::error!(
561 target: "engine::tree::payload_validator",
562 "Receipt root task dropped sender without result, receipt root calculation likely aborted"
563 );
564 })
565 .ok()
566 };
567 let transaction_root = transaction_root.map(|handle| {
568 let _span =
569 debug_span!(target: "engine::tree::payload_validator", "wait_payload_tx_root")
570 .entered();
571 handle.try_into_inner().expect("sole handle")
572 });
573
574 let hashed_state = ensure_ok_post_block!(
575 self.validate_post_execution(
576 &block,
577 &parent_block,
578 &output,
579 &mut ctx,
580 transaction_root,
581 receipt_root_bloom,
582 hashed_state,
583 ),
584 block
585 );
586
587 let root_time = Instant::now();
588 let mut maybe_state_root = None;
589 let mut state_root_task_failed = false;
590 #[cfg(feature = "trie-debug")]
591 let mut trie_debug_recorders = Vec::new();
592
593 match strategy {
594 StateRootStrategy::StateRootTask => {
595 debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
596
597 let task_result = ensure_ok_post_block!(
598 self.await_state_root_with_timeout(
599 &mut handle,
600 overlay_factory.clone(),
601 &hashed_state,
602 ),
603 block
604 );
605
606 match task_result {
607 Ok(StateRootComputeOutcome {
608 state_root,
609 trie_updates,
610 #[cfg(feature = "trie-debug")]
611 debug_recorders,
612 }) => {
613 let elapsed = root_time.elapsed();
614 info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
615
616 #[cfg(feature = "trie-debug")]
617 {
618 trie_debug_recorders = debug_recorders;
619 }
620
621 if self.config.always_compare_trie_updates() {
623 let _has_diff = self.compare_trie_updates_with_serial(
624 overlay_factory.clone(),
625 &hashed_state,
626 trie_updates.as_ref().clone(),
627 );
628 #[cfg(feature = "trie-debug")]
629 if _has_diff {
630 Self::write_trie_debug_recorders(
631 block.header().number(),
632 &trie_debug_recorders,
633 );
634 }
635 }
636
637 if state_root == block.header().state_root() {
639 maybe_state_root = Some((state_root, trie_updates, elapsed))
640 } else {
641 warn!(
642 target: "engine::tree::payload_validator",
643 ?state_root,
644 block_state_root = ?block.header().state_root(),
645 "State root task returned incorrect state root"
646 );
647 #[cfg(feature = "trie-debug")]
648 Self::write_trie_debug_recorders(
649 block.header().number(),
650 &trie_debug_recorders,
651 );
652 state_root_task_failed = true;
653 }
654 }
655 Err(error) => {
656 debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
657 state_root_task_failed = true;
658 }
659 }
660 }
661 StateRootStrategy::Parallel => {
662 debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
663 match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
664 Ok(result) => {
665 let elapsed = root_time.elapsed();
666 info!(
667 target: "engine::tree::payload_validator",
668 regular_state_root = ?result.0,
669 ?elapsed,
670 "Regular root task finished"
671 );
672 maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
673 }
674 Err(error) => {
675 debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
676 }
677 }
678 }
679 StateRootStrategy::Synchronous => {}
680 }
681
682 let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
686 maybe_state_root
687 {
688 maybe_state_root
689 } else {
690 if self.config.state_root_fallback() {
692 debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
693 } else {
694 warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
695 self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
696 }
697
698 let (root, updates) = ensure_ok_post_block!(
699 Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
700 block
701 );
702
703 if state_root_task_failed {
704 self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
705 }
706
707 (root, Arc::new(updates), root_time.elapsed())
708 };
709
710 self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
711 self.metrics
712 .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
713 debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
714
715 if state_root != block.header().state_root() {
717 #[cfg(feature = "trie-debug")]
718 Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);
719
720 self.on_invalid_block(
722 &parent_block,
723 &block,
724 &output,
725 Some((&trie_output, state_root)),
726 ctx.state_mut(),
727 );
728 let block_state_root = block.header().state_root();
729 return Err(InsertBlockError::new(
730 block.into_sealed_block(),
731 ConsensusError::BodyStateRootDiff(
732 GotExpected { got: state_root, expected: block_state_root }.into(),
733 )
734 .into(),
735 )
736 .into())
737 }
738
739 if let Some(valid_block_tx) = valid_block_tx {
740 let _ = valid_block_tx.send(());
741 }
742
743 let changeset_provider =
748 ensure_ok_post_block!(overlay_factory.database_provider_ro(), block);
749
750 Ok(self.spawn_deferred_trie_task(
751 block,
752 output,
753 &ctx,
754 hashed_state,
755 trie_output,
756 changeset_provider,
757 ))
758 }
759
760 fn sealed_header_by_hash(
762 &self,
763 hash: B256,
764 state: &EngineApiTreeState<N>,
765 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
766 let header = state.tree_state.sealed_header_by_hash(&hash);
768
769 if header.is_some() {
770 Ok(header)
771 } else {
772 self.provider.sealed_header_by_hash(hash)
773 }
774 }
775
776 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
779 fn validate_block_inner(
780 &self,
781 block: &SealedBlock<N::Block>,
782 transaction_root: Option<B256>,
783 ) -> Result<(), ConsensusError> {
784 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
785 error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
786 return Err(e)
787 }
788
789 if let Err(e) =
790 self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
791 {
792 error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
793 return Err(e)
794 }
795
796 Ok(())
797 }
798
799 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
807 #[expect(clippy::type_complexity)]
808 fn execute_block<S, Err, T>(
809 &mut self,
810 state_provider: S,
811 env: ExecutionEnv<Evm>,
812 input: &BlockOrPayload<T>,
813 handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
814 ) -> Result<
815 (
816 BlockExecutionOutput<N::Receipt>,
817 Vec<Address>,
818 tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
819 ),
820 InsertBlockErrorKind,
821 >
822 where
823 S: StateProvider + Send,
824 Err: core::error::Error + Send + Sync + 'static,
825 V: PayloadValidator<T, Block = N::Block>,
826 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
827 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
828 {
829 debug!(target: "engine::tree::payload_validator", "Executing block");
830
831 let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
832 State::builder()
833 .with_database(StateProviderDatabase::new(state_provider))
834 .with_bundle_update()
835 .build()
836 });
837
838 let (spec_id, mut executor) = {
839 let _span = debug_span!(target: "engine::tree", "create_evm").entered();
840 let spec_id = *env.evm_env.spec_id();
841 let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
842 let ctx = self
843 .execution_ctx_for(input)
844 .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
845 let executor = self.evm_config.create_executor(evm, ctx);
846 (spec_id, executor)
847 };
848
849 if !self.config.precompile_cache_disabled() {
850 let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
851 executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
852 |address, precompile| {
853 let metrics = self
854 .precompile_cache_metrics
855 .entry(*address)
856 .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
857 .clone();
858 CachedPrecompile::wrap(
859 precompile,
860 self.precompile_cache_map.cache_for_address(*address),
861 spec_id,
862 Some(metrics),
863 )
864 },
865 );
866 }
867
868 let receipts_len = input.transaction_count();
871 let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
872 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
873 let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
874 self.payload_processor
875 .executor()
876 .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
877
878 let transaction_count = input.transaction_count();
879 let executed_tx_index = Arc::clone(handle.executed_tx_index());
880 let executor = executor.with_state_hook(
881 handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
882 );
883
884 let execution_start = Instant::now();
885
886 let (executor, senders) = self.execute_transactions(
888 executor,
889 transaction_count,
890 handle.iter_transactions(),
891 &receipt_tx,
892 &executed_tx_index,
893 )?;
894 drop(receipt_tx);
895
896 let post_exec_start = Instant::now();
898 let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
899 .in_scope(|| executor.finish())
900 .map(|(evm, result)| (evm.into_db(), result))?;
901 self.metrics.record_post_execution(post_exec_start.elapsed());
902
903 debug_span!(target: "engine::tree", "merge_transitions")
905 .in_scope(|| db.merge_transitions(BundleRetention::Reverts));
906
907 let output = BlockExecutionOutput { result, state: db.take_bundle() };
908
909 let execution_duration = execution_start.elapsed();
910 self.metrics.record_block_execution(&output, execution_duration);
911 self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
912
913 debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
914 Ok((output, senders, result_rx))
915 }
916
917 fn execute_transactions<E, Tx, InnerTx, Err>(
927 &self,
928 mut executor: E,
929 transaction_count: usize,
930 transactions: impl Iterator<Item = Result<Tx, Err>>,
931 receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
932 executed_tx_index: &AtomicUsize,
933 ) -> Result<(E, Vec<Address>), BlockExecutionError>
934 where
935 E: BlockExecutor<Receipt = N::Receipt>,
936 Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
937 InnerTx: TxHashRef,
938 Err: core::error::Error + Send + Sync + 'static,
939 {
940 let mut senders = Vec::with_capacity(transaction_count);
941
942 let pre_exec_start = Instant::now();
944 debug_span!(target: "engine::tree", "pre_execution")
945 .in_scope(|| executor.apply_pre_execution_changes())?;
946 self.metrics.record_pre_execution(pre_exec_start.elapsed());
947
948 let exec_span = debug_span!(target: "engine::tree", "execution").entered();
950 let mut transactions = transactions.into_iter();
951 let mut last_sent_len = 0usize;
956 loop {
957 let wait_start = Instant::now();
960 let Some(tx_result) = transactions.next() else { break };
961 self.metrics.record_transaction_wait(wait_start.elapsed());
962
963 let tx = tx_result.map_err(BlockExecutionError::other)?;
964 let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
965
966 senders.push(tx_signer);
967
968 let _enter = debug_span!(
969 target: "engine::tree",
970 "execute tx",
971 )
972 .entered();
973 trace!(target: "engine::tree", "Executing transaction");
974
975 let tx_start = Instant::now();
976 executor.execute_transaction(tx)?;
977 self.metrics.record_transaction_execution(tx_start.elapsed());
978
979 executed_tx_index.store(senders.len(), Ordering::Relaxed);
981
982 let current_len = executor.receipts().len();
983 if current_len > last_sent_len {
984 last_sent_len = current_len;
985 if let Some(receipt) = executor.receipts().last() {
987 let tx_index = current_len - 1;
988 let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
989 }
990 }
991 }
992 drop(exec_span);
993
994 Ok((executor, senders))
995 }
996
997 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1008 fn compute_state_root_parallel(
1009 &self,
1010 overlay_factory: OverlayStateProviderFactory<P>,
1011 hashed_state: &LazyHashedPostState,
1012 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
1013 let hashed_state = hashed_state.get();
1014 let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1018 let overlay_factory =
1019 overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1020 ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
1021 .incremental_root_with_updates()
1022 }
1023
1024 fn compute_state_root_serial(
1030 overlay_factory: OverlayStateProviderFactory<P>,
1031 hashed_state: &LazyHashedPostState,
1032 ) -> ProviderResult<(B256, TrieUpdates)> {
1033 let hashed_state = hashed_state.get();
1034 let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1038 let overlay_factory =
1039 overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1040
1041 let provider = overlay_factory.database_provider_ro()?;
1042
1043 Ok(StateRoot::new(&provider, &provider)
1044 .with_prefix_sets(prefix_sets)
1045 .root_with_updates()?)
1046 }
1047
1048 #[instrument(
1062 level = "debug",
1063 target = "engine::tree::payload_validator",
1064 name = "await_state_root",
1065 skip_all
1066 )]
1067 fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
1068 &self,
1069 handle: &mut PayloadHandle<Tx, Err, R>,
1070 overlay_factory: OverlayStateProviderFactory<P>,
1071 hashed_state: &LazyHashedPostState,
1072 ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
1073 let Some(timeout) = self.config.state_root_task_timeout() else {
1074 return Ok(handle.state_root());
1075 };
1076
1077 let task_rx = handle.take_state_root_rx();
1078
1079 match task_rx.recv_timeout(timeout) {
1080 Ok(result) => Ok(result),
1081 Err(RecvTimeoutError::Disconnected) => {
1082 Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
1083 }
1084 Err(RecvTimeoutError::Timeout) => {
1085 warn!(
1086 target: "engine::tree::payload_validator",
1087 ?timeout,
1088 "State root task timed out, spawning sequential fallback"
1089 );
1090 self.metrics.block_validation.state_root_task_timeout_total.increment(1);
1091
1092 let (seq_tx, seq_rx) =
1093 std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
1094
1095 let seq_overlay = overlay_factory;
1096 let seq_hashed_state = hashed_state.clone();
1097 self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
1098 let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
1099 let _ = seq_tx.send(result);
1100 });
1101
1102 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
1103
1104 loop {
1105 match task_rx.recv_timeout(POLL_INTERVAL) {
1106 Ok(result) => {
1107 debug!(
1108 target: "engine::tree::payload_validator",
1109 source = "task",
1110 "State root timeout race won"
1111 );
1112 return Ok(result);
1113 }
1114 Err(RecvTimeoutError::Disconnected) => {
1115 debug!(
1116 target: "engine::tree::payload_validator",
1117 "State root task dropped, waiting for sequential fallback"
1118 );
1119 let result = seq_rx.recv().map_err(|_| {
1120 ProviderError::other(std::io::Error::other(
1121 "both state root computations failed",
1122 ))
1123 })?;
1124 let (state_root, trie_updates) = result?;
1125 return Ok(Ok(StateRootComputeOutcome {
1126 state_root,
1127 trie_updates: Arc::new(trie_updates),
1128 #[cfg(feature = "trie-debug")]
1129 debug_recorders: Vec::new(),
1130 }));
1131 }
1132 Err(RecvTimeoutError::Timeout) => {}
1133 }
1134
1135 if let Ok(result) = seq_rx.try_recv() {
1136 debug!(
1137 target: "engine::tree::payload_validator",
1138 source = "sequential",
1139 "State root timeout race won"
1140 );
1141 let (state_root, trie_updates) = result?;
1142 return Ok(Ok(StateRootComputeOutcome {
1143 state_root,
1144 trie_updates: Arc::new(trie_updates),
1145 #[cfg(feature = "trie-debug")]
1146 debug_recorders: Vec::new(),
1147 }));
1148 }
1149 }
1150 }
1151 }
1152 }
1153
1154 fn compare_trie_updates_with_serial(
1161 &self,
1162 overlay_factory: OverlayStateProviderFactory<P>,
1163 hashed_state: &LazyHashedPostState,
1164 task_trie_updates: TrieUpdates,
1165 ) -> bool {
1166 debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1167
1168 match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
1169 Ok((serial_root, serial_trie_updates)) => {
1170 debug!(
1171 target: "engine::tree::payload_validator",
1172 ?serial_root,
1173 "Serial state root computation finished for comparison"
1174 );
1175
1176 match overlay_factory.database_provider_ro() {
1178 Ok(provider) => {
1179 match super::trie_updates::compare_trie_updates(
1180 &provider,
1181 task_trie_updates,
1182 serial_trie_updates,
1183 ) {
1184 Ok(has_diff) => return has_diff,
1185 Err(err) => {
1186 warn!(
1187 target: "engine::tree::payload_validator",
1188 %err,
1189 "Error comparing trie updates"
1190 );
1191 return true;
1192 }
1193 }
1194 }
1195 Err(err) => {
1196 warn!(
1197 target: "engine::tree::payload_validator",
1198 %err,
1199 "Failed to get database provider for trie update comparison"
1200 );
1201 }
1202 }
1203 }
1204 Err(err) => {
1205 warn!(
1206 target: "engine::tree::payload_validator",
1207 %err,
1208 "Failed to compute serial state root for comparison"
1209 );
1210 }
1211 }
1212 false
1213 }
1214
1215 #[cfg(feature = "trie-debug")]
1220 fn write_trie_debug_recorders(
1221 block_number: u64,
1222 recorders: &[(Option<B256>, TrieDebugRecorder)],
1223 ) {
1224 let path = format!("trie_debug_block_{block_number}.json");
1225 match serde_json::to_string_pretty(recorders) {
1226 Ok(json) => match std::fs::write(&path, json) {
1227 Ok(()) => {
1228 warn!(
1229 target: "engine::tree::payload_validator",
1230 %path,
1231 "Wrote trie debug recorders to file"
1232 );
1233 }
1234 Err(err) => {
1235 warn!(
1236 target: "engine::tree::payload_validator",
1237 %err,
1238 %path,
1239 "Failed to write trie debug recorders"
1240 );
1241 }
1242 },
1243 Err(err) => {
1244 warn!(
1245 target: "engine::tree::payload_validator",
1246 %err,
1247 "Failed to serialize trie debug recorders"
1248 );
1249 }
1250 }
1251 }
1252
1253 #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1265 #[expect(clippy::too_many_arguments)]
1266 fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
1267 &self,
1268 block: &RecoveredBlock<N::Block>,
1269 parent_block: &SealedHeader<N::BlockHeader>,
1270 output: &BlockExecutionOutput<N::Receipt>,
1271 ctx: &mut TreeCtx<'_, N>,
1272 transaction_root: Option<B256>,
1273 receipt_root_bloom: Option<ReceiptRootBloom>,
1274 hashed_state: LazyHashedPostState,
1275 ) -> Result<LazyHashedPostState, InsertBlockErrorKind>
1276 where
1277 V: PayloadValidator<T, Block = N::Block>,
1278 {
1279 let start = Instant::now();
1280
1281 trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
1282 if let Err(e) = self.validate_block_inner(block, transaction_root) {
1284 return Err(e.into())
1285 }
1286
1287 let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
1289 if let Err(e) =
1290 self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
1291 {
1292 warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
1293 return Err(e.into())
1294 }
1295 drop(_enter);
1296
1297 let _enter =
1299 debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
1300 .entered();
1301 if let Err(err) =
1302 self.consensus.validate_block_post_execution(block, output, receipt_root_bloom)
1303 {
1304 self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1306 return Err(err.into())
1307 }
1308 drop(_enter);
1309
1310 let hashed_state_ref =
1313 debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
1314 .in_scope(|| hashed_state.get());
1315
1316 let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
1317 if let Err(err) =
1318 self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, block)
1319 {
1320 self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1322 return Err(err.into())
1323 }
1324
1325 self.metrics
1327 .block_validation
1328 .post_execution_validation_duration
1329 .record(start.elapsed().as_secs_f64());
1330
1331 Ok(hashed_state)
1332 }
1333
1334 #[allow(clippy::too_many_arguments)]
1350 #[instrument(
1351 level = "debug",
1352 target = "engine::tree::payload_validator",
1353 skip_all,
1354 fields(?strategy)
1355 )]
1356 fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
1357 &mut self,
1358 env: ExecutionEnv<Evm>,
1359 txs: T,
1360 provider_builder: StateProviderBuilder<N, P>,
1361 overlay_factory: OverlayStateProviderFactory<P>,
1362 strategy: StateRootStrategy,
1363 block_access_list: Option<Arc<BlockAccessList>>,
1364 ) -> Result<
1365 PayloadHandle<
1366 impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
1367 impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
1368 N::Receipt,
1369 >,
1370 InsertBlockErrorKind,
1371 > {
1372 match strategy {
1373 StateRootStrategy::StateRootTask => {
1374 let spawn_start = Instant::now();
1375
1376 let handle = self.payload_processor.spawn(
1378 env,
1379 txs,
1380 provider_builder,
1381 overlay_factory,
1382 &self.config,
1383 block_access_list,
1384 );
1385
1386 self.metrics
1388 .block_validation
1389 .spawn_payload_processor
1390 .record(spawn_start.elapsed().as_secs_f64());
1391
1392 Ok(handle)
1393 }
1394 StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
1395 let start = Instant::now();
1396 let handle = self.payload_processor.spawn_cache_exclusive(
1397 env,
1398 txs,
1399 provider_builder,
1400 block_access_list,
1401 );
1402
1403 self.metrics
1405 .block_validation
1406 .spawn_payload_processor
1407 .record(start.elapsed().as_secs_f64());
1408
1409 Ok(handle)
1410 }
1411 }
1412 }
1413
1414 fn state_provider_builder(
1419 &self,
1420 hash: B256,
1421 state: &EngineApiTreeState<N>,
1422 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1423 if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1424 debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1425 return Ok(Some(StateProviderBuilder::new(
1427 self.provider.clone(),
1428 historical,
1429 Some(blocks),
1430 )))
1431 }
1432
1433 if let Some(header) = self.provider.header(hash)? {
1435 debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1436 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1439 }
1440
1441 debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1442 Ok(None)
1443 }
1444
1445 const fn plan_state_root_computation(&self) -> StateRootStrategy {
1450 if self.config.state_root_fallback() {
1451 StateRootStrategy::Synchronous
1452 } else if self.config.use_state_root_task() {
1453 StateRootStrategy::StateRootTask
1454 } else {
1455 StateRootStrategy::Parallel
1456 }
1457 }
1458
1459 fn on_invalid_block(
1461 &self,
1462 parent_header: &SealedHeader<N::BlockHeader>,
1463 block: &RecoveredBlock<N::Block>,
1464 output: &BlockExecutionOutput<N::Receipt>,
1465 trie_updates: Option<(&TrieUpdates, B256)>,
1466 state: &mut EngineApiTreeState<N>,
1467 ) {
1468 if state.invalid_headers.get(&block.hash()).is_some() {
1469 return
1471 }
1472 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1473 }
1474
1475 fn get_parent_lazy_overlay(
1485 parent_hash: B256,
1486 state: &EngineApiTreeState<N>,
1487 ) -> (Option<LazyOverlay>, B256) {
1488 let (anchor_hash, blocks) =
1490 state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
1491
1492 if blocks.is_empty() {
1493 debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
1494 return (None, anchor_hash);
1495 }
1496
1497 if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
1499 debug!(
1500 target: "engine::tree::payload_validator",
1501 %parent_hash,
1502 %anchor_hash,
1503 "Using cached canonical overlay"
1504 );
1505 return (Some(cached.overlay.clone()), cached.anchor_hash);
1506 }
1507
1508 debug!(
1509 target: "engine::tree::payload_validator",
1510 %anchor_hash,
1511 num_blocks = blocks.len(),
1512 "Creating lazy overlay for in-memory blocks"
1513 );
1514
1515 let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
1517
1518 (Some(LazyOverlay::new(anchor_hash, handles)), anchor_hash)
1519 }
1520
1521 fn spawn_deferred_trie_task(
1538 &self,
1539 block: RecoveredBlock<N::Block>,
1540 execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
1541 ctx: &TreeCtx<'_, N>,
1542 hashed_state: LazyHashedPostState,
1543 trie_output: Arc<TrieUpdates>,
1544 changeset_provider: impl TrieCursorFactory + Send + 'static,
1545 ) -> ExecutedBlock<N> {
1546 let (anchor_hash, overlay_blocks) = ctx
1548 .state()
1549 .tree_state
1550 .blocks_by_hash(block.parent_hash())
1551 .unwrap_or_else(|| (block.parent_hash(), Vec::new()));
1552
1553 let ancestors: Vec<DeferredTrieData> =
1556 overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();
1557
1558 let hashed_state = match hashed_state.try_into_inner() {
1562 Ok(state) => Arc::new(state),
1563 Err(handle) => Arc::new(handle.get().clone()),
1564 };
1565 let deferred_trie_data =
1566 DeferredTrieData::pending(hashed_state, trie_output, anchor_hash, ancestors);
1567 let deferred_handle_task = deferred_trie_data.clone();
1568 let block_validation_metrics = self.metrics.block_validation.clone();
1569
1570 let block_hash = block.hash();
1572 let block_number = block.number();
1573
1574 let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);
1578
1579 let compute_trie_input_task = move || {
1582 let _span = debug_span!(
1583 target: "engine::tree::payload_validator",
1584 "compute_trie_input_task",
1585 block_number
1586 )
1587 .entered();
1588
1589 let result = panic::catch_unwind(AssertUnwindSafe(|| {
1590 let compute_start = Instant::now();
1591 let computed = deferred_handle_task.wait_cloned();
1592 block_validation_metrics
1593 .deferred_trie_compute_duration
1594 .record(compute_start.elapsed().as_secs_f64());
1595
1596 block_validation_metrics
1598 .hashed_post_state_size
1599 .record(computed.hashed_state.total_len() as f64);
1600 block_validation_metrics
1601 .trie_updates_sorted_size
1602 .record(computed.trie_updates.total_len() as f64);
1603 if let Some(anchored) = &computed.anchored_trie_input {
1604 block_validation_metrics
1605 .anchored_overlay_trie_updates_size
1606 .record(anchored.trie_input.nodes.total_len() as f64);
1607 block_validation_metrics
1608 .anchored_overlay_hashed_state_size
1609 .record(anchored.trie_input.state.total_len() as f64);
1610 }
1611
1612 let changeset_start = Instant::now();
1616
1617 match reth_trie::changesets::compute_trie_changesets(
1618 &changeset_provider,
1619 &computed.trie_updates,
1620 ) {
1621 Ok(changesets) => {
1622 debug!(
1623 target: "engine::tree::changeset",
1624 ?block_number,
1625 elapsed = ?changeset_start.elapsed(),
1626 "Computed and caching changesets"
1627 );
1628
1629 pending_changeset_guard.resolve(block_number, Arc::new(changesets));
1630 }
1631 Err(e) => {
1632 warn!(
1633 target: "engine::tree::changeset",
1634 ?block_number,
1635 ?e,
1636 "Failed to compute changesets in deferred trie task"
1637 );
1638 }
1639 }
1640 }));
1641
1642 if result.is_err() {
1643 error!(
1644 target: "engine::tree::payload_validator",
1645 "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
1646 );
1647 }
1648 };
1649
1650 self.payload_processor
1652 .executor()
1653 .spawn_blocking_named("trie-input", compute_trie_input_task);
1654
1655 ExecutedBlock::with_deferred_trie_data(
1656 Arc::new(block),
1657 execution_outcome,
1658 deferred_trie_data,
1659 )
1660 }
1661}
1662
1663pub type ValidationOutcome<N, E = InsertPayloadError<BlockTy<N>>> = Result<ExecutedBlock<N>, E>;
1665
1666#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1668enum StateRootStrategy {
1669 StateRootTask,
1671 Parallel,
1673 Synchronous,
1675}
1676
1677pub trait EngineValidator<
1681 Types: PayloadTypes,
1682 N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
1683>: Send + Sync + 'static
1684{
1685 fn validate_payload_attributes_against_header(
1695 &self,
1696 attr: &Types::PayloadAttributes,
1697 header: &N::BlockHeader,
1698 ) -> Result<(), InvalidPayloadAttributesError>;
1699
1700 fn convert_payload_to_block(
1709 &self,
1710 payload: Types::ExecutionData,
1711 ) -> Result<SealedBlock<N::Block>, NewPayloadError>;
1712
1713 fn validate_payload(
1715 &mut self,
1716 payload: Types::ExecutionData,
1717 ctx: TreeCtx<'_, N>,
1718 ) -> ValidationOutcome<N>;
1719
1720 fn validate_block(
1722 &mut self,
1723 block: SealedBlock<N::Block>,
1724 ctx: TreeCtx<'_, N>,
1725 ) -> ValidationOutcome<N>;
1726
1727 fn on_inserted_executed_block(&self, block: ExecutedBlock<N>);
1732}
1733
1734impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
1735where
1736 P: DatabaseProviderFactory<
1737 Provider: BlockReader
1738 + StageCheckpointReader
1739 + PruneCheckpointReader
1740 + ChangeSetReader
1741 + StorageChangeSetReader
1742 + BlockNumReader
1743 + StorageSettingsCache,
1744 > + BlockReader<Header = N::BlockHeader>
1745 + StateProviderFactory
1746 + StateReader
1747 + ChangeSetReader
1748 + BlockNumReader
1749 + HashedPostStateProvider
1750 + Clone
1751 + 'static,
1752 N: NodePrimitives,
1753 V: PayloadValidator<Types, Block = N::Block> + Clone,
1754 Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
1755 Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1756{
1757 fn validate_payload_attributes_against_header(
1758 &self,
1759 attr: &Types::PayloadAttributes,
1760 header: &N::BlockHeader,
1761 ) -> Result<(), InvalidPayloadAttributesError> {
1762 self.validator.validate_payload_attributes_against_header(attr, header)
1763 }
1764
1765 fn convert_payload_to_block(
1766 &self,
1767 payload: Types::ExecutionData,
1768 ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
1769 let block = self.validator.convert_payload_to_block(payload)?;
1770 Ok(block)
1771 }
1772
1773 fn validate_payload(
1774 &mut self,
1775 payload: Types::ExecutionData,
1776 ctx: TreeCtx<'_, N>,
1777 ) -> ValidationOutcome<N> {
1778 self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
1779 }
1780
1781 fn validate_block(
1782 &mut self,
1783 block: SealedBlock<N::Block>,
1784 ctx: TreeCtx<'_, N>,
1785 ) -> ValidationOutcome<N> {
1786 self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
1787 }
1788
1789 fn on_inserted_executed_block(&self, block: ExecutedBlock<N>) {
1790 self.payload_processor.on_inserted_executed_block(
1791 block.recovered_block.block_with_parent(),
1792 &block.execution_output.state,
1793 );
1794 }
1795}
1796
1797impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
1798where
1799 Evm: ConfigureEvm,
1800{
1801 fn wait_for_caches(&self) -> CacheWaitDurations {
1802 self.payload_processor.wait_for_caches()
1803 }
1804}
1805
1806#[derive(Debug, Clone)]
1808pub enum BlockOrPayload<T: PayloadTypes> {
1809 Payload(T::ExecutionData),
1811 Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
1813}
1814
1815impl<T: PayloadTypes> BlockOrPayload<T> {
1816 pub fn hash(&self) -> B256 {
1818 match self {
1819 Self::Payload(payload) => payload.block_hash(),
1820 Self::Block(block) => block.hash(),
1821 }
1822 }
1823
1824 pub fn num_hash(&self) -> NumHash {
1826 match self {
1827 Self::Payload(payload) => payload.num_hash(),
1828 Self::Block(block) => block.num_hash(),
1829 }
1830 }
1831
1832 pub fn parent_hash(&self) -> B256 {
1834 match self {
1835 Self::Payload(payload) => payload.parent_hash(),
1836 Self::Block(block) => block.parent_hash(),
1837 }
1838 }
1839
1840 pub fn block_with_parent(&self) -> BlockWithParent {
1842 match self {
1843 Self::Payload(payload) => payload.block_with_parent(),
1844 Self::Block(block) => block.block_with_parent(),
1845 }
1846 }
1847
1848 pub const fn type_name(&self) -> &'static str {
1850 match self {
1851 Self::Payload(_) => "payload",
1852 Self::Block(_) => "block",
1853 }
1854 }
1855
1856 pub const fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
1858 None
1860 }
1861
1862 pub fn transaction_count(&self) -> usize
1864 where
1865 T::ExecutionData: ExecutionPayload,
1866 {
1867 match self {
1868 Self::Payload(payload) => payload.transaction_count(),
1869 Self::Block(block) => block.transaction_count(),
1870 }
1871 }
1872
1873 pub fn withdrawals(&self) -> Option<&[Withdrawal]>
1875 where
1876 T::ExecutionData: ExecutionPayload,
1877 {
1878 match self {
1879 Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
1880 Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
1881 }
1882 }
1883
1884 pub fn gas_used(&self) -> u64
1886 where
1887 T::ExecutionData: ExecutionPayload,
1888 {
1889 match self {
1890 Self::Payload(payload) => payload.gas_used(),
1891 Self::Block(block) => block.gas_used(),
1892 }
1893 }
1894}