Skip to main content

reth_engine_tree/tree/
payload_validator.rs

1//! Types and traits for validating blocks and payloads.
2//!
3//! # Validation pipeline
4//!
5//! When the engine processes a new payload (`engine_newPayload`), validation happens in phases:
6//!
7//! ## Phase 1 – Payload conversion
8//! [`PayloadValidator::convert_payload_to_block`] decodes the execution payload (RLP, hashing)
9//! into a [`SealedBlock`]. This runs on a background thread concurrently with state setup.
10//!
11//! ## Phase 2 – Pre-execution consensus
12//! - [`HeaderValidator::validate_header`] — standalone header checks (hash, gas, base fee,
13//!   fork-specific fields)
14//! - [`Consensus::validate_block_pre_execution`] — body vs header (tx root, ommer hash, withdrawals
15//!   root)
16//! - [`HeaderValidator::validate_header_against_parent`] — sequential checks (block number,
17//!   timestamp, gas limit, base fee vs parent)
18//!
19//! ## Phase 3 – Execution
20//! Block transactions are executed via the EVM. Receipt roots are computed incrementally.
21//!
22//! ## Phase 4 – Post-execution consensus
23//! - [`FullConsensus::validate_block_post_execution`] — gas used, receipt root, logs bloom,
24//!   requests hash
25//! - [`PayloadValidator::validate_block_post_execution_with_hashed_state`] — network-specific
26//!   (no-op on L1, used by OP Stack)
27//!
28//! ## Payload attributes validation (`engine_forkchoiceUpdated`)
29//! When the CL provides payload attributes to start building a block:
30//! - [`PayloadValidator::validate_payload_attributes_against_header`] — ensures timestamp ordering
31//!
32//! If validation passes, a payload build job is started. If it fails,
33//! `INVALID_PAYLOAD_ATTRIBUTES` is returned without rolling back the forkchoice update.
34//!
35//! [`HeaderValidator::validate_header`]: reth_consensus::HeaderValidator::validate_header
36//! [`Consensus::validate_block_pre_execution`]: reth_consensus::Consensus::validate_block_pre_execution
37//! [`HeaderValidator::validate_header_against_parent`]: reth_consensus::HeaderValidator::validate_header_against_parent
38//! [`FullConsensus::validate_block_post_execution`]: reth_consensus::FullConsensus::validate_block_post_execution
39//! [`SealedBlock`]: reth_primitives_traits::SealedBlock
40
41use crate::tree::{
42    error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
43    instrumented_state::{InstrumentedStateProvider, StateProviderMetrics, StateProviderStats},
44    multiproof::{StateRootComputeOutcome, StateRootHandle},
45    payload_processor::PayloadProcessor,
46    precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
47    types::{InsertPayloadResult, ValidationOutput},
48    CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
49    PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
50};
51use alloy_consensus::transaction::{Either, TxHashRef};
52use alloy_eip7928::{bal::DecodedBal, compute_block_access_list_hash, BlockAccessList};
53use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
54use alloy_evm::Evm;
55use alloy_primitives::{map::B256Set, B256};
56use reth_tasks::LazyHandle;
57#[cfg(feature = "trie-debug")]
58use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
59
60use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
61use reth_chain_state::{
62    CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, ExecutionTimingStats,
63};
64use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
65use reth_engine_primitives::{
66    ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
67};
68use reth_errors::{BlockExecutionError, ProviderResult};
69use reth_evm::{
70    block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
71    OnStateHook, SpecFor,
72};
73use reth_execution_cache::{CacheFillMode, CacheStats, SavedCache};
74use reth_payload_primitives::{
75    BuiltPayload, BuiltPayloadExecutedBlock, InvalidPayloadAttributesError, NewPayloadError,
76    PayloadTypes,
77};
78use reth_primitives_traits::{
79    AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
80    RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
81};
82use reth_provider::{
83    providers::{OverlayBuilder, OverlayStateProvider, OverlayStateProviderFactory},
84    BlockExecutionOutput, BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory,
85    DatabaseProviderROFactory, HashedPostStateProvider, ProviderError, PruneCheckpointReader,
86    StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
87    StorageChangeSetReader, StorageSettingsCache,
88};
89use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
90use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState};
91use reth_trie_db::ChangesetCache;
92use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
93use revm_primitives::{Address, KECCAK_EMPTY};
94use std::{
95    collections::HashMap,
96    panic::{self, AssertUnwindSafe},
97    sync::{
98        atomic::{AtomicUsize, Ordering},
99        mpsc::RecvTimeoutError,
100        Arc,
101    },
102    time::Duration,
103};
104use tracing::{debug, debug_span, error, info, instrument, trace, warn, Level, Span};
105
106pub use crate::tree::types::ValidationOutcome;
107
108/// Handle to a [`HashedPostState`] computed on a background thread.
109type LazyHashedPostState = reth_tasks::LazyHandle<Arc<HashedPostState>>;
110
/// Multiplier over the parent's gas limit beyond which a block's claimed gas usage cannot be
/// legitimate.
///
/// The gas limit can change by at most 1/1024 per block, so a block claiming more gas than
/// `parent_gas_limit * MAX_EXPECTED_GAS_USAGE_MULTIPLIER` is treated as suspicious: validation
/// blocks on the pre-execution checks and returns early if they fail, rather than starting
/// execution while those checks are still running.
const MAX_EXPECTED_GAS_USAGE_MULTIPLIER: u64 = 2;

/// Worker name for deferred trie data and changeset provider preparation.
const DEFERRED_TRIE_WORKER_NAME: &str = "deferred-trie";
118
/// Sending half of a crossbeam channel carrying [`IndexedReceipt`]s of the node's receipt type.
type ReceiptRootSender<N> =
    crossbeam_channel::Sender<IndexedReceipt<<N as NodePrimitives>::Receipt>>;
/// Receiving half of a oneshot channel that yields a `(B256, Bloom)` pair.
///
/// NOTE(review): presumably the receipt root and the aggregated logs bloom — confirm against the
/// receipt root task.
type ReceiptRootReceiver = tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>;
122
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
/// internals.
///
/// It holds a mutable borrow of the engine tree state and a shared borrow of the canonical
/// in-memory state, both tied to the lifetime `'a`.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// The engine API tree state (mutably borrowed for the lifetime of the context).
    state: &'a mut EngineApiTreeState<N>,
    /// Reference to the canonical in-memory state.
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}
133
134impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        f.debug_struct("TreeCtx")
137            .field("state", &"EngineApiTreeState")
138            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
139            .finish()
140    }
141}
142
143impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
144    /// Creates a new tree context
145    pub const fn new(
146        state: &'a mut EngineApiTreeState<N>,
147        canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
148    ) -> Self {
149        Self { state, canonical_in_memory_state }
150    }
151}
152
153impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
154    /// Returns a reference to the engine tree state
155    pub const fn state(&self) -> &EngineApiTreeState<N> {
156        &*self.state
157    }
158
159    /// Returns a mutable reference to the engine tree state
160    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
161        self.state
162    }
163
164    /// Returns a reference to the canonical in-memory state
165    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
166        self.canonical_in_memory_state
167    }
168}
169
/// A helper type that provides reusable payload validation logic for network-specific validators.
///
/// This type satisfies [`EngineValidator`] and is responsible for executing blocks/payloads.
///
/// This type contains common validation, execution, and state root computation logic that can be
/// used by network-specific payload validators (e.g., Ethereum, Optimism). It is not meant to be
/// used as a standalone component, but rather as a building block for concrete implementations.
///
/// Type parameters: `P` is the provider, `Evm` the EVM configuration (which also determines the
/// node primitives via `Evm::Primitives`), and `V` the wrapped payload validator.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Provider for database access.
    provider: P,
    /// Consensus implementation for validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
    /// EVM configuration.
    evm_config: Evm,
    /// Configuration for the tree.
    config: TreeConfig,
    /// Payload processor for state root computation.
    payload_processor: PayloadProcessor<Evm>,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Precompile cache metrics, keyed by address.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook to call when invalid blocks are encountered.
    ///
    /// Skipped in the derived `Debug` output.
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Metrics for the engine API.
    metrics: EngineApiMetrics,
    /// Validator for the payload.
    validator: V,
    /// Changeset cache for in-memory trie changesets.
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel work.
    runtime: reth_tasks::Runtime,
    /// Custom state root computation function, if one has been configured.
    custom_state_root: Option<CustomStateRoot<Evm::Primitives>>,
}
210
211impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
212where
213    N: NodePrimitives,
214    P: DatabaseProviderFactory<
215            Provider: BlockReader
216                          + StageCheckpointReader
217                          + PruneCheckpointReader
218                          + ChangeSetReader
219                          + StorageChangeSetReader
220                          + BlockNumReader
221                          + StorageSettingsCache,
222        > + BlockReader<Header = N::BlockHeader>
223        + ChangeSetReader
224        + BlockNumReader
225        + StateProviderFactory
226        + StateReader
227        + HashedPostStateProvider
228        + Clone
229        + 'static,
230    Evm: ConfigureEvm<Primitives = N> + 'static,
231{
232    /// Creates a new `TreePayloadValidator`.
233    #[expect(clippy::too_many_arguments)]
234    pub fn new(
235        provider: P,
236        consensus: Arc<dyn FullConsensus<N>>,
237        evm_config: Evm,
238        validator: V,
239        config: TreeConfig,
240        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
241        changeset_cache: ChangesetCache,
242        runtime: reth_tasks::Runtime,
243    ) -> Self {
244        let precompile_cache_map = PrecompileCacheMap::default();
245        let payload_processor = PayloadProcessor::new(
246            runtime.clone(),
247            evm_config.clone(),
248            &config,
249            precompile_cache_map.clone(),
250        );
251        Self {
252            provider,
253            consensus,
254            evm_config,
255            payload_processor,
256            precompile_cache_map,
257            precompile_cache_metrics: HashMap::new(),
258            config,
259            invalid_block_hook,
260            metrics: EngineApiMetrics::default(),
261            validator,
262            changeset_cache,
263            runtime,
264            custom_state_root: None,
265        }
266    }
267
268    /// Sets a custom state root computation handler.
269    pub fn with_custom_state_root(mut self, custom_state_root: CustomStateRoot<N>) -> Self {
270        self.custom_state_root = Some(custom_state_root);
271        self
272    }
273
274    /// Converts a [`BlockOrPayload`] to a recovered block.
275    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
276    pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
277        &self,
278        input: BlockOrPayload<T>,
279    ) -> Result<SealedBlock<N::Block>, NewPayloadError>
280    where
281        V: PayloadValidator<T, Block = N::Block>,
282    {
283        match input {
284            BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
285            BlockOrPayload::Block(block) => Ok(block),
286        }
287    }
288
289    /// Returns EVM environment for the given payload or block.
290    pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
291        &self,
292        input: &BlockOrPayload<T>,
293    ) -> Result<EvmEnvFor<Evm>, Evm::Error>
294    where
295        V: PayloadValidator<T, Block = N::Block>,
296        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
297    {
298        match input {
299            BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
300            BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
301        }
302    }
303
304    /// Returns [`ExecutableTxIterator`] for the given payload or block.
305    pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
306        &'a self,
307        input: &'a BlockOrPayload<T>,
308    ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
309    where
310        V: PayloadValidator<T, Block = N::Block>,
311        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
312    {
313        Ok(match input {
314            BlockOrPayload::Payload(payload) => {
315                let iter = self
316                    .evm_config
317                    .tx_iterator_for_payload(payload)
318                    .map_err(NewPayloadError::other)?;
319                Either::Left(iter)
320            }
321            BlockOrPayload::Block(block) => {
322                let txs = block.body().clone_transactions();
323                let convert = |tx: N::SignedTx| tx.try_into_recovered();
324                Either::Right((txs, convert))
325            }
326        })
327    }
328
329    /// Returns a [`ExecutionCtxFor`] for the given payload or block.
330    pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
331        &self,
332        input: &'a BlockOrPayload<T>,
333    ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
334    where
335        V: PayloadValidator<T, Block = N::Block>,
336        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
337    {
338        match input {
339            BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
340            BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
341        }
342    }
343
344    /// Validates a block that has already been converted from a payload.
345    ///
346    /// This method performs:
347    /// - Consensus validation
348    /// - Block execution
349    /// - State root computation
350    /// - Fork detection
351    #[instrument(
352        level = "debug",
353        target = "engine::tree::payload_validator",
354        skip_all,
355        fields(
356            parent = ?input.parent_hash(),
357            type_name = ?input.type_name(),
358        )
359    )]
360    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
361        &mut self,
362        input: BlockOrPayload<T>,
363        mut ctx: TreeCtx<'_, N>,
364    ) -> InsertPayloadResult<N>
365    where
366        V: PayloadValidator<T, Block = N::Block> + Clone,
367        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
368    {
369        let parent_hash = input.parent_hash();
370
371        // Fetch parent block. This goes to memory most of the time unless the parent block is
372        // beyond the in-memory buffer.
373        let parent_block = match self.sealed_header_by_hash(parent_hash, ctx.state()) {
374            Ok(Some(parent_block)) => parent_block,
375            Ok(None) => {
376                return Err(InsertBlockError::new(
377                    self.convert_to_block(input)?,
378                    ProviderError::HeaderNotFound(parent_hash.into()).into(),
379                )
380                .into())
381            }
382            Err(e) => {
383                return Err(InsertBlockError::new(self.convert_to_block(input)?, e.into()).into())
384            }
385        };
386
387        // Spawn payload conversion and basic validation on a background thread so it runs
388        // concurrently with the rest of the function (setup + execution). For payloads this
389        // overlaps the cost of RLP decoding + header hashing.
390        let validated_block = self.spawn_convert_and_validate(&input, parent_block.clone());
391
392        /// A helper macro that returns the block in case there was an error
393        /// This macro is used for early returns before block conversion
394        macro_rules! ensure_ok {
395            ($expr:expr) => {
396                match $expr {
397                    Ok(val) => val,
398                    Err(e) => {
399                        let block = validated_block.try_into_inner().expect("sole handle")?;
400                        return Err(InsertBlockError::new(block, e.into()).into())
401                    }
402                }
403            };
404        }
405
406        /// A helper macro for handling errors after the input has been converted to a block
407        macro_rules! ensure_ok_post_block {
408            ($expr:expr, $block:expr) => {
409                match $expr {
410                    Ok(val) => val,
411                    Err(e) => {
412                        return Err(
413                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
414                        )
415                    }
416                }
417            };
418        }
419
420        // If the gas usage is suspiciously high (multiple times higher than parent's gas limit), be
421        // cautious and block on pre-execution checks of the block.
422        if input.gas_used() > parent_block.gas_limit() * MAX_EXPECTED_GAS_USAGE_MULTIPLIER {
423            // Call `.get()` to await the pre-execution checks and exit early if they fail.
424            if validated_block.get().is_err() {
425                return Err(validated_block
426                    .try_into_inner()
427                    .expect("sole handle")
428                    .expect_err("Err result checked"))
429            }
430        }
431
432        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
433        let _enter =
434            debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
435        let Some(provider_builder) =
436            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
437        else {
438            // this is pre-validated in the tree
439            return Err(InsertBlockError::new(
440                validated_block.try_into_inner().expect("sole handle")?,
441                ProviderError::HeaderNotFound(parent_hash.into()).into(),
442            )
443            .into())
444        };
445        drop(_enter);
446
447        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
448            .in_scope(|| self.evm_env_for(&input))
449            .map_err(NewPayloadError::other)?;
450
451        // Extract the decoded BAL, if valid and available.
452        let decoded_bal = ensure_ok!(input
453            .try_decoded_access_list()
454            .map_err(|err| ConsensusError::BlockAccessListInvalid(err.to_string())))
455        .map(Arc::new);
456
457        if let Some(decoded_bal) = decoded_bal.as_deref() {
458            ensure_ok!(Self::validate_received_bal_gas(decoded_bal, input.gas_limit()));
459        }
460
461        let env = ExecutionEnv {
462            evm_env,
463            hash: input.hash(),
464            parent_hash: input.parent_hash(),
465            parent_state_root: parent_block.state_root(),
466            transaction_count: input.transaction_count(),
467            gas_used: input.gas_used(),
468            withdrawals: input.withdrawals().map(|w| w.to_vec()),
469            decoded_bal,
470        };
471
472        // Plan the strategy used for state root computation.
473        let strategy = self.plan_state_root_computation();
474
475        debug!(
476            target: "engine::tree::payload_validator",
477            ?strategy,
478            "Decided which state root algorithm to run"
479        );
480
481        // Get an iterator over the transactions in the payload
482        let txs = self.tx_iterator_for(&input)?;
483
484        // Create overlay factory for payload processor (StateRootTask path needs it for
485        // multiproofs)
486        let provider_factory = self.provider.clone();
487        let overlay_builder = Self::overlay_builder_for_parent(
488            parent_hash,
489            ctx.state(),
490            self.changeset_cache.clone(),
491        );
492        let overlay_factory =
493            OverlayStateProviderFactory::new(provider_factory.clone(), overlay_builder.clone());
494        let changeset_provider = self.spawn_changeset_provider_task(overlay_factory.clone());
495
496        let parallel_bal_execution = ensure_ok!(self.bal_path_eligible(env.decoded_bal.as_deref()));
497
498        // Spawn the appropriate processor based on strategy
499        let mut handle = ensure_ok!(self.spawn_payload_processor(
500            env.clone(),
501            txs,
502            provider_builder.clone(),
503            overlay_factory,
504            &strategy,
505            parallel_bal_execution,
506        ));
507
508        // Create optional cache stats for detailed block logging
509        let slow_block_enabled = self.config.slow_block_threshold().is_some();
510        let cache_stats = slow_block_enabled.then(|| Arc::new(CacheStats::default()));
511        let instrument_state_provider = slow_block_enabled || self.config.state_provider_metrics();
512        let state_provider_metrics =
513            instrument_state_provider.then(|| StateProviderMetrics::with_source("engine"));
514        let state_provider_stats =
515            instrument_state_provider.then(|| Arc::new(StateProviderStats::default()));
516        let execution_cache = handle.caches().map(|caches| (caches, handle.cache_metrics()));
517
518        // This state provider factory is parametrized by:
519        //
520        // 1. fill_on_miss?
521        // 2. instrument_state_provider?
522        //
523        // `fill_on_miss` controls whether the loaded value after a cache miss will be inserted
        // back into the cache. At first glance it seems always useful to do this. However,
525        // in practice, for the serial/non-BAL execution, it's not needed and is net negative:
526        //
527        // - It's not necessary because the revm machinery provides layer of caching itself. That
528        //   means a value for a miss will be recorded in revm's cache.
529        // - Inserting back into the cache is not free.
530        // - After execution, the execution post-state will be dumped into the execution cache as
531        //   whole anyway.
532        //
        // Therefore, `fill_on_miss` is going to be false for those paths.
534        //
535        // The second parameter `instrument_state_provider` controls whether we should
536        // instrument the state provider with metrics.
537        let make_state_provider = |fill_on_miss: bool| -> ProviderResult<StateProviderBox> {
538            let provider = provider_builder.build()?;
539            let mut provider = if let Some((caches, cache_metrics)) = &execution_cache {
540                let fill_mode = if fill_on_miss {
541                    CacheFillMode::FillOnMiss
542                } else {
543                    CacheFillMode::LookupOnly
544                };
545                Box::new(CachedStateProvider::new_with_mode(
546                    provider,
547                    caches.clone(),
548                    fill_mode,
549                    cache_metrics.clone(),
550                    cache_stats.clone(),
551                )) as StateProviderBox
552            } else {
553                provider
554            };
555
556            if instrument_state_provider {
557                let stats = state_provider_stats
558                    .as_ref()
559                    .expect("instrumented state provider requires shared stats");
560                let metrics = state_provider_metrics
561                    .as_ref()
562                    .expect("instrumented state provider requires metrics");
563                provider = Box::new(InstrumentedStateProvider::with_stats(
564                    provider,
565                    metrics.clone(),
566                    Arc::clone(stats),
567                ));
568            }
569
570            Ok(provider)
571        };
572
573        // Execute the block and handle any execution errors.
574        // The receipt root task is spawned before execution and receives receipts incrementally
575        // as transactions complete, allowing parallel computation during execution.
576        let execute_block_start = Instant::now();
577        let execution_result = if parallel_bal_execution {
578            self.execute_block_bal(env, &input, &handle, &make_state_provider)
579        } else {
580            let state_provider = make_state_provider(false);
581            match state_provider {
582                Ok(state_provider) => self.execute_block(state_provider, env, &input, &mut handle),
583                Err(err) => Err(err.into()),
584            }
585        };
586        let execution_duration = execute_block_start.elapsed();
587        if let (Some(metrics), Some(stats)) = (&state_provider_metrics, &state_provider_stats) {
588            metrics.record_totals(stats);
589        }
590        let (output, senders, receipt_root_rx, built_bal) = ensure_ok!(execution_result);
591
592        // After executing the block we can stop prewarming transactions
593        handle.stop_prewarming_execution();
594
595        // Create ExecutionOutcome early so we can terminate caching before validation and state
596        // root computation. Using Arc allows sharing with both the caching task and the deferred
597        // trie task without cloning the expensive BundleState.
598        let output = Arc::new(output);
599
600        // Terminate caching task early since execution is complete and caching is no longer
601        // needed. This frees up resources while state root computation continues.
602        let valid_block_tx = handle.terminate_caching(Some(output.clone()));
603
604        // Spawn hashed post state computation in background so it runs concurrently with
605        // block conversion and receipt root computation. This is a pure CPU-bound task
606        // (keccak256 hashing of all changed addresses and storage slots).
607        let hashed_state_output = output.clone();
608        let hashed_state_provider = self.provider.clone();
609        let mut hashed_state_rx = handle.take_hashed_state_rx();
610        let mut hashed_state: LazyHashedPostState =
611            self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
612                let _span = debug_span!(
613                    target: "engine::tree::payload_validator",
614                    "hashed_post_state",
615                )
616                .entered();
617                let state = if let Some(Ok(state)) = hashed_state_rx.as_mut().map(|rx| rx.recv()) {
618                    state
619                } else {
620                    hashed_state_provider.hashed_post_state(&hashed_state_output.state)
621                };
622                Arc::new(state)
623            });
624
625        let block = validated_block.try_into_inner().expect("sole handle")?;
626        let block = block.with_senders(senders);
627
628        // Wait for the receipt root computation to complete.
629        let receipt_root_bloom = {
630            let _enter = debug_span!(
631                target: "engine::tree::payload_validator",
632                "wait_receipt_root",
633            )
634            .entered();
635
636            receipt_root_rx
637                .blocking_recv()
638                .inspect_err(|_| {
639                    tracing::error!(
640                        target: "engine::tree::payload_validator",
641                        "Receipt root task dropped sender without result, receipt root calculation likely aborted"
642                    );
643                })
644                .ok()
645        };
646
647        ensure_ok_post_block!(
648            self.validate_post_execution(
649                &block,
650                &parent_block,
651                &output,
652                &mut ctx,
653                receipt_root_bloom,
654                built_bal
655            ),
656            block
657        );
658
659        // Run the hashed state validation hook but don't propagate the error yet. If the state root
660        // task fails, we might need to re-run this check against a fallback state.
661        let mut hashed_state_validate_result = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").in_scope(|| {
662            // Wait for the background keccak256 hashing task to complete. This blocks until
663            // all changed addresses and storage slots have been hashed.
664            let hashed_state_ref =
665                debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
666                    .in_scope(|| hashed_state.get());
667
668            self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, &block)
669        });
670
671        let root_time = Instant::now();
672        let mut maybe_state_root = None;
673        let mut state_root_task_failed = false;
674        #[cfg(feature = "trie-debug")]
675        let mut trie_debug_recorders = Vec::new();
676
677        match strategy {
678            StateRootStrategy::StateRootTask => {
679                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
680
681                let task_result = ensure_ok_post_block!(
682                    self.await_state_root_with_timeout(
683                        &mut handle,
684                        provider_builder.clone(),
685                        output.clone(),
686                    ),
687                    block
688                );
689
690                let maybe_new_hashed_state = match task_result {
691                    Ok((
692                        StateRootComputeOutcome {
693                            state_root,
694                            trie_updates,
695                            #[cfg(feature = "trie-debug")]
696                            debug_recorders,
697                        },
698                        maybe_new_hashed_state,
699                    )) => {
700                        let elapsed = root_time.elapsed();
701                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
702
703                        #[cfg(feature = "trie-debug")]
704                        {
705                            trie_debug_recorders = debug_recorders;
706                        }
707
708                        // Compare trie updates with serial computation if configured
709                        if self.config.always_compare_trie_updates() {
710                            let _has_diff = self.compare_trie_updates_with_serial(
711                                provider_builder.clone(),
712                                provider_factory,
713                                overlay_builder,
714                                &output,
715                                trie_updates.as_ref().clone(),
716                            );
717                            #[cfg(feature = "trie-debug")]
718                            if _has_diff {
719                                Self::write_trie_debug_recorders(
720                                    block.header().number(),
721                                    &trie_debug_recorders,
722                                );
723                            }
724                        }
725
726                        // we double check the state root here for good measure
727                        if state_root == block.header().state_root() {
728                            maybe_state_root = Some((state_root, trie_updates, elapsed))
729                        } else {
730                            warn!(
731                                target: "engine::tree::payload_validator",
732                                ?state_root,
733                                block_state_root = ?block.header().state_root(),
734                                "State root task returned incorrect state root"
735                            );
736                            #[cfg(feature = "trie-debug")]
737                            Self::write_trie_debug_recorders(
738                                block.header().number(),
739                                &trie_debug_recorders,
740                            );
741                            state_root_task_failed = true;
742                        }
743
744                        maybe_new_hashed_state
745                    }
746                    Err(error) => {
747                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
748                        state_root_task_failed = true;
749                        None
750                    }
751                };
752
753                // If the state root task failed or we got a new hashed state from the fallback that
754                // won the race, we need to replace the hashed state handle and re-run the
755                // validation.
756                if maybe_new_hashed_state.is_some() || state_root_task_failed {
757                    hashed_state = maybe_new_hashed_state.unwrap_or_else(|| {
758                        LazyHandle::ready(Arc::new(self.provider.hashed_post_state(&output.state)))
759                    });
760                    hashed_state_validate_result =
761                        self.validator.validate_block_post_execution_with_hashed_state(
762                            hashed_state.get(),
763                            &block,
764                        );
765                }
766            }
767            StateRootStrategy::Parallel => {
768                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
769                match self.compute_state_root_parallel(
770                    provider_factory,
771                    overlay_builder,
772                    &hashed_state,
773                ) {
774                    Ok(result) => {
775                        let elapsed = root_time.elapsed();
776                        info!(
777                            target: "engine::tree::payload_validator",
778                            regular_state_root = ?result.0,
779                            ?elapsed,
780                            "Regular root task finished"
781                        );
782                        maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
783                    }
784                    Err(error) => {
785                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
786                    }
787                }
788            }
789            StateRootStrategy::Synchronous => {}
790            StateRootStrategy::Custom(custom) => {
791                let (state_root, trie_updates) = ensure_ok_post_block!(
792                    custom(CustomStateRootInput {
793                        block: &block,
794                        parent_block: &parent_block,
795                        output: &output,
796                        hashed_state: &hashed_state,
797                    }),
798                    block
799                );
800                maybe_state_root = Some((state_root, Arc::new(trie_updates), root_time.elapsed()));
801            }
802        }
803
804        // Determine the state root.
805        // If the state root was computed in parallel, we use it.
806        // Otherwise, we fall back to computing it synchronously.
807        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
808            maybe_state_root
809        {
810            maybe_state_root
811        } else {
812            // fallback is to compute the state root regularly in sync
813            if self.config.state_root_fallback() {
814                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
815            } else {
816                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
817                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
818            }
819
820            let (root, updates) = ensure_ok_post_block!(
821                provider_builder
822                    .build()
823                    .and_then(|provider| Self::compute_state_root_serial(provider, &hashed_state)),
824                block
825            );
826
827            if state_root_task_failed {
828                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
829            }
830
831            (root, Arc::new(updates), root_time.elapsed())
832        };
833
834        if let Err(err) = hashed_state_validate_result {
835            // call post-block hook
836            self.on_invalid_block(&parent_block, &block, &output, None, ctx.state_mut());
837            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into())
838        }
839
840        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
841        self.metrics
842            .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
843        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
844
845        // ensure state root matches
846        if state_root != block.header().state_root() {
847            #[cfg(feature = "trie-debug")]
848            Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);
849
850            // call post-block hook
851            self.on_invalid_block(
852                &parent_block,
853                &block,
854                &output,
855                Some((&trie_output, state_root)),
856                ctx.state_mut(),
857            );
858            let block_state_root = block.header().state_root();
859            return Err(InsertBlockError::new(
860                block.into_sealed_block(),
861                ConsensusError::BodyStateRootDiff(
862                    GotExpected { got: state_root, expected: block_state_root }.into(),
863                )
864                .into(),
865            )
866            .into())
867        }
868
869        let timing_stats = state_provider_stats.filter(|_| slow_block_enabled).map(|stats| {
870            self.calculate_timing_stats(
871                &block,
872                stats,
873                cache_stats,
874                &output,
875                execution_duration,
876                root_elapsed,
877            )
878        });
879
880        if let Some(valid_block_tx) = valid_block_tx {
881            let _ = valid_block_tx.send(());
882        }
883
884        let changeset_provider = ensure_ok_post_block!(
885            changeset_provider
886                .try_into_inner()
887                .ok()
888                .expect("changeset provider handle is not cloned"),
889            block
890        );
891
892        let executed_block = self.spawn_deferred_trie_task(
893            Arc::new(block),
894            output,
895            hashed_state,
896            trie_output,
897            changeset_provider,
898        );
899        Ok(ValidationOutput::new(executed_block, timing_stats))
900    }
901
902    /// Spawns a background task to convert a [`BlockOrPayload`] into a [`SealedBlock`] and perform
903    /// basic consensus validations on it.
904    #[expect(clippy::type_complexity)]
905    pub fn spawn_convert_and_validate<T>(
906        &self,
907        input: &BlockOrPayload<T>,
908        parent: SealedHeader<N::BlockHeader>,
909    ) -> LazyHandle<Result<SealedBlock<N::Block>, InsertPayloadError<N::Block>>>
910    where
911        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
912        V: PayloadValidator<T, Block = N::Block> + Clone,
913    {
914        let input = input.clone();
915        let validator = self.validator.clone();
916        let consensus = self.consensus.clone();
917        let parent_span = Span::current();
918        self.payload_processor.executor().spawn_blocking_named("payload-convert", move || {
919            let _span = debug_span!(
920                target: "engine::tree::payload_validator",
921                parent: parent_span,
922                "convert_and_validate",
923            )
924            .entered();
925            let block = match input {
926                BlockOrPayload::Block(block) => block,
927                BlockOrPayload::Payload(payload) => {
928                    validator.convert_payload_to_block(payload)?
929                }
930            };
931
932            if let Err(e) = consensus.validate_header(block.sealed_header()) {
933                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
934                return Err(InsertBlockError::consensus_error(e, block).into())
935            }
936
937            // now validate against the parent
938            let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
939            if let Err(e) = consensus.validate_header_against_parent(block.sealed_header(), &parent)
940            {
941                warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
942                return Err(InsertBlockError::consensus_error(e, block).into())
943            }
944            drop(_enter);
945
946            if let Err(e) =
947                consensus.validate_block_pre_execution_with_tx_root(&block, None)
948            {
949                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
950                return Err(InsertBlockError::consensus_error(e, block).into())
951            }
952
953            Ok(block)
954        })
955    }
956
957    /// Spawns a background task that creates the changeset provider used by the deferred trie task.
958    ///
959    /// This is started before execution so overlay construction can run concurrently with payload
960    /// validation, then awaited before the deferred trie task is spawned.
961    fn spawn_changeset_provider_task(
962        &self,
963        overlay_factory: OverlayStateProviderFactory<P, N>,
964    ) -> LazyHandle<ProviderResult<OverlayStateProvider<P::Provider>>> {
965        let parent_span = Span::current();
966        self.payload_processor.executor().spawn_blocking_named(
967            DEFERRED_TRIE_WORKER_NAME,
968            move || {
969                let _span = debug_span!(
970                    target: "engine::tree::payload_validator",
971                    parent: parent_span,
972                    "changeset_provider",
973                )
974                .entered();
975                overlay_factory.database_provider_ro()
976            },
977        )
978    }
979
980    /// Return sealed block header from database or in-memory state by hash.
981    fn sealed_header_by_hash(
982        &self,
983        hash: B256,
984        state: &EngineApiTreeState<N>,
985    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
986        // check memory first
987        let header = state.tree_state.sealed_header_by_hash(&hash);
988
989        if header.is_some() {
990            Ok(header)
991        } else {
992            self.provider.sealed_header_by_hash(hash)
993        }
994    }
995
996    fn validate_received_bal_gas(
997        decoded_bal: &DecodedBal,
998        gas_limit: u64,
999    ) -> Result<(), ConsensusError> {
1000        decoded_bal.as_bal().validate_gas_limit(gas_limit).map_err(ConsensusError::from)
1001    }
1002
1003    /// Executes a block with the given state provider.
1004    ///
1005    /// This method orchestrates block execution:
1006    /// 1. Sets up the EVM with state database and precompile caching
1007    /// 2. Spawns a background task for incremental receipt root computation
1008    /// 3. Executes transactions with metrics collection via state hooks
1009    /// 4. Merges state transitions and records execution metrics
1010    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1011    #[expect(clippy::type_complexity)]
1012    fn execute_block<S, Err, T>(
1013        &mut self,
1014        state_provider: S,
1015        env: ExecutionEnv<Evm>,
1016        input: &BlockOrPayload<T>,
1017        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
1018    ) -> Result<
1019        (
1020            BlockExecutionOutput<N::Receipt>,
1021            Vec<Address>,
1022            ReceiptRootReceiver,
1023            Option<BlockAccessList>,
1024        ),
1025        InsertBlockErrorKind,
1026    >
1027    where
1028        S: StateProvider + Send,
1029        Err: core::error::Error + Send + Sync + 'static,
1030        V: PayloadValidator<T, Block = N::Block>,
1031        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1032        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
1033    {
1034        debug!(target: "engine::tree::payload_validator", "Executing block");
1035
1036        let has_bal = env.decoded_bal.is_some();
1037        let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
1038            State::builder()
1039                .with_database(StateProviderDatabase::new(state_provider))
1040                .with_bundle_update()
1041                .with_bal_builder_if(has_bal)
1042                .build()
1043        });
1044
1045        let (spec_id, mut executor) = {
1046            let _span = debug_span!(target: "engine::tree", "create_evm").entered();
1047            let spec_id = *env.evm_env.spec_id();
1048            let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
1049            let ctx = self
1050                .execution_ctx_for(input)
1051                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
1052            let executor = self.evm_config.create_executor(evm, ctx);
1053            (spec_id, executor)
1054        };
1055
1056        if !self.config.precompile_cache_disabled() {
1057            let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
1058            executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
1059                |address, precompile| {
1060                    let metrics = self
1061                        .precompile_cache_metrics
1062                        .entry(*address)
1063                        .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
1064                        .clone();
1065                    CachedPrecompile::wrap(
1066                        precompile,
1067                        self.precompile_cache_map.cache_for_address(*address),
1068                        spec_id,
1069                        Some(metrics),
1070                    )
1071                },
1072            );
1073        }
1074
1075        let transaction_count = input.transaction_count();
1076        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(transaction_count);
1077        let executed_tx_index = Arc::clone(handle.executed_tx_index());
1078        executor.evm_mut().db_mut().set_state_hook(
1079            handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook + 'static>),
1080        );
1081
1082        let execution_start = Instant::now();
1083
1084        // Execute all transactions and finalize
1085        let (executor, senders) = self.execute_transactions(
1086            executor,
1087            transaction_count,
1088            handle.iter_transactions(),
1089            &receipt_tx,
1090            &executed_tx_index,
1091            has_bal,
1092        )?;
1093        drop(receipt_tx);
1094
1095        // Finish execution and get the result
1096        let post_exec_start = Instant::now();
1097        let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
1098            .in_scope(|| executor.finish())
1099            .map(|(evm, result)| (evm.into_db(), result))?;
1100        self.metrics.record_post_execution(post_exec_start.elapsed());
1101
1102        // Merge transitions into bundle state
1103        debug_span!(target: "engine::tree", "merge_transitions")
1104            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));
1105
1106        let built_bal = if has_bal { db.take_built_alloy_bal() } else { None };
1107        let output = BlockExecutionOutput { result, state: db.take_bundle() };
1108
1109        let execution_duration = execution_start.elapsed();
1110        self.metrics.record_block_execution(&output, execution_duration);
1111        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
1112        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
1113
1114        Ok((output, senders, result_rx, built_bal))
1115    }
1116
1117    /// Returns true when the BAL execute path should be used for this block.
1118    // TODO: extend with stronger gating before enabling on mainnet:
1119    //   - Fork check: `Amsterdam.active_at_timestamp(env.evm_env.timestamp)`. Today a BAL only
1120    //     exists post-Amsterdam, so the BAL-presence check is a sufficient proxy. It is a proxy,
1121    //     not a guarantee.
1122    //   - Tx-count threshold (`bal_execute_path_min_tx_count`): below the parallelism break-even
1123    //     point, provider setup and worker scheduling overhead can exceed the gain. Tune
1124    //     empirically once workers are parallel; meaningless while the commit loop is sequential.
1125    fn bal_path_eligible(&self, bal: Option<&DecodedBal>) -> Result<bool, InsertBlockErrorKind> {
1126        let has_bal = bal.is_some();
1127        let parallel_execution = has_bal && !self.config.disable_bal_parallel_execution();
1128        if parallel_execution && self.config.disable_bal_parallel_state_root() {
1129            return Err(InsertBlockErrorKind::Other(
1130                "disabling parallel state root is impossible when parallel execution is enabled"
1131                    .into(),
1132            ));
1133        }
1134
1135        Ok(parallel_execution)
1136    }
1137
1138    /// Executes the block on the BAL path. Mirrors the return shape of [`Self::execute_block`]
1139    /// so the dispatch site stays uniform.
1140    ///
1141    /// Inside, this:
1142    /// 1. Creates a shared parent-state cache handle for provider-backed workers.
1143    /// 2. Relies on BAL prewarm to stream sparse-trie updates and optional state prefetches.
1144    /// 3. Spawns the receipt-root task.
1145    /// 4. Calls [`crate::tree::payload_processor::bal::execute_block`].
1146    /// 5. Returns the rebuilt BAL for post-execution consensus validation.
1147    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1148    #[expect(clippy::type_complexity)]
1149    fn execute_block_bal<Tx, Err, MakeStateProvider, T>(
1150        &self,
1151        env: ExecutionEnv<Evm>,
1152        input: &BlockOrPayload<T>,
1153        handle: &PayloadHandle<Tx, Err, N::Receipt>,
1154        make_state_provider: &MakeStateProvider,
1155    ) -> Result<
1156        (
1157            BlockExecutionOutput<N::Receipt>,
1158            Vec<Address>,
1159            ReceiptRootReceiver,
1160            Option<BlockAccessList>,
1161        ),
1162        InsertBlockErrorKind,
1163    >
1164    where
1165        Tx: ExecutableTxFor<Evm> + Send,
1166        Err: core::error::Error + Send + Sync + 'static,
1167        MakeStateProvider: Fn(bool) -> ProviderResult<StateProviderBox> + Sync,
1168        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
1169        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1170        V: PayloadValidator<T, Block = N::Block>,
1171    {
1172        debug!(target: "engine::tree::payload_validator", "Executing block via BAL path");
1173
1174        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(env.transaction_count);
1175        let input_bal = env.decoded_bal.ok_or_else(|| {
1176            InsertBlockErrorKind::Other("BAL execute path: no decoded BAL available".into())
1177        })?;
1178
1179        let make_db = |fill_on_miss| {
1180            let provider = make_state_provider(fill_on_miss)
1181                .map_err(crate::tree::payload_processor::bal::BalExecutionError::Provider)?;
1182            Ok(StateProviderDatabase::new(provider))
1183        };
1184        let execution_start = Instant::now();
1185        let ctx =
1186            self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
1187        let (output, senders, built_bal) = crate::tree::payload_processor::bal::execute_block(
1188            &self.runtime,
1189            &self.evm_config,
1190            &make_db,
1191            input_bal,
1192            env.evm_env,
1193            ctx,
1194            env.transaction_count,
1195            handle.clone_transaction_receiver(),
1196            receipt_tx,
1197        )?;
1198        let execution_duration = execution_start.elapsed();
1199
1200        self.metrics.record_block_execution(&output, execution_duration);
1201        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
1202        debug!(
1203            target: "engine::tree::payload_validator",
1204            elapsed = ?execution_duration,
1205            "Executed block via BAL path",
1206        );
1207
1208        Ok((output, senders, result_rx, Some(built_bal)))
1209    }
1210
1211    fn spawn_receipt_root_task(
1212        &self,
1213        receipts_len: usize,
1214    ) -> (ReceiptRootSender<N>, ReceiptRootReceiver) {
1215        // Unbounded channel is used since tx count bounds capacity anyway.
1216        let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
1217        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
1218        let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
1219        self.payload_processor
1220            .executor()
1221            .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
1222
1223        (receipt_tx, result_rx)
1224    }
1225
1226    /// Executes transactions and collects senders, streaming receipts to a background task.
1227    ///
1228    /// This method handles:
1229    /// - Applying pre-execution changes (e.g., beacon root updates)
1230    /// - Executing each transaction with timing metrics
1231    /// - Streaming receipts to the receipt root computation task
1232    /// - Collecting transaction senders for later use
1233    ///
1234    /// Returns the executor (for finalization) and the collected senders.
1235    fn execute_transactions<'a, E, Tx, InnerTx, Err, DB>(
1236        &self,
1237        mut executor: E,
1238        transaction_count: usize,
1239        transactions: impl Iterator<Item = Result<Tx, Err>>,
1240        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
1241        executed_tx_index: &AtomicUsize,
1242        has_bal: bool,
1243    ) -> Result<(E, Vec<Address>), BlockExecutionError>
1244    where
1245        E: BlockExecutor<Receipt = N::Receipt, Evm: alloy_evm::Evm<DB = &'a mut State<DB>>>,
1246        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
1247        InnerTx: TxHashRef,
1248        DB: revm::Database + 'a,
1249        Err: core::error::Error + Send + Sync + 'static,
1250    {
1251        let mut senders = Vec::with_capacity(transaction_count);
1252
1253        // Apply pre-execution changes (e.g., beacon root update)
1254        let pre_exec_start = Instant::now();
1255        debug_span!(target: "engine::tree", "pre_execution")
1256            .in_scope(|| executor.apply_pre_execution_changes())?;
1257        self.metrics.record_pre_execution(pre_exec_start.elapsed());
1258
1259        // Bump BAL index after pre-execution changes (EIP-7928: index 0 is pre-execution)
1260        if has_bal {
1261            executor.evm_mut().db_mut().bump_bal_index();
1262        }
1263
1264        // Execute transactions
1265        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
1266        let mut transactions = transactions.into_iter();
1267        // Some executors may execute transactions that do not append receipts during the
1268        // main loop (e.g., system transactions whose receipts are added during finalization).
1269        // In that case, invoking the callback on every transaction would resend the previous
1270        // receipt with the same index and can panic the ordered root builder.
1271        let mut last_sent_len = 0usize;
1272        loop {
1273            // Measure time spent waiting for next transaction from iterator
1274            // (e.g., parallel signature recovery)
1275            let wait_start = Instant::now();
1276            let Some(tx_result) = transactions.next() else { break };
1277            self.metrics.record_transaction_wait(wait_start.elapsed());
1278
1279            let tx = tx_result.map_err(BlockExecutionError::other)?;
1280            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
1281
1282            senders.push(tx_signer);
1283
1284            let _enter = tracing::enabled!(target: "engine::tree", Level::DEBUG).then(|| {
1285                debug_span!(
1286                    target: "engine::tree",
1287                    "execute tx",
1288                    tx_index = senders.len() - 1,
1289                )
1290                .entered()
1291            });
1292            if tracing::enabled!(target: "engine::tree", Level::TRACE) {
1293                trace!(target: "engine::tree", "Executing transaction");
1294            }
1295
1296            let tx_start = Instant::now();
1297            executor.execute_transaction(tx)?;
1298            self.metrics.record_transaction_execution(tx_start.elapsed());
1299
1300            // advance the shared counter so prewarm workers skip already-executed txs
1301            executed_tx_index.store(senders.len(), Ordering::Relaxed);
1302
1303            let current_len = executor.receipts().len();
1304            if current_len > last_sent_len {
1305                last_sent_len = current_len;
1306                // Send the latest receipt to the background task for incremental root computation.
1307                if let Some(receipt) = executor.receipts().last() {
1308                    let tx_index = current_len - 1;
1309                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
1310                }
1311            }
1312            // Bump BAL index after each transaction (EIP-7928)
1313            if has_bal {
1314                executor.evm_mut().db_mut().bump_bal_index();
1315            }
1316        }
1317
1318        drop(exec_span);
1319
1320        Ok((executor, senders))
1321    }
1322
1323    /// Compute state root for the given hashed post state in parallel.
1324    ///
1325    /// Uses an overlay factory which provides the state of the parent block, along with the
1326    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
1327    /// trie updates for this block.
1328    ///
1329    /// # Returns
1330    ///
1331    /// Returns `Ok(_)` if computed successfully.
1332    /// Returns `Err(_)` if error was encountered during computation.
1333    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1334    fn compute_state_root_parallel(
1335        &self,
1336        provider_factory: P,
1337        overlay_builder: OverlayBuilder<N>,
1338        hashed_state: &LazyHashedPostState,
1339    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
1340        let hashed_state = hashed_state.get();
1341        // The `hashed_state` argument will be taken into account as part of the overlay, but we
1342        // need to use the prefix sets which were generated from it to indicate to the
1343        // ParallelStateRoot which parts of the trie need to be recomputed.
1344        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1345        let overlay_builder =
1346            overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1347        let overlay_factory = OverlayStateProviderFactory::new(provider_factory, overlay_builder);
1348        ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
1349            .incremental_root_with_updates()
1350    }
1351
1352    /// Compute state root for the given hashed post state in serial.
1353    ///
1354    /// Uses the same provider construction path as main execution and computes the state root and
1355    /// trie updates for this block directly via
1356    /// [`reth_provider::StateRootProvider::state_root_with_updates`].
1357    fn compute_state_root_serial(
1358        state_provider: StateProviderBox,
1359        hashed_state: &LazyHashedPostState,
1360    ) -> ProviderResult<(B256, TrieUpdates)> {
1361        state_provider.state_root_with_updates(hashed_state.get().as_ref().clone())
1362    }
1363
1364    /// Awaits the state root from the background task, with an optional timeout fallback.
1365    ///
1366    /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
1367    /// state root task up to the timeout duration. If the task doesn't complete in time, a
1368    /// sequential state root computation is spawned via `spawn_blocking`. Both computations
1369    /// then race: the main thread polls the task receiver and the sequential result channel
1370    /// in a loop, returning whichever finishes first.
1371    ///
1372    /// If no timeout is configured, this simply awaits the state root task without any fallback.
1373    ///
1374    /// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
1375    /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
1376    /// `Result` captures parallel state root task errors that can still fall back to serial.
1377    #[instrument(
1378        level = "debug",
1379        target = "engine::tree::payload_validator",
1380        name = "await_state_root",
1381        skip_all
1382    )]
1383    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
1384        &self,
1385        handle: &mut PayloadHandle<Tx, Err, R>,
1386        state_provider_builder: StateProviderBuilder<N, P>,
1387        output: Arc<BlockExecutionOutput<R>>,
1388    ) -> ProviderResult<
1389        Result<(StateRootComputeOutcome, Option<LazyHashedPostState>), ParallelStateRootError>,
1390    > {
1391        let Some(timeout) = self.config.state_root_task_timeout() else {
1392            return Ok(handle.state_root().map(|outcome| (outcome, None)));
1393        };
1394
1395        let task_rx = handle.take_state_root_rx();
1396
1397        match task_rx.recv_timeout(timeout) {
1398            Ok(result) => Ok(result.map(|outcome| (outcome, None))),
1399            Err(RecvTimeoutError::Disconnected) => {
1400                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
1401            }
1402            Err(RecvTimeoutError::Timeout) => {
1403                warn!(
1404                    target: "engine::tree::payload_validator",
1405                    ?timeout,
1406                    "State root task timed out, spawning sequential fallback"
1407                );
1408                self.metrics.block_validation.state_root_task_timeout_total.increment(1);
1409
1410                let (seq_tx, seq_rx) = std::sync::mpsc::channel();
1411
1412                self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
1413                    let result = state_provider_builder.build().and_then(|provider| {
1414                        let hashed_state =
1415                            LazyHandle::ready(Arc::new(provider.hashed_post_state(&output.state)));
1416                        let (state_root, trie_updates) =
1417                            Self::compute_state_root_serial(provider, &hashed_state)?;
1418
1419                        Ok((state_root, trie_updates, hashed_state))
1420                    });
1421                    let _ = seq_tx.send(result);
1422                });
1423
1424                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
1425
1426                loop {
1427                    match task_rx.recv_timeout(POLL_INTERVAL) {
1428                        Ok(result) => {
1429                            debug!(
1430                                target: "engine::tree::payload_validator",
1431                                source = "task",
1432                                "State root timeout race won"
1433                            );
1434                            return Ok(result.map(|outcome| (outcome, None)));
1435                        }
1436                        Err(RecvTimeoutError::Disconnected) => {
1437                            debug!(
1438                                target: "engine::tree::payload_validator",
1439                                "State root task dropped, waiting for sequential fallback"
1440                            );
1441                            let result = seq_rx.recv().map_err(|_| {
1442                                ProviderError::other(std::io::Error::other(
1443                                    "both state root computations failed",
1444                                ))
1445                            })?;
1446                            let (state_root, trie_updates, hashed_state) = result?;
1447                            return Ok(Ok((
1448                                StateRootComputeOutcome {
1449                                    state_root,
1450                                    trie_updates: Arc::new(trie_updates),
1451                                    #[cfg(feature = "trie-debug")]
1452                                    debug_recorders: Vec::new(),
1453                                },
1454                                Some(hashed_state),
1455                            )));
1456                        }
1457                        Err(RecvTimeoutError::Timeout) => {}
1458                    }
1459
1460                    if let Ok(result) = seq_rx.try_recv() {
1461                        debug!(
1462                            target: "engine::tree::payload_validator",
1463                            source = "sequential",
1464                            "State root timeout race won"
1465                        );
1466                        let (state_root, trie_updates, hashed_state) = result?;
1467                        return Ok(Ok((
1468                            StateRootComputeOutcome {
1469                                state_root,
1470                                trie_updates: Arc::new(trie_updates),
1471                                #[cfg(feature = "trie-debug")]
1472                                debug_recorders: Vec::new(),
1473                            },
1474                            Some(hashed_state),
1475                        )));
1476                    }
1477                }
1478            }
1479        }
1480    }
1481
1482    /// Compares trie updates from the state root task with serial state root computation.
1483    ///
1484    /// This is used for debugging and validating the correctness of the parallel state root
1485    /// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
1486    /// method runs a separate serial state root computation and compares the resulting trie
1487    /// updates.
1488    fn compare_trie_updates_with_serial(
1489        &self,
1490        state_provider_builder: StateProviderBuilder<N, P>,
1491        provider_factory: P,
1492        overlay_builder: OverlayBuilder<N>,
1493        output: &BlockExecutionOutput<N::Receipt>,
1494        task_trie_updates: TrieUpdates,
1495    ) -> bool {
1496        debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1497
1498        match state_provider_builder.build().and_then(|provider| {
1499            let hashed_state = Arc::new(provider.hashed_post_state(&output.state));
1500            Self::compute_state_root_serial(provider, &LazyHandle::ready(hashed_state))
1501        }) {
1502            Ok((serial_root, serial_trie_updates)) => {
1503                debug!(
1504                    target: "engine::tree::payload_validator",
1505                    ?serial_root,
1506                    "Serial state root computation finished for comparison"
1507                );
1508
1509                // Get a database provider to use as trie cursor factory
1510                let overlay_factory =
1511                    OverlayStateProviderFactory::new(provider_factory, overlay_builder);
1512                match overlay_factory.database_provider_ro() {
1513                    Ok(provider) => {
1514                        match super::trie_updates::compare_trie_updates(
1515                            &provider,
1516                            task_trie_updates,
1517                            serial_trie_updates,
1518                        ) {
1519                            Ok(has_diff) => return has_diff,
1520                            Err(err) => {
1521                                warn!(
1522                                    target: "engine::tree::payload_validator",
1523                                    %err,
1524                                    "Error comparing trie updates"
1525                                );
1526                                return true;
1527                            }
1528                        }
1529                    }
1530                    Err(err) => {
1531                        warn!(
1532                            target: "engine::tree::payload_validator",
1533                            %err,
1534                            "Failed to get database provider for trie update comparison"
1535                        );
1536                    }
1537                }
1538            }
1539            Err(err) => {
1540                warn!(
1541                    target: "engine::tree::payload_validator",
1542                    %err,
1543                    "Failed to compute serial state root for comparison"
1544                );
1545            }
1546        }
1547        false
1548    }
1549
1550    /// Writes trie debug recorders to a JSON file for the given block number.
1551    ///
1552    /// The file is written to the current working directory as
1553    /// `trie_debug_block_{block_number}.json`.
1554    #[cfg(feature = "trie-debug")]
1555    fn write_trie_debug_recorders(
1556        block_number: u64,
1557        recorders: &[(Option<B256>, TrieDebugRecorder)],
1558    ) {
1559        let path = format!("trie_debug_block_{block_number}.json");
1560        match serde_json::to_string_pretty(recorders) {
1561            Ok(json) => match std::fs::write(&path, json) {
1562                Ok(()) => {
1563                    warn!(
1564                        target: "engine::tree::payload_validator",
1565                        %path,
1566                        "Wrote trie debug recorders to file"
1567                    );
1568                }
1569                Err(err) => {
1570                    warn!(
1571                        target: "engine::tree::payload_validator",
1572                        %err,
1573                        %path,
1574                        "Failed to write trie debug recorders"
1575                    );
1576                }
1577            },
1578            Err(err) => {
1579                warn!(
1580                    target: "engine::tree::payload_validator",
1581                    %err,
1582                    "Failed to serialize trie debug recorders"
1583                );
1584            }
1585        }
1586    }
1587
1588    /// Validates the block after execution.
1589    ///
1590    /// This performs:
1591    /// - parent header validation
1592    /// - post-execution consensus validation
1593    /// - state-root based post-execution validation
1594    ///
1595    /// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
1596    /// and logs bloom from the receipts.
1597    ///
1598    /// The `hashed_state` handle wraps the background hashed post state computation.
1599    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1600    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
1601        &self,
1602        block: &RecoveredBlock<N::Block>,
1603        parent_block: &SealedHeader<N::BlockHeader>,
1604        output: &BlockExecutionOutput<N::Receipt>,
1605        ctx: &mut TreeCtx<'_, N>,
1606        receipt_root_bloom: Option<ReceiptRootBloom>,
1607        built_bal: Option<BlockAccessList>,
1608    ) -> Result<(), InsertBlockErrorKind>
1609    where
1610        V: PayloadValidator<T, Block = N::Block>,
1611    {
1612        let start = Instant::now();
1613
1614        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
1615
1616        // Validate block post-execution rules
1617        let _enter =
1618            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
1619                .entered();
1620        let block_access_list_hash =
1621            built_bal.as_ref().map(|bal| compute_block_access_list_hash(bal));
1622
1623        if let Err(err) = self.consensus.validate_block_post_execution(
1624            block,
1625            output,
1626            receipt_root_bloom,
1627            block_access_list_hash,
1628        ) {
1629            // call post-block hook
1630            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1631            return Err(err.into())
1632        }
1633        drop(_enter);
1634
1635        // record post-execution validation duration
1636        self.metrics
1637            .block_validation
1638            .post_execution_validation_duration
1639            .record(start.elapsed().as_secs_f64());
1640
1641        Ok(())
1642    }
1643
1644    /// Spawns a payload processor task based on the state root strategy.
1645    ///
1646    /// This method determines how to execute the block and compute its state root based on
1647    /// the selected strategy:
1648    /// - `StateRootTask`: Uses a dedicated task for state root computation with proof generation
1649    /// - `Parallel`: Computes state root in parallel with block execution
1650    /// - `Synchronous`: Falls back to sequential execution and state root computation
1651    ///
1652    /// The method handles strategy fallbacks if the preferred approach fails, ensuring
1653    /// block execution always completes with a valid state root.
1654    ///
1655    /// # Arguments
1656    ///
1657    /// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
1658    ///   (`StateRootTask`)
1659    #[instrument(
1660        level = "debug",
1661        target = "engine::tree::payload_validator",
1662        skip_all,
1663        fields(?strategy, parallel_bal_execution)
1664    )]
1665    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
1666        &mut self,
1667        env: ExecutionEnv<Evm>,
1668        txs: T,
1669        provider_builder: StateProviderBuilder<N, P>,
1670        overlay_factory: OverlayStateProviderFactory<P, N>,
1671        strategy: &StateRootStrategy<N>,
1672        parallel_bal_execution: bool,
1673    ) -> Result<
1674        PayloadHandle<
1675            impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
1676            impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
1677            N::Receipt,
1678        >,
1679        InsertBlockErrorKind,
1680    > {
1681        match strategy {
1682            StateRootStrategy::StateRootTask => {
1683                let spawn_start = Instant::now();
1684
1685                // Use the pre-computed overlay factory for multiproofs
1686                let handle = self.payload_processor.spawn(
1687                    env,
1688                    txs,
1689                    provider_builder,
1690                    overlay_factory,
1691                    &self.config,
1692                    parallel_bal_execution,
1693                );
1694
1695                // record prewarming initialization duration
1696                self.metrics
1697                    .block_validation
1698                    .spawn_payload_processor
1699                    .record(spawn_start.elapsed().as_secs_f64());
1700
1701                Ok(handle)
1702            }
1703            StateRootStrategy::Parallel |
1704            StateRootStrategy::Synchronous |
1705            StateRootStrategy::Custom(_) => {
1706                let start = Instant::now();
1707                let handle =
1708                    self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
1709
1710                // Record prewarming initialization duration
1711                self.metrics
1712                    .block_validation
1713                    .spawn_payload_processor
1714                    .record(start.elapsed().as_secs_f64());
1715
1716                Ok(handle)
1717            }
1718        }
1719    }
1720
1721    /// Creates a `StateProviderBuilder` for the given parent hash.
1722    ///
1723    /// This method checks if the parent is in the tree state (in-memory) or persisted to disk,
1724    /// and creates the appropriate provider builder.
1725    fn state_provider_builder(
1726        &self,
1727        hash: B256,
1728        state: &EngineApiTreeState<N>,
1729    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1730        if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1731            debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1732            // the block leads back to the canonical chain
1733            return Ok(Some(StateProviderBuilder::new(
1734                self.provider.clone(),
1735                historical,
1736                Some(blocks),
1737            )))
1738        }
1739
1740        // Check if the block is persisted
1741        if let Some(header) = self.provider.header(hash)? {
1742            debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1743            // For persisted blocks, we create a builder that will fetch state directly from the
1744            // database
1745            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1746        }
1747
1748        debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1749        Ok(None)
1750    }
1751
1752    /// Determines the state root computation strategy based on configuration.
1753    ///
1754    /// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
1755    /// too expensive because it requires walking all paths in every proof.
1756    fn plan_state_root_computation(&self) -> StateRootStrategy<N> {
1757        if let Some(custom_state_root) = &self.custom_state_root {
1758            StateRootStrategy::Custom(custom_state_root.clone())
1759        } else if self.config.state_root_fallback() {
1760            StateRootStrategy::Synchronous
1761        } else if self.config.use_state_root_task() {
1762            StateRootStrategy::StateRootTask
1763        } else {
1764            StateRootStrategy::Parallel
1765        }
1766    }
1767
1768    /// Called when an invalid block is encountered during validation.
1769    fn on_invalid_block(
1770        &self,
1771        parent_header: &SealedHeader<N::BlockHeader>,
1772        block: &RecoveredBlock<N::Block>,
1773        output: &BlockExecutionOutput<N::Receipt>,
1774        trie_updates: Option<(&TrieUpdates, B256)>,
1775        state: &mut EngineApiTreeState<N>,
1776    ) {
1777        if state.invalid_headers.get(&block.hash()).is_some() {
1778            // we already marked this block as invalid
1779            return
1780        }
1781        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1782    }
1783
1784    /// Returns an overlay builder configured for a payload parent.
1785    fn overlay_builder_for_parent(
1786        parent_hash: B256,
1787        state: &EngineApiTreeState<N>,
1788        changeset_cache: ChangesetCache,
1789    ) -> OverlayBuilder<N> {
1790        OverlayBuilder::new(parent_hash, changeset_cache)
1791            .with_state_trie_overlay_manager(state.tree_state.state_trie_overlays.clone())
1792    }
1793
1794    /// Spawns a background task to compute and sort trie data for the executed block.
1795    ///
1796    /// This function creates a [`DeferredTrieData`] handle with fallback inputs and spawns a
1797    /// blocking task that calls `wait_cloned()` to:
1798    /// 1. Sort the block's hashed state and trie updates
1799    /// 2. Cache the result so subsequent calls return immediately
1800    ///
1801    /// If the background task hasn't completed when `trie_data()` is called, `wait_cloned()`
1802    /// computes from the stored inputs, eliminating deadlock risk and duplicate computation.
1803    ///
1804    /// The validation hot path can return immediately after state root verification,
1805    /// while consumers (DB writes, overlay providers, proofs) get trie data either
1806    /// from the completed task or via fallback computation.
1807    fn spawn_deferred_trie_task(
1808        &self,
1809        block: Arc<RecoveredBlock<N::Block>>,
1810        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
1811        hashed_state: LazyHashedPostState,
1812        trie_output: Arc<TrieUpdates>,
1813        changeset_provider: impl TrieCursorFactory + Send + 'static,
1814    ) -> ExecutedBlock<N> {
1815        // Create deferred handle with fallback inputs in case the background task hasn't completed.
1816        // Resolve the lazy handle into Arc<HashedPostState>. By this point the hashed state has
1817        // already been computed and used for state root verification, so .get() returns instantly.
1818        let hashed_state = match hashed_state.try_into_inner() {
1819            Ok(state) => state,
1820            Err(handle) => handle.get().clone(),
1821        };
1822        let deferred_trie_data = DeferredTrieData::pending(hashed_state, trie_output);
1823        let deferred_handle_task = deferred_trie_data.clone();
1824        let block_validation_metrics = self.metrics.block_validation.clone();
1825
1826        // Capture block info and cache handle for changeset computation
1827        let block_hash = block.hash();
1828        let block_number = block.number();
1829
1830        // Register a pending changeset entry so that concurrent readers will wait for
1831        // this computation to finish rather than falling back to the expensive DB path.
1832        // The guard ensures the pending entry is cancelled if the task panics.
1833        let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);
1834
1835        // Spawn background task to compute trie data. Calling `wait_cloned` will compute from
1836        // the stored inputs and cache the result, so subsequent calls return immediately.
1837        let compute_trie_input_task = move || {
1838            let _span = debug_span!(
1839                target: "engine::tree::payload_validator",
1840                "compute_trie_input_task",
1841                block_number
1842            )
1843            .entered();
1844
1845            let result = panic::catch_unwind(AssertUnwindSafe(|| {
1846                let compute_start = Instant::now();
1847                let computed = deferred_handle_task.wait_cloned();
1848                block_validation_metrics
1849                    .deferred_trie_compute_duration
1850                    .record(compute_start.elapsed().as_secs_f64());
1851
1852                // Record sizes of the computed trie data
1853                block_validation_metrics
1854                    .hashed_post_state_size
1855                    .record(computed.hashed_state.total_len() as f64);
1856                block_validation_metrics
1857                    .trie_updates_sorted_size
1858                    .record(computed.trie_updates.total_len() as f64);
1859                // Compute and cache changesets using the computed trie_updates.
1860                // Use the pre-created provider to avoid races with changeset cache
1861                // eviction that can happen between task spawn and execution.
1862                let changeset_start = Instant::now();
1863
1864                match reth_trie::changesets::compute_trie_changesets(
1865                    &changeset_provider,
1866                    &computed.trie_updates,
1867                ) {
1868                    Ok(changesets) => {
1869                        debug!(
1870                            target: "engine::tree::changeset",
1871                            ?block_number,
1872                            elapsed = ?changeset_start.elapsed(),
1873                            "Computed and caching changesets"
1874                        );
1875
1876                        pending_changeset_guard.resolve(block_number, Arc::new(changesets));
1877                    }
1878                    Err(e) => {
1879                        warn!(
1880                            target: "engine::tree::changeset",
1881                            ?block_number,
1882                            ?e,
1883                            "Failed to compute changesets in deferred trie task"
1884                        );
1885                    }
1886                }
1887            }));
1888
1889            if result.is_err() {
1890                error!(
1891                    target: "engine::tree::payload_validator",
1892                    "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
1893                );
1894            }
1895        };
1896
1897        // Spawn task that computes trie data asynchronously.
1898        self.payload_processor
1899            .executor()
1900            .spawn_blocking_named(DEFERRED_TRIE_WORKER_NAME, compute_trie_input_task);
1901
1902        ExecutedBlock::with_deferred_trie_data(block, execution_outcome, deferred_trie_data)
1903    }
1904
1905    fn calculate_timing_stats(
1906        &self,
1907        block: &RecoveredBlock<N::Block>,
1908        provider_stats: Arc<StateProviderStats>,
1909        cache_stats: Option<Arc<CacheStats>>,
1910        output: &BlockExecutionOutput<N::Receipt>,
1911        execution_duration: Duration,
1912        state_hash_duration: Duration,
1913    ) -> Box<ExecutionTimingStats> {
1914        let accounts_read = provider_stats.total_account_fetches();
1915        let storage_read = provider_stats.total_storage_fetches();
1916        let code_read = provider_stats.total_code_fetches();
1917        let code_bytes_read = provider_stats.total_code_fetched_bytes();
1918
1919        // Write stats from BundleState (final state changes)
1920        let accounts_changed = output.state.state.len();
1921        let accounts_deleted =
1922            output.state.state.values().filter(|acc| acc.was_destroyed()).count();
1923        let storage_slots_changed =
1924            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
1925        let storage_slots_deleted = output
1926            .state
1927            .state
1928            .values()
1929            .flat_map(|account| account.storage.values())
1930            .filter(|slot| {
1931                slot.present_value.is_zero() && !slot.previous_or_original_value.is_zero()
1932            })
1933            .count();
1934
1935        // Helper: check if account represents a new contract deployment
1936        let is_new_deployment = |acc: &BundleAccount| -> bool {
1937            let has_code_now = acc.info.as_ref().is_some_and(|info| info.code_hash != KECCAK_EMPTY);
1938            let had_no_code_before = acc
1939                .original_info
1940                .as_ref()
1941                .map(|info| info.code_hash == KECCAK_EMPTY)
1942                .unwrap_or(true);
1943            has_code_now && had_no_code_before
1944        };
1945
1946        let bytecodes_changed =
1947            output.state.state.values().filter(|acc| is_new_deployment(acc)).count();
1948
1949        // Unique new code hashes to count actual bytes persisted (deduplicated)
1950        let unique_new_code_hashes: B256Set = output
1951            .state
1952            .state
1953            .values()
1954            .filter(|acc| is_new_deployment(acc))
1955            .filter_map(|acc| acc.info.as_ref().map(|info| info.code_hash))
1956            .collect();
1957        let code_bytes_written: usize = unique_new_code_hashes
1958            .iter()
1959            .filter_map(|hash| {
1960                output.state.contracts.get(hash).map(|bytecode| bytecode.original_bytes().len())
1961            })
1962            .sum();
1963
1964        // Total time spent fetching state during execution
1965        let state_read_duration = provider_stats.total_account_fetch_latency() +
1966            provider_stats.total_storage_fetch_latency() +
1967            provider_stats.total_code_fetch_latency();
1968
1969        // EIP-7702 delegation tracking from bytecode changes
1970        // Count new EIP-7702 bytecodes as delegations set
1971        let eip7702_delegations_set =
1972            output.state.contracts.values().filter(|bytecode| bytecode.is_eip7702()).count();
1973        // Delegations cleared: accounts where bytecode changed FROM EIP-7702 TO empty
1974        // This detects when an EIP-7702 delegation is removed by setting code to empty
1975        // Note: Clearing a delegation does NOT destroy the account - it just empties the
1976        // bytecode
1977        let eip7702_delegations_cleared = output
1978            .state
1979            .state
1980            .values()
1981            .filter(|acc| {
1982                // Check if original bytecode was EIP-7702
1983                let original_was_eip7702 = acc
1984                    .original_info
1985                    .as_ref()
1986                    .and_then(|info| info.code.as_ref())
1987                    .map(|bytecode| bytecode.is_eip7702())
1988                    .unwrap_or(false);
1989
1990                // Check if current code is empty (delegation cleared)
1991                let code_now_empty =
1992                    acc.info.as_ref().map(|info| info.code_hash == KECCAK_EMPTY).unwrap_or(false);
1993
1994                original_was_eip7702 && code_now_empty
1995            })
1996            .count();
1997
1998        // Get cache statistics for detailed block logging
1999        let (account_cache_hits, account_cache_misses) = cache_stats
2000            .as_ref()
2001            .map(|s| (s.account_hits(), s.account_misses()))
2002            .unwrap_or_default();
2003        let (storage_cache_hits, storage_cache_misses) = cache_stats
2004            .as_ref()
2005            .map(|s| (s.storage_hits(), s.storage_misses()))
2006            .unwrap_or_default();
2007        let (code_cache_hits, code_cache_misses) =
2008            cache_stats.as_ref().map(|s| (s.code_hits(), s.code_misses())).unwrap_or_default();
2009
2010        // Build execution timing stats for detailed block logging
2011        Box::new(ExecutionTimingStats {
2012            block_number: block.number(),
2013            block_hash: block.hash(),
2014            gas_used: output.result.gas_used,
2015            tx_count: block.transaction_count(),
2016            execution_duration,
2017            state_read_duration,
2018            state_hash_duration,
2019            accounts_read,
2020            storage_read,
2021            code_read,
2022            code_bytes_read,
2023            accounts_changed,
2024            accounts_deleted,
2025            storage_slots_changed,
2026            storage_slots_deleted,
2027            bytecodes_changed,
2028            code_bytes_written,
2029            eip7702_delegations_set,
2030            eip7702_delegations_cleared,
2031            account_cache_hits,
2032            account_cache_misses,
2033            storage_cache_hits,
2034            storage_cache_misses,
2035            code_cache_hits,
2036            code_cache_misses,
2037        })
2038    }
2039}
2040
/// Strategy describing how to compute the state root.
///
/// The strategy is chosen by `plan_state_root_computation` and consumed by
/// `spawn_payload_processor`, which treats every variant other than
/// [`StateRootStrategy::StateRootTask`] the same way when spawning the payload processor.
#[derive(derive_more::Debug, Clone)]
enum StateRootStrategy<N: NodePrimitives> {
    /// Use the state root task (background sparse trie computation).
    StateRootTask,
    /// Run the parallel state root computation on the calling thread.
    Parallel,
    /// Fall back to synchronous computation via the state provider.
    ///
    /// Chosen when the `state_root_fallback` config option is set.
    Synchronous,
    /// Custom state root computation strategy.
    ///
    /// Chosen whenever a custom state root implementation is configured; the payload is excluded
    /// from the derived `Debug` output.
    Custom(#[debug(skip)] CustomStateRoot<N>),
}
2053
2054/// Type that validates the payloads processed by the engine.
2055///
2056/// This provides the necessary functions for validating/executing payloads/blocks.
2057pub trait EngineValidator<
2058    Types: PayloadTypes,
2059    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
2060>: Send + Sync + 'static
2061{
2062    /// Validates the payload attributes with respect to the header.
2063    ///
2064    /// By default, this enforces that the payload attributes timestamp is greater than the
2065    /// timestamp according to:
2066    ///   > 7. Client software MUST ensure that payloadAttributes.timestamp is greater than
2067    ///   > timestamp
2068    ///   > of a block referenced by forkchoiceState.headBlockHash.
2069    ///
2070    /// See also: <https://github.com/ethereum/execution-apis/blob/main/src/engine/common.md#specification-1>
2071    fn validate_payload_attributes_against_header(
2072        &self,
2073        attr: &Types::PayloadAttributes,
2074        header: &N::BlockHeader,
2075    ) -> Result<(), InvalidPayloadAttributesError>;
2076
2077    /// Ensures that the given payload does not violate any consensus rules that concern the block's
2078    /// layout.
2079    ///
2080    /// This function must convert the payload into the executable block and pre-validate its
2081    /// fields.
2082    ///
2083    /// Implementers should ensure that the checks are done in the order that conforms with the
2084    /// engine-API specification.
2085    fn convert_payload_to_block(
2086        &self,
2087        payload: Types::ExecutionData,
2088    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;
2089
2090    /// Validates a payload received from engine API.
2091    fn validate_payload(
2092        &mut self,
2093        payload: Types::ExecutionData,
2094        ctx: TreeCtx<'_, N>,
2095    ) -> ValidationOutcome<N>;
2096
2097    /// Validates a block downloaded from the network.
2098    fn validate_block(
2099        &mut self,
2100        block: SealedBlock<N::Block>,
2101        ctx: TreeCtx<'_, N>,
2102    ) -> ValidationOutcome<N>;
2103
2104    /// Hook called after an executed block is inserted directly into the tree.
2105    ///
2106    /// This is invoked when blocks are inserted via `InsertExecutedBlock` (e.g., locally built
2107    /// blocks by sequencers) to allow implementations to update internal state such as caches.
2108    fn on_inserted_executed_block(
2109        &self,
2110        block: BuiltPayloadExecutedBlock<N>,
2111        state: &EngineApiTreeState<N>,
2112    ) -> ProviderResult<ExecutedBlock<N>>;
2113
2114    /// Returns [`SavedCache`] for the given block hash.
2115    fn cache_for(&self, _block_hash: B256) -> Option<SavedCache>;
2116
2117    /// Spawns a sparse trie pipeline and returns a handle for the payload builder.
2118    fn sparse_trie_handle_for(
2119        &self,
2120        parent_hash: B256,
2121        parent_state_root: B256,
2122        state: &EngineApiTreeState<N>,
2123    ) -> Option<StateRootHandle>;
2124}
2125
2126impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
2127where
2128    P: DatabaseProviderFactory<
2129            Provider: BlockReader
2130                          + StageCheckpointReader
2131                          + PruneCheckpointReader
2132                          + ChangeSetReader
2133                          + StorageChangeSetReader
2134                          + BlockNumReader
2135                          + StorageSettingsCache,
2136        > + BlockReader<Header = N::BlockHeader>
2137        + StateProviderFactory
2138        + StateReader
2139        + ChangeSetReader
2140        + BlockNumReader
2141        + HashedPostStateProvider
2142        + Clone
2143        + 'static,
2144    N: NodePrimitives,
2145    V: PayloadValidator<Types, Block = N::Block> + Clone,
2146    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
2147    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
2148{
2149    fn validate_payload_attributes_against_header(
2150        &self,
2151        attr: &Types::PayloadAttributes,
2152        header: &N::BlockHeader,
2153    ) -> Result<(), InvalidPayloadAttributesError> {
2154        self.validator.validate_payload_attributes_against_header(attr, header)
2155    }
2156
2157    fn convert_payload_to_block(
2158        &self,
2159        payload: Types::ExecutionData,
2160    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
2161        let block = self.validator.convert_payload_to_block(payload)?;
2162        Ok(block)
2163    }
2164
2165    fn validate_payload(
2166        &mut self,
2167        payload: Types::ExecutionData,
2168        ctx: TreeCtx<'_, N>,
2169    ) -> ValidationOutcome<N> {
2170        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
2171    }
2172
2173    fn validate_block(
2174        &mut self,
2175        block: SealedBlock<N::Block>,
2176        ctx: TreeCtx<'_, N>,
2177    ) -> ValidationOutcome<N> {
2178        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
2179    }
2180
2181    fn on_inserted_executed_block(
2182        &self,
2183        block: BuiltPayloadExecutedBlock<N>,
2184        state: &EngineApiTreeState<N>,
2185    ) -> ProviderResult<ExecutedBlock<N>> {
2186        self.payload_processor.on_inserted_executed_block(
2187            block.recovered_block.block_with_parent(),
2188            &block.execution_output.state,
2189        );
2190
2191        let overlay_factory = OverlayStateProviderFactory::new(
2192            self.provider.clone(),
2193            Self::overlay_builder_for_parent(
2194                block.recovered_block.parent_hash(),
2195                state,
2196                self.changeset_cache.clone(),
2197            ),
2198        );
2199        let changeset_provider = overlay_factory.database_provider_ro()?;
2200
2201        Ok(self.spawn_deferred_trie_task(
2202            block.recovered_block,
2203            block.execution_output,
2204            LazyHashedPostState::ready(block.hashed_state),
2205            block.trie_updates,
2206            changeset_provider,
2207        ))
2208    }
2209
2210    fn cache_for(&self, block_hash: B256) -> Option<SavedCache> {
2211        Some(self.payload_processor.cache_for(block_hash))
2212    }
2213
2214    fn sparse_trie_handle_for(
2215        &self,
2216        parent_hash: B256,
2217        parent_state_root: B256,
2218        state: &EngineApiTreeState<N>,
2219    ) -> Option<StateRootHandle> {
2220        let overlay_factory = OverlayStateProviderFactory::new(
2221            self.provider.clone(),
2222            Self::overlay_builder_for_parent(parent_hash, state, self.changeset_cache.clone()),
2223        );
2224
2225        Some(self.payload_processor.spawn_state_root(
2226            overlay_factory,
2227            parent_state_root,
2228            // Full proof workers — tx count unknown at FCU time (block built incrementally)
2229            false,
2230            &self.config,
2231        ))
2232    }
2233}
2234
2235impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
2236where
2237    Evm: ConfigureEvm,
2238{
2239    fn wait_for_caches(&self) -> CacheWaitDurations {
2240        self.payload_processor.wait_for_caches()
2241    }
2242}
2243
2244/// Enum representing either block or payload being validated.
2245#[derive(Debug, Clone)]
2246pub enum BlockOrPayload<T: PayloadTypes> {
2247    /// Payload.
2248    Payload(T::ExecutionData),
2249    /// Block.
2250    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
2251}
2252
2253impl<T: PayloadTypes> BlockOrPayload<T> {
2254    /// Returns the hash of the block.
2255    pub fn hash(&self) -> B256 {
2256        match self {
2257            Self::Payload(payload) => payload.block_hash(),
2258            Self::Block(block) => block.hash(),
2259        }
2260    }
2261
2262    /// Returns the number and hash of the block.
2263    pub fn num_hash(&self) -> NumHash {
2264        match self {
2265            Self::Payload(payload) => payload.num_hash(),
2266            Self::Block(block) => block.num_hash(),
2267        }
2268    }
2269
2270    /// Returns the parent hash of the block.
2271    pub fn parent_hash(&self) -> B256 {
2272        match self {
2273            Self::Payload(payload) => payload.parent_hash(),
2274            Self::Block(block) => block.parent_hash(),
2275        }
2276    }
2277
2278    /// Returns [`BlockWithParent`] for the block.
2279    pub fn block_with_parent(&self) -> BlockWithParent {
2280        match self {
2281            Self::Payload(payload) => payload.block_with_parent(),
2282            Self::Block(block) => block.block_with_parent(),
2283        }
2284    }
2285
2286    /// Returns a string showing whether or not this is a block or payload.
2287    pub const fn type_name(&self) -> &'static str {
2288        match self {
2289            Self::Payload(_) => "payload",
2290            Self::Block(_) => "block",
2291        }
2292    }
2293
2294    /// Returns true if this is a payload.
2295    pub const fn is_payload(&self) -> bool {
2296        matches!(self, Self::Payload(_))
2297    }
2298
2299    /// Returns true if this is a block.
2300    pub const fn is_block(&self) -> bool {
2301        matches!(self, Self::Block(_))
2302    }
2303
2304    /// Returns the decoded block access list, if present and successfully decoded.
2305    pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
2306        match self {
2307            Self::Payload(payload) => payload
2308                .block_access_list()
2309                .map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
2310                .transpose(),
2311            Self::Block(_) => Ok(None),
2312        }
2313    }
2314
2315    /// Returns the number of transactions in the payload or block.
2316    pub fn transaction_count(&self) -> usize
2317    where
2318        T::ExecutionData: ExecutionPayload,
2319    {
2320        match self {
2321            Self::Payload(payload) => payload.transaction_count(),
2322            Self::Block(block) => block.transaction_count(),
2323        }
2324    }
2325
2326    /// Returns the withdrawals from the payload or block.
2327    pub fn withdrawals(&self) -> Option<&[Withdrawal]>
2328    where
2329        T::ExecutionData: ExecutionPayload,
2330    {
2331        match self {
2332            Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
2333            Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
2334        }
2335    }
2336
2337    /// Returns the total gas used by the block.
2338    pub fn gas_used(&self) -> u64
2339    where
2340        T::ExecutionData: ExecutionPayload,
2341    {
2342        match self {
2343            Self::Payload(payload) => payload.gas_used(),
2344            Self::Block(block) => block.gas_used(),
2345        }
2346    }
2347
2348    /// Returns the gas limit used by the block.
2349    pub fn gas_limit(&self) -> u64
2350    where
2351        T::ExecutionData: ExecutionPayload,
2352    {
2353        match self {
2354            Self::Payload(payload) => payload.gas_limit(),
2355            Self::Block(block) => block.gas_limit(),
2356        }
2357    }
2358}
2359
2360/// Input for [`CustomStateRoot`].
2361#[derive(Debug, Clone)]
2362pub struct CustomStateRootInput<'a, N: NodePrimitives> {
2363    /// The block being validated.
2364    pub block: &'a SealedBlock<N::Block>,
2365    /// The parent block.
2366    pub parent_block: &'a SealedHeader<N::BlockHeader>,
2367    /// The execution output.
2368    pub output: &'a BlockExecutionOutput<N::Receipt>,
2369    /// The hashed state.
2370    pub hashed_state: &'a LazyHashedPostState,
2371}
2372
/// A custom state root computation handler.
///
/// This is a shared (`Arc`), `Send + Sync` callback that receives a [`CustomStateRootInput`] and
/// returns the computed root hash together with the [`TrieUpdates`], or a provider error.
pub type CustomStateRoot<N> = Arc<
    dyn Fn(CustomStateRootInput<'_, N>) -> ProviderResult<(B256, TrieUpdates)>
        + Send
        + Sync
        + 'static,
>;