//! Types and traits for validating blocks and payloads.
//!
//! # Payload validation flow
//!
//! [`BasicEngineValidator::validate_block_with_state`] is the engine-side entry point for an
//! inserted block or an `engine_newPayload` payload. It overlaps payload conversion, transaction
//! preparation, cache prewarming, receipt-root computation, state-root computation, and deferred
//! trie input construction wherever those tasks do not depend on each other.
//!
//! ## Validation phases
//!
//! 1. Fetch the parent header and spawn `payload-convert`, which converts payloads into sealed
//!    blocks and runs header, parent-header, and pre-execution consensus validation.
//! 2. Build the parent state provider, EVM environment, transaction iterator, lazy ancestor
//!    overlay, and optional decoded EIP-7928 block access list (BAL).
//! 3. Choose the state-root strategy: `StateRootTask`, `Parallel`, `Synchronous`, or `Custom`.
//! 4. Spawn the payload processor. This always prepares transaction conversion and prewarming; the
//!    `StateRootTask` strategy also starts proof workers and the sparse trie task.
//! 5. Execute the block. BAL payloads use the parallel BAL execute path only when state caching and
//!    BAL parallel execution are enabled. Otherwise the regular executor still builds and validates
//!    the BAL before post-execution consensus uses the decoded BAL hash.
//! 6. Stop prewarming, terminate execution caching, spawn `hash-post-state`, await
//!    `payload-convert` and `receipt-root`, then run post-execution consensus validation.
//! 7. Resolve the state root from the selected strategy and fall back to serial computation when a
//!    non-custom parallel path fails to produce a usable root.
//! 8. Verify the header state root, spawn deferred trie input computation, and return the executed
//!    block without waiting for that deferred trie task on the hot path.
//!
//! ## Spawned background work
//!
//! | Work | Spawned when | Role | Completion point |
//! | --- | --- | --- | --- |
//! | `payload-convert` | parent is known | convert payloads, validate header and body roots | after execution, unless the gas sanity check awaits it earlier |
//! | `tx-iterator` | payload processor setup | convert transactions, using rayon for larger blocks | consumed by regular and BAL execution |
//! | `prewarm` | payload processor setup | warm execution caches; in BAL mode, stream BAL-derived trie targets | stopped after execution, then caching is terminated |
//! | proof workers | `StateRootTask` setup | fetch trie proofs for sparse trie updates | consumed by the sparse trie task |
//! | `sparse-trie` | `StateRootTask` setup | apply execution or BAL updates and compute the state root | awaited by `await_state_root_with_timeout` |
//! | `receipt-root` | execution start | compute receipt root and logs bloom incrementally | awaited before post-execution consensus |
//! | `hash-post-state` | after execution | hash changed accounts and storage from `BundleState` | awaited by post-execution validation and root computation |
//! | `serial-root` | sparse trie timeout fallback | race serial state-root computation against the sparse trie task | polled by `await_state_root_with_timeout` |
//! | deferred trie task | after root verification | sort trie data, merge overlays, and cache changesets | not awaited by the validation hot path |
//!
//! ```mermaid
//! sequenceDiagram
//!     autonumber
//!     participant Main as validate_block_with_state
//!     participant Convert as payload-convert
//!     participant Tx as tx-iterator
//!     participant Prewarm as prewarm
//!     participant Exec as EVM execution
//!     participant Receipt as receipt-root
//!     participant Trie as sparse trie and proofs
//!     participant Hash as hash-post-state
//!     participant Deferred as deferred trie task
//!
//!     Main->>Convert: spawn convert and pre-execution validation
//!     Main->>Main: parent provider, EVM env, optional BAL decode
//!     Main->>Tx: spawn transaction conversion
//!     alt StateRootTask
//!         Main->>Trie: spawn proof workers and sparse trie
//!     end
//!     Main->>Prewarm: spawn transaction, BAL, or skipped prewarm
//!     Main->>Receipt: spawn receipt root task
//!     alt BAL path eligible
//!         Main->>Exec: execute_block_bal
//!         Prewarm->>Trie: BAL-derived sparse trie updates
//!     else regular execution
//!         Tx-->>Exec: recovered transactions in block order
//!         Main->>Exec: execute_block
//!         Exec->>Receipt: stream receipts
//!         Exec->>Trie: stream state hook updates
//!         Exec->>Exec: rebuild and validate BAL when present
//!     end
//!     Main->>Prewarm: stop prewarming and terminate cache
//!     Main->>Hash: spawn changed-state hashing
//!     Convert-->>Main: sealed block
//!     Receipt-->>Main: receipt root and logs bloom
//!     Main->>Main: post-execution consensus and BAL hash check
//!     Hash-->>Main: hashed post state
//!     alt StateRootTask
//!         Trie-->>Main: state root and trie updates
//!     else Parallel
//!         Main->>Main: compute ParallelStateRoot
//!     else Custom
//!         Main->>Main: call custom root function
//!     else Synchronous or fallback
//!         Main->>Main: compute serial StateRoot
//!     end
//!     Main->>Main: verify header state root
//!     Main->>Deferred: spawn trie input sorting and cache update
//!     Main-->>Main: return ValidationOutput
//! ```
//!
//! ## Payload attributes validation
//!
//! During `engine_forkchoiceUpdated`,
//! [`PayloadValidator::validate_payload_attributes_against_header`] checks payload attributes
//! before a payload build job starts. On failure, the engine returns
//! `INVALID_PAYLOAD_ATTRIBUTES` without rolling back the forkchoice update.

use crate::tree::{
    error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
    instrumented_state::{InstrumentedStateProvider, StateProviderMetrics, StateProviderStats},
    multiproof::{StateRootComputeOutcome, StateRootHandle},
    payload_processor::PayloadProcessor,
    precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
    types::{InsertPayloadResult, ValidationOutput},
    CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
    PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::{bal::DecodedBal, compute_block_access_list_hash, BlockAccessList};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{map::B256Set, B256};
use reth_tasks::LazyHandle;
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;

use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use reth_chain_state::{
    CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, ExecutionTimingStats,
};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
    ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
};
use reth_errors::{BlockExecutionError, ProviderResult};
use reth_evm::{
    block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
    OnStateHook, SpecFor,
};
use reth_execution_cache::{CacheFillMode, CacheStats, SavedCache};
use reth_payload_primitives::{
    BuiltPayload, BuiltPayloadExecutedBlock, InvalidPayloadAttributesError, NewPayloadError,
    PayloadTypes,
};
use reth_primitives_traits::{
    AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
    RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
};
use reth_provider::{
    providers::{OverlayBuilder, OverlayStateProvider, OverlayStateProviderFactory},
    BlockExecutionOutput, BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory,
    DatabaseProviderROFactory, HashedPostStateProvider, ProviderError, PruneCheckpointReader,
    StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
    StorageChangeSetReader, StorageSettingsCache,
};
use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::{Address, KECCAK_EMPTY};
use std::{
    collections::HashMap,
    panic::{self, AssertUnwindSafe},
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::RecvTimeoutError,
        Arc,
    },
    time::Duration,
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Level, Span};

pub use crate::tree::types::ValidationOutcome;

/// Handle to a [`HashedPostState`] computed on a background thread.
type LazyHashedPostState = reth_tasks::LazyHandle<Arc<HashedPostState>>;

/// Multiplier over the parent's gas limit beyond which a block's claimed gas usage cannot be
/// legitimate. Gas limit can change by at most 1/1024 per block, so anything over this is rejected
/// without entering execution.
const MAX_EXPECTED_GAS_USAGE_MULTIPLIER: u64 = 2;

/// Worker name for deferred trie data and changeset provider preparation.
const DEFERRED_TRIE_WORKER_NAME: &str = "deferred-trie";

type ReceiptRootSender<N> =
    crossbeam_channel::Sender<IndexedReceipt<<N as NodePrimitives>::Receipt>>;
type ReceiptRootReceiver = tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>;

/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
/// internals.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// The engine API tree state.
    state: &'a mut EngineApiTreeState<N>,
    /// Reference to the canonical in-memory state.
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}

impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TreeCtx")
            .field("state", &"EngineApiTreeState")
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .finish()
    }
}

impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
    /// Creates a new tree context.
    pub const fn new(
        state: &'a mut EngineApiTreeState<N>,
        canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
    ) -> Self {
        Self { state, canonical_in_memory_state }
    }
}

impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
    /// Returns a reference to the engine tree state.
    pub const fn state(&self) -> &EngineApiTreeState<N> {
        &*self.state
    }

    /// Returns a mutable reference to the engine tree state.
    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
        self.state
    }

    /// Returns a reference to the canonical in-memory state.
    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
        self.canonical_in_memory_state
    }
}

/// Pauses JIT helper execution while validating imported payloads.
///
/// Validation still queues JIT work and can use resident compiled code, but helper execution is
/// paused during validation to minimize latency. Queued work resumes when validation exits, so JIT
/// compilation is biased toward idle periods instead of competing with payload validation.
struct JitPauseGuard<Evm: ConfigureEvm>(Evm);

impl<Evm: ConfigureEvm> JitPauseGuard<Evm> {
    fn new(evm_config: &Evm) -> Self {
        if let Some(jit_backend) = evm_config.jit_backend() {
            jit_backend.pause();
        }
        Self(evm_config.clone())
    }
}

impl<Evm: ConfigureEvm> Drop for JitPauseGuard<Evm> {
    fn drop(&mut self) {
        if let Some(jit_backend) = self.0.jit_backend() {
            jit_backend.resume();
        }
    }
}

/// A helper type that provides reusable payload validation logic for network-specific validators.
///
/// This type satisfies [`EngineValidator`] and is responsible for executing blocks/payloads.
///
/// This type contains common validation, execution, and state root computation logic that can be
/// used by network-specific payload validators (e.g., Ethereum, Optimism). It is not meant to be
/// used as a standalone component, but rather as a building block for concrete implementations.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Provider for database access.
    provider: P,
    /// Consensus implementation for validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
    /// EVM configuration.
    evm_config: Evm,
    /// Configuration for the tree.
    config: TreeConfig,
    /// Payload processor for state root computation.
    payload_processor: PayloadProcessor<Evm>,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Precompile cache metrics.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook to call when invalid blocks are encountered.
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// Validator for the payload.
    validator: V,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel work.
    runtime: reth_tasks::Runtime,
    /// Custom state root computation function.
    custom_state_root: Option<CustomStateRoot<Evm::Primitives>>,
}

impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
where
    N: NodePrimitives,
    P: DatabaseProviderFactory<
            Provider: BlockReader
                          + StageCheckpointReader
                          + PruneCheckpointReader
                          + ChangeSetReader
                          + StorageChangeSetReader
                          + BlockNumReader
                          + StorageSettingsCache,
        > + BlockReader<Header = N::BlockHeader>
        + ChangeSetReader
        + BlockNumReader
        + StateProviderFactory
        + StateReader
        + HashedPostStateProvider
        + Clone
        + 'static,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Creates a new [`BasicEngineValidator`].
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        evm_config: Evm,
        validator: V,
        config: TreeConfig,
        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> Self {
        let precompile_cache_map = PrecompileCacheMap::default();
        let payload_processor = PayloadProcessor::new(
            runtime.clone(),
            evm_config.clone(),
            &config,
            precompile_cache_map.clone(),
        );
        Self {
            provider,
            consensus,
            evm_config,
            payload_processor,
            precompile_cache_map,
            precompile_cache_metrics: HashMap::new(),
            config,
            invalid_block_hook,
            metrics: EngineApiMetrics::default(),
            validator,
            changeset_cache,
            runtime,
            custom_state_root: None,
        }
    }

    /// Sets a custom state root computation handler.
    pub fn with_custom_state_root(mut self, custom_state_root: CustomStateRoot<N>) -> Self {
        self.custom_state_root = Some(custom_state_root);
        self
    }

    /// Converts a [`BlockOrPayload`] to a sealed block.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        input: BlockOrPayload<T>,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError>
    where
        V: PayloadValidator<T, Block = N::Block>,
    {
        match input {
            BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
            BlockOrPayload::Block(block) => Ok(block),
        }
    }

    /// Returns the EVM environment for the given payload or block.
    pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        input: &BlockOrPayload<T>,
    ) -> Result<EvmEnvFor<Evm>, Evm::Error>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        match input {
            BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
            BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
        }
    }

    /// Returns an [`ExecutableTxIterator`] for the given payload or block.
    pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &'a self,
        input: &'a BlockOrPayload<T>,
    ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        Ok(match input {
            BlockOrPayload::Payload(payload) => {
                let iter = self
                    .evm_config
                    .tx_iterator_for_payload(payload)
                    .map_err(NewPayloadError::other)?;
                Either::Left(iter)
            }
            BlockOrPayload::Block(block) => {
                let txs = block.body().clone_transactions();
                let convert = |tx: N::SignedTx| tx.try_into_recovered();
                Either::Right((txs, convert))
            }
        })
    }

    /// Returns an [`ExecutionCtxFor`] for the given payload or block.
    pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        input: &'a BlockOrPayload<T>,
    ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        match input {
            BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
            BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
        }
    }

    /// Validates a block or payload. Payloads are converted to blocks on a background thread
    /// while the rest of validation proceeds.
    ///
    /// This method performs:
    /// - Consensus validation
    /// - Block execution
    /// - State root computation
    /// - Fork detection
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        skip_all,
        fields(
            parent = ?input.parent_hash(),
            type_name = ?input.type_name(),
        )
    )]
    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &mut self,
        input: BlockOrPayload<T>,
        mut ctx: TreeCtx<'_, N>,
    ) -> InsertPayloadResult<N>
    where
        V: PayloadValidator<T, Block = N::Block> + Clone,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        let parent_hash = input.parent_hash();
        let _jit_pause = JitPauseGuard::new(&self.evm_config);

        // Fetch parent block. This goes to memory most of the time unless the parent block is
        // beyond the in-memory buffer.
        let parent_block = match self.sealed_header_by_hash(parent_hash, ctx.state()) {
            Ok(Some(parent_block)) => parent_block,
            Ok(None) => {
                return Err(InsertBlockError::new(
                    self.convert_to_block(input)?,
                    ProviderError::HeaderNotFound(parent_hash.into()).into(),
                )
                .into())
            }
            Err(e) => {
                return Err(InsertBlockError::new(self.convert_to_block(input)?, e.into()).into())
            }
        };

        // Spawn payload conversion and basic validation on a background thread so it runs
        // concurrently with the rest of the function (setup + execution). For payloads this
        // overlaps the cost of RLP decoding + header hashing.
        let validated_block = self.spawn_convert_and_validate(&input, parent_block.clone());

        /// A helper macro that returns the block if there was an error.
        /// It is used for early returns before the background block conversion has been awaited.
        macro_rules! ensure_ok {
            ($expr:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        let block = validated_block.try_into_inner().expect("sole handle")?;
                        return Err(InsertBlockError::new(block, e.into()).into())
                    }
                }
            };
        }

        /// A helper macro for handling errors after the input has been converted to a block.
        macro_rules! ensure_ok_post_block {
            ($expr:expr, $block:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        return Err(
                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
                        )
                    }
                }
            };
        }

        // If the gas usage is suspiciously high (multiple times higher than parent's gas limit), be
        // cautious and block on pre-execution checks of the block.
        if input.gas_used() > parent_block.gas_limit() * MAX_EXPECTED_GAS_USAGE_MULTIPLIER {
            // Call `.get()` to await the pre-execution checks and exit early if they fail.
            if validated_block.get().is_err() {
                return Err(validated_block
                    .try_into_inner()
                    .expect("sole handle")
                    .expect_err("Err result checked"))
            }
        }

        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
        let Some(provider_builder) =
            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
        else {
            // this is pre-validated in the tree
            return Err(InsertBlockError::new(
                validated_block.try_into_inner().expect("sole handle")?,
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };
        drop(_enter);

        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
            .in_scope(|| self.evm_env_for(&input))
            .map_err(NewPayloadError::other)?;

        // Extract the decoded BAL, if valid and available.
        let decoded_bal = ensure_ok!(input
            .try_decoded_access_list()
            .map_err(|err| ConsensusError::BlockAccessListInvalid(err.to_string())))
        .map(Arc::new);

        if let Some(decoded_bal) = decoded_bal.as_deref() {
            ensure_ok!(Self::validate_received_bal_gas(decoded_bal, input.gas_limit()));
        }

        let env = ExecutionEnv {
            evm_env,
            hash: input.hash(),
            parent_hash: input.parent_hash(),
            parent_state_root: parent_block.state_root(),
            transaction_count: input.transaction_count(),
            gas_used: input.gas_used(),
            withdrawals: input.withdrawals().map(|w| w.to_vec()),
            decoded_bal: decoded_bal.as_ref().map(Arc::clone),
        };

        // Plan the strategy used for state root computation.
        let strategy = self.plan_state_root_computation();

        debug!(
            target: "engine::tree::payload_validator",
            ?strategy,
            "Decided which state root algorithm to run"
        );

        // Get an iterator over the transactions in the payload
        let txs = self.tx_iterator_for(&input)?;

        // Create overlay factory for payload processor (StateRootTask path needs it for
        // multiproofs)
        let provider_factory = self.provider.clone();
        let overlay_builder = Self::overlay_builder_for_parent(
            parent_hash,
            ctx.state(),
            self.changeset_cache.clone(),
        );
        let overlay_factory =
            OverlayStateProviderFactory::new(provider_factory.clone(), overlay_builder.clone());
        let changeset_provider = self.spawn_changeset_provider_task(overlay_factory.clone());

        let parallel_bal_execution = ensure_ok!(self.bal_path_eligible(env.decoded_bal.as_deref()));

        // Spawn the appropriate processor based on strategy
        let mut handle = ensure_ok!(self.spawn_payload_processor(
            env.clone(),
            txs,
            provider_builder.clone(),
            overlay_factory,
            &strategy,
            parallel_bal_execution,
        ));

        // Create optional cache stats for detailed block logging
        let slow_block_enabled = self.config.slow_block_threshold().is_some();
        let cache_stats = slow_block_enabled.then(|| Arc::new(CacheStats::default()));
        let instrument_state_provider = slow_block_enabled || self.config.state_provider_metrics();
        let state_provider_metrics =
            instrument_state_provider.then(|| StateProviderMetrics::with_source("engine"));
        let state_provider_stats =
            instrument_state_provider.then(|| Arc::new(StateProviderStats::default()));
        let execution_cache = handle.caches().map(|caches| (caches, handle.cache_metrics()));

        // This state provider factory is parametrized by:
        //
        // 1. fill_on_miss?
        // 2. instrument_state_provider?
        //
        // `fill_on_miss` controls whether the value loaded after a cache miss is inserted
        // back into the cache. At a glance this seems always useful. However, in practice,
        // for serial/non-BAL execution, it's not needed and is net negative:
        //
        // - It's not necessary because the revm machinery provides a layer of caching itself.
        //   That means a value for a miss will be recorded in revm's cache.
        // - Inserting back into the cache is not free.
        // - After execution, the execution post-state will be dumped into the execution cache
        //   as a whole anyway.
        //
        // Therefore, `fill_on_miss` is false for those paths.
        //
        // The second parameter `instrument_state_provider` controls whether we should
        // instrument the state provider with metrics.
        let make_state_provider = |fill_on_miss: bool| -> ProviderResult<StateProviderBox> {
            let provider = provider_builder.build()?;
            let mut provider = if let Some((caches, cache_metrics)) = &execution_cache {
                let fill_mode = if fill_on_miss {
                    CacheFillMode::FillOnMiss
                } else {
                    CacheFillMode::LookupOnly
                };
                Box::new(CachedStateProvider::new_with_mode(
                    provider,
                    caches.clone(),
                    fill_mode,
                    cache_metrics.clone(),
                    cache_stats.clone(),
                )) as StateProviderBox
            } else {
                provider
            };

            if instrument_state_provider {
                let stats = state_provider_stats
                    .as_ref()
                    .expect("instrumented state provider requires shared stats");
                let metrics = state_provider_metrics
                    .as_ref()
                    .expect("instrumented state provider requires metrics");
                provider = Box::new(InstrumentedStateProvider::with_stats(
                    provider,
                    metrics.clone(),
                    Arc::clone(stats),
                ));
            }

            Ok(provider)
        };

658        // Execute the block and handle any execution errors.
659        // The receipt root task is spawned before execution and receives receipts incrementally
660        // as transactions complete, allowing parallel computation during execution.
661        let execute_block_start = Instant::now();
662        let execution_result = if parallel_bal_execution {
663            self.execute_block_bal(env, &input, &handle, &make_state_provider)
664        } else {
665            let state_provider = make_state_provider(false);
666            match state_provider {
667                Ok(state_provider) => self.execute_block(state_provider, env, &input, &mut handle),
668                Err(err) => Err(err.into()),
669            }
670        };
671        let execution_duration = execute_block_start.elapsed();
672        if let (Some(metrics), Some(stats)) = (&state_provider_metrics, &state_provider_stats) {
673            metrics.record_totals(stats);
674        }
675        let (output, senders, receipt_root_rx, built_bal) = ensure_ok!(execution_result);
676
677        // After executing the block we can stop prewarming transactions
678        handle.stop_prewarming_execution();
679
680        // Create ExecutionOutcome early so we can terminate caching before validation and state
681        // root computation. Using Arc allows sharing with both the caching task and the deferred
682        // trie task without cloning the expensive BundleState.
683        let output = Arc::new(output);
684
685        // Terminate caching task early since execution is complete and caching is no longer
686        // needed. This frees up resources while state root computation continues.
687        let valid_block_tx = handle.terminate_caching(Some(output.clone()));
688
689        // Spawn hashed post state computation in the background so it runs concurrently with block
690        // conversion and receipt root computation. It reuses the state from `hashed_state_rx` if
691        // one arrives; otherwise it keccak256-hashes all changed addresses and storage slots.
692        let hashed_state_output = output.clone();
693        let hashed_state_provider = self.provider.clone();
694        let mut hashed_state_rx = handle.take_hashed_state_rx();
695        let mut hashed_state: LazyHashedPostState =
696            self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
697                let _span = debug_span!(
698                    target: "engine::tree::payload_validator",
699                    "hashed_post_state",
700                )
701                .entered();
702                let state = if let Some(Ok(state)) = hashed_state_rx.as_mut().map(|rx| rx.recv()) {
703                    state
704                } else {
705                    hashed_state_provider.hashed_post_state(&hashed_state_output.state)
706                };
707                Arc::new(state)
708            });
709
710        let block = validated_block.try_into_inner().expect("sole handle")?;
711        let block = block.with_senders(senders);
712
713        // Wait for the receipt root computation to complete.
714        let receipt_root_bloom = {
715            let _enter = debug_span!(
716                target: "engine::tree::payload_validator",
717                "wait_receipt_root",
718            )
719            .entered();
720
721            receipt_root_rx
722                .blocking_recv()
723                .inspect_err(|_| {
724                    tracing::error!(
725                        target: "engine::tree::payload_validator",
726                        "Receipt root task dropped sender without result, receipt root calculation likely aborted"
727                    );
728                })
729                .ok()
730        };
731
732        ensure_ok_post_block!(
733            self.validate_post_execution(
734                &block,
735                &parent_block,
736                &output,
737                &mut ctx,
738                receipt_root_bloom,
739                built_bal
740            ),
741            block
742        );
743
744        // Run the hashed state validation hook but don't propagate the error yet. If the state root
745        // task fails, we might need to re-run this check against a fallback state.
746        let mut hashed_state_validate_result = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").in_scope(|| {
747            // Wait for the background keccak256 hashing task to complete. This blocks until
748            // all changed addresses and storage slots have been hashed.
749            let hashed_state_ref =
750                debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
751                    .in_scope(|| hashed_state.get());
752
753            self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, &block)
754        });
755
756        let root_time = Instant::now();
757        let mut maybe_state_root = None;
758        let mut state_root_task_failed = false;
759        #[cfg(feature = "trie-debug")]
760        let mut trie_debug_recorders = Vec::new();
761
762        match strategy {
763            StateRootStrategy::StateRootTask => {
764                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
765
766                let task_result = ensure_ok_post_block!(
767                    self.await_state_root_with_timeout(
768                        &mut handle,
769                        provider_builder.clone(),
770                        output.clone(),
771                    ),
772                    block
773                );
774
775                let maybe_new_hashed_state = match task_result {
776                    Ok((
777                        StateRootComputeOutcome {
778                            state_root,
779                            trie_updates,
780                            #[cfg(feature = "trie-debug")]
781                            debug_recorders,
782                        },
783                        maybe_new_hashed_state,
784                    )) => {
785                        let elapsed = root_time.elapsed();
786                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
787
788                        #[cfg(feature = "trie-debug")]
789                        {
790                            trie_debug_recorders = debug_recorders;
791                        }
792
793                        // Compare trie updates with serial computation if configured
794                        if self.config.always_compare_trie_updates() {
795                            let _has_diff = self.compare_trie_updates_with_serial(
796                                provider_builder.clone(),
797                                provider_factory,
798                                overlay_builder,
799                                &output,
800                                trie_updates.as_ref().clone(),
801                            );
802                            #[cfg(feature = "trie-debug")]
803                            if _has_diff {
804                                Self::write_trie_debug_recorders(
805                                    block.header().number(),
806                                    &trie_debug_recorders,
807                                );
808                            }
809                        }
810
811                        // Double-check the state root here for good measure.
812                        if state_root == block.header().state_root() {
813                            maybe_state_root = Some((state_root, trie_updates, elapsed))
814                        } else {
815                            warn!(
816                                target: "engine::tree::payload_validator",
817                                ?state_root,
818                                block_state_root = ?block.header().state_root(),
819                                "State root task returned incorrect state root"
820                            );
821                            #[cfg(feature = "trie-debug")]
822                            Self::write_trie_debug_recorders(
823                                block.header().number(),
824                                &trie_debug_recorders,
825                            );
826                            state_root_task_failed = true;
827                        }
828
829                        maybe_new_hashed_state
830                    }
831                    Err(error) => {
832                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
833                        state_root_task_failed = true;
834                        None
835                    }
836                };
837
838                // If the state root task failed or we got a new hashed state from the fallback that
839                // won the race, we need to replace the hashed state handle and re-run the
840                // validation.
841                if maybe_new_hashed_state.is_some() || state_root_task_failed {
842                    hashed_state = maybe_new_hashed_state.unwrap_or_else(|| {
843                        LazyHandle::ready(Arc::new(self.provider.hashed_post_state(&output.state)))
844                    });
845                    hashed_state_validate_result =
846                        self.validator.validate_block_post_execution_with_hashed_state(
847                            hashed_state.get(),
848                            &block,
849                        );
850                }
851            }
852            StateRootStrategy::Parallel => {
853                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
854                match self.compute_state_root_parallel(
855                    provider_factory,
856                    overlay_builder,
857                    &hashed_state,
858                ) {
859                    Ok(result) => {
860                        let elapsed = root_time.elapsed();
861                        info!(
862                            target: "engine::tree::payload_validator",
863                            regular_state_root = ?result.0,
864                            ?elapsed,
865                            "Regular root task finished"
866                        );
867                        maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
868                    }
869                    Err(error) => {
870                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
871                    }
872                }
873            }
874            StateRootStrategy::Synchronous => {}
875            StateRootStrategy::Custom(custom) => {
876                let (state_root, trie_updates) = ensure_ok_post_block!(
877                    custom(CustomStateRootInput {
878                        block: &block,
879                        parent_block: &parent_block,
880                        output: &output,
881                        hashed_state: &hashed_state,
882                    }),
883                    block
884                );
885                maybe_state_root = Some((state_root, Arc::new(trie_updates), root_time.elapsed()));
886            }
887        }
888
889        // Determine the state root.
890        // If the selected strategy produced a state root, we use it.
891        // Otherwise, we fall back to computing it serially.
892        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
893            maybe_state_root
894        {
895            maybe_state_root
896        } else {
897            // Fall back to computing the state root serially.
898            if self.config.state_root_fallback() {
899                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
900            } else {
901                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
902                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
903            }
904
905            let (root, updates) = ensure_ok_post_block!(
906                provider_builder
907                    .build()
908                    .and_then(|provider| Self::compute_state_root_serial(provider, &hashed_state)),
909                block
910            );
911
912            if state_root_task_failed {
913                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
914            }
915
916            (root, Arc::new(updates), root_time.elapsed())
917        };
918
919        if let Err(err) = hashed_state_validate_result {
920            // Invoke the invalid-block hook.
921            self.on_invalid_block(&parent_block, &block, &output, None, ctx.state_mut());
922            return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into())
923        }
924
925        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
926        self.metrics
927            .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
928        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
929
930        // Ensure the computed state root matches the header's state root.
931        if state_root != block.header().state_root() {
932            #[cfg(feature = "trie-debug")]
933            Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);
934
935            // Invoke the invalid-block hook.
936            self.on_invalid_block(
937                &parent_block,
938                &block,
939                &output,
940                Some((&trie_output, state_root)),
941                ctx.state_mut(),
942            );
943            let block_state_root = block.header().state_root();
944            return Err(InsertBlockError::new(
945                block.into_sealed_block(),
946                ConsensusError::BodyStateRootDiff(
947                    GotExpected { got: state_root, expected: block_state_root }.into(),
948                )
949                .into(),
950            )
951            .into())
952        }
953
954        let timing_stats = state_provider_stats.filter(|_| slow_block_enabled).map(|stats| {
955            self.calculate_timing_stats(
956                &block,
957                stats,
958                cache_stats,
959                &output,
960                execution_duration,
961                root_elapsed,
962            )
963        });
964
965        if let Some(valid_block_tx) = valid_block_tx {
966            let _ = valid_block_tx.send(());
967        }
968
969        let changeset_provider = ensure_ok_post_block!(
970            changeset_provider
971                .try_into_inner()
972                .ok()
973                .expect("changeset provider handle is not cloned"),
974            block
975        );
976
977        let executed_block = self.spawn_deferred_trie_task(
978            Arc::new(block),
979            output,
980            hashed_state,
981            trie_output,
982            changeset_provider,
983        );
984        let raw_bal = decoded_bal.map(|decoded_bal| decoded_bal.as_raw_bal().clone());
985        Ok(ValidationOutput::new(executed_block, timing_stats).with_raw_bal(raw_bal))
986    }
987
988    /// Spawns a background task to convert a [`BlockOrPayload`] into a [`SealedBlock`] and perform
989    /// basic consensus validations on it.
990    #[expect(clippy::type_complexity)]
991    pub fn spawn_convert_and_validate<T>(
992        &self,
993        input: &BlockOrPayload<T>,
994        parent: SealedHeader<N::BlockHeader>,
995    ) -> LazyHandle<Result<SealedBlock<N::Block>, InsertPayloadError<N::Block>>>
996    where
997        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
998        V: PayloadValidator<T, Block = N::Block> + Clone,
999    {
1000        let input = input.clone();
1001        let validator = self.validator.clone();
1002        let consensus = self.consensus.clone();
1003        let parent_span = Span::current();
1004        self.payload_processor.executor().spawn_blocking_named("payload-convert", move || {
1005            let _span = debug_span!(
1006                target: "engine::tree::payload_validator",
1007                parent: parent_span,
1008                "convert_and_validate",
1009            )
1010            .entered();
1011            let block = match input {
1012                BlockOrPayload::Block(block) => block,
1013                BlockOrPayload::Payload(payload) => {
1014                    validator.convert_payload_to_block(payload)?
1015                }
1016            };
1017
1018            if let Err(e) = consensus.validate_header(block.sealed_header()) {
1019                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
1020                return Err(InsertBlockError::consensus_error(e, block).into())
1021            }
1022
1023            // now validate against the parent
1024            let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
1025            if let Err(e) = consensus.validate_header_against_parent(block.sealed_header(), &parent)
1026            {
1027                warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
1028                return Err(InsertBlockError::consensus_error(e, block).into())
1029            }
1030            drop(_enter);
1031
1032            if let Err(e) =
1033                consensus.validate_block_pre_execution_with_tx_root(&block, None)
1034            {
1035                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
1036                return Err(InsertBlockError::consensus_error(e, block).into())
1037            }
1038
1039            Ok(block)
1040        })
1041    }
1042
1043    /// Spawns a background task that creates the changeset provider used by the deferred trie
1044    /// producer.
1045    ///
1046    /// This is started before execution so overlay construction can run concurrently with payload
1047    /// validation, then awaited before the deferred trie producer is spawned.
1048    fn spawn_changeset_provider_task(
1049        &self,
1050        overlay_factory: OverlayStateProviderFactory<P, N>,
1051    ) -> LazyHandle<ProviderResult<OverlayStateProvider<P::Provider>>> {
1052        let parent_span = Span::current();
1053        self.payload_processor.executor().spawn_blocking_named(
1054            DEFERRED_TRIE_WORKER_NAME,
1055            move || {
1056                let _span = debug_span!(
1057                    target: "engine::tree::payload_validator",
1058                    parent: parent_span,
1059                    "changeset_provider",
1060                )
1061                .entered();
1062                overlay_factory.database_provider_ro()
1063            },
1064        )
1065    }
1066
1067    /// Returns the sealed header for `hash`, checking in-memory state before the database.
1068    fn sealed_header_by_hash(
1069        &self,
1070        hash: B256,
1071        state: &EngineApiTreeState<N>,
1072    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1073        // check memory first
1074        let header = state.tree_state.sealed_header_by_hash(&hash);
1075
1076        if header.is_some() {
1077            Ok(header)
1078        } else {
1079            self.provider.sealed_header_by_hash(hash)
1080        }
1081    }
1082
1083    fn validate_received_bal_gas(
1084        decoded_bal: &DecodedBal,
1085        gas_limit: u64,
1086    ) -> Result<(), ConsensusError> {
1087        decoded_bal.as_bal().validate_gas_limit(gas_limit).map_err(ConsensusError::from)
1088    }
1089
1090    /// Executes a block with the given state provider.
1091    ///
1092    /// This method orchestrates block execution:
1093    /// 1. Sets up the EVM with state database and precompile caching
1094    /// 2. Spawns a background task for incremental receipt root computation
1095    /// 3. Executes transactions with metrics collection via state hooks
1096    /// 4. Merges state transitions and records execution metrics
1097    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1098    #[expect(clippy::type_complexity)]
1099    fn execute_block<S, Err, T>(
1100        &mut self,
1101        state_provider: S,
1102        env: ExecutionEnv<Evm>,
1103        input: &BlockOrPayload<T>,
1104        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
1105    ) -> Result<
1106        (
1107            BlockExecutionOutput<N::Receipt>,
1108            Vec<Address>,
1109            ReceiptRootReceiver,
1110            Option<BlockAccessList>,
1111        ),
1112        InsertBlockErrorKind,
1113    >
1114    where
1115        S: StateProvider + Send,
1116        Err: core::error::Error + Send + Sync + 'static,
1117        V: PayloadValidator<T, Block = N::Block>,
1118        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1119        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
1120    {
1121        debug!(target: "engine::tree::payload_validator", "Executing block");
1122
1123        let has_bal = env.decoded_bal.is_some();
1124        let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
1125            State::builder()
1126                .with_database(StateProviderDatabase::new(state_provider))
1127                .with_bundle_update()
1128                .with_bal_builder_if(has_bal)
1129                .build()
1130        });
1131
1132        let (spec_id, mut executor) = {
1133            let _span = debug_span!(target: "engine::tree", "create_evm").entered();
1134            let spec_id = *env.evm_env.spec_id();
1135            let evm_config = self.evm_config.clone().with_jit_support();
1136            let evm = evm_config.evm_with_env(&mut db, env.evm_env);
1137            let ctx = self
1138                .execution_ctx_for(input)
1139                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
1140            let executor = self.evm_config.create_executor(evm, ctx);
1141            (spec_id, executor)
1142        };
1143
1144        if !self.config.precompile_cache_disabled() {
1145            let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
1146            executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
1147                |address, precompile| {
1148                    let metrics = self
1149                        .precompile_cache_metrics
1150                        .entry(*address)
1151                        .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
1152                        .clone();
1153                    CachedPrecompile::wrap(
1154                        precompile,
1155                        self.precompile_cache_map.cache_for_address(*address),
1156                        spec_id,
1157                        Some(metrics),
1158                    )
1159                },
1160            );
1161        }
1162
1163        let transaction_count = input.transaction_count();
1164        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(transaction_count);
1165        let executed_tx_index = Arc::clone(handle.executed_tx_index());
1166        executor.evm_mut().db_mut().set_state_hook(
1167            handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook + 'static>),
1168        );
1169
1170        let execution_start = Instant::now();
1171
1172        // Execute all transactions and finalize
1173        let (executor, senders) = self.execute_transactions(
1174            executor,
1175            transaction_count,
1176            handle.iter_transactions(),
1177            &receipt_tx,
1178            &executed_tx_index,
1179            has_bal,
1180        )?;
1181        drop(receipt_tx);
1182
1183        // Finish execution and get the result
1184        let post_exec_start = Instant::now();
1185        let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
1186            .in_scope(|| executor.finish())
1187            .map(|(evm, result)| (evm.into_db(), result))?;
1188        self.metrics.record_post_execution(post_exec_start.elapsed());
1189
1190        // Merge transitions into bundle state
1191        debug_span!(target: "engine::tree", "merge_transitions")
1192            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));
1193
1194        let built_bal = if has_bal { db.take_built_alloy_bal() } else { None };
1195        let output = BlockExecutionOutput { result, state: db.take_bundle() };
1196
1197        let execution_duration = execution_start.elapsed();
1198        self.metrics.record_block_execution(&output, execution_duration);
1199        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
1200        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
1201
1202        Ok((output, senders, result_rx, built_bal))
1203    }
1204
1205    /// Returns true when the BAL execute path should be used for this block.
1206    // TODO: extend with stronger gating before enabling on mainnet:
1207    //   - Fork check: `Amsterdam.active_at_timestamp(env.evm_env.timestamp)`. Today a BAL only
1208    //     exists post-Amsterdam, so the BAL-presence check is a sufficient proxy. It is a proxy,
1209    //     not a guarantee.
1210    //   - Tx-count threshold (`bal_execute_path_min_tx_count`): below the parallelism break-even
1211    //     point, provider setup and worker scheduling overhead can exceed the gain. Tune
1212    //     empirically once workers are parallel; meaningless while the commit loop is sequential.
1213    fn bal_path_eligible(&self, bal: Option<&DecodedBal>) -> Result<bool, InsertBlockErrorKind> {
1214        let has_bal = bal.is_some();
1215        let parallel_execution = has_bal && !self.config.disable_bal_parallel_execution();
1216        if parallel_execution && self.config.disable_bal_parallel_state_root() {
1217            return Err(InsertBlockErrorKind::Other(
1218                "disabling parallel state root is impossible when parallel execution is enabled"
1219                    .into(),
1220            ));
1221        }
1222
1223        Ok(parallel_execution)
1224    }
1225
1226    /// Executes the block on the BAL path. Mirrors the return shape of [`Self::execute_block`]
1227    /// so the dispatch site stays uniform.
1228    ///
1229    /// Internally, this method:
1230    /// 1. Creates a shared parent-state cache handle for provider-backed workers.
1231    /// 2. Relies on BAL prewarm to stream sparse-trie updates and optional state prefetches.
1232    /// 3. Spawns the receipt-root task.
1233    /// 4. Calls [`crate::tree::payload_processor::bal::execute_block`].
1234    /// 5. Returns the rebuilt BAL for post-execution consensus validation.
1235    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1236    #[expect(clippy::type_complexity)]
1237    fn execute_block_bal<Tx, Err, MakeStateProvider, T>(
1238        &self,
1239        env: ExecutionEnv<Evm>,
1240        input: &BlockOrPayload<T>,
1241        handle: &PayloadHandle<Tx, Err, N::Receipt>,
1242        make_state_provider: &MakeStateProvider,
1243    ) -> Result<
1244        (
1245            BlockExecutionOutput<N::Receipt>,
1246            Vec<Address>,
1247            ReceiptRootReceiver,
1248            Option<BlockAccessList>,
1249        ),
1250        InsertBlockErrorKind,
1251    >
1252    where
1253        Tx: ExecutableTxFor<Evm> + Send,
1254        Err: core::error::Error + Send + Sync + 'static,
1255        MakeStateProvider: Fn(bool) -> ProviderResult<StateProviderBox> + Sync,
1256        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
1257        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1258        V: PayloadValidator<T, Block = N::Block>,
1259    {
1260        debug!(target: "engine::tree::payload_validator", "Executing block via BAL path");
1261
1262        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(env.transaction_count);
1263        let input_bal = env.decoded_bal.ok_or_else(|| {
1264            InsertBlockErrorKind::Other("BAL execute path: no decoded BAL available".into())
1265        })?;
1266
1267        let make_db = |fill_on_miss| {
1268            let provider = make_state_provider(fill_on_miss)
1269                .map_err(crate::tree::payload_processor::bal::BalExecutionError::Provider)?;
1270            Ok(StateProviderDatabase::new(provider))
1271        };
1272        let execution_start = Instant::now();
1273        let ctx =
1274            self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
1275        let (output, senders, built_bal) = crate::tree::payload_processor::bal::execute_block(
1276            &self.runtime,
1277            &self.evm_config,
1278            &make_db,
1279            input_bal,
1280            env.evm_env,
1281            ctx,
1282            env.transaction_count,
1283            handle.clone_transaction_receiver(),
1284            receipt_tx,
1285        )?;
1286        let execution_duration = execution_start.elapsed();
1287
1288        self.metrics.record_block_execution(&output, execution_duration);
1289        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
1290        debug!(
1291            target: "engine::tree::payload_validator",
1292            elapsed = ?execution_duration,
1293            "Executed block via BAL path",
1294        );
1295
1296        Ok((output, senders, result_rx, Some(built_bal)))
1297    }
1298
1299    fn spawn_receipt_root_task(
1300        &self,
1301        receipts_len: usize,
1302    ) -> (ReceiptRootSender<N>, ReceiptRootReceiver) {
1303        // An unbounded channel is fine: the tx count already bounds the number of receipts sent.
1304        let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
1305        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
1306        let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
1307        self.payload_processor
1308            .executor()
1309            .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
1310
1311        (receipt_tx, result_rx)
1312    }
1313
1314    /// Executes transactions and collects senders, streaming receipts to a background task.
1315    ///
1316    /// This method handles:
1317    /// - Applying pre-execution changes (e.g., beacon root updates)
1318    /// - Executing each transaction with timing metrics
1319    /// - Streaming receipts to the receipt root computation task
1320    /// - Collecting transaction senders for later use
1321    ///
1322    /// Returns the executor (for finalization) and the collected senders.
1323    fn execute_transactions<'a, E, Tx, InnerTx, Err, DB>(
1324        &self,
1325        mut executor: E,
1326        transaction_count: usize,
1327        transactions: impl Iterator<Item = Result<Tx, Err>>,
1328        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
1329        executed_tx_index: &AtomicUsize,
1330        has_bal: bool,
1331    ) -> Result<(E, Vec<Address>), BlockExecutionError>
1332    where
1333        E: BlockExecutor<Receipt = N::Receipt, Evm: alloy_evm::Evm<DB = &'a mut State<DB>>>,
1334        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
1335        InnerTx: TxHashRef,
1336        DB: revm::Database + 'a,
1337        Err: core::error::Error + Send + Sync + 'static,
1338    {
1339        let mut senders = Vec::with_capacity(transaction_count);
1340
1341        // Apply pre-execution changes (e.g., beacon root update)
1342        let pre_exec_start = Instant::now();
1343        debug_span!(target: "engine::tree", "pre_execution")
1344            .in_scope(|| executor.apply_pre_execution_changes())?;
1345        self.metrics.record_pre_execution(pre_exec_start.elapsed());
1346
1347        // Bump BAL index after pre-execution changes (EIP-7928: index 0 is pre-execution)
1348        if has_bal {
1349            executor.evm_mut().db_mut().bump_bal_index();
1350        }
1351
1352        // Execute transactions
1353        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
1354        let mut transactions = transactions.into_iter();
1355        // Some executors may execute transactions that do not append receipts during the
1356        // main loop (e.g., system transactions whose receipts are added during finalization).
1357        // In that case, sending after every transaction would resend the previous receipt with
1358        // the same index, which can panic the ordered root builder, so track the sent count.
1359        let mut last_sent_len = 0usize;
1360        loop {
1361            // Measure time spent waiting for next transaction from iterator
1362            // (e.g., parallel signature recovery)
1363            let wait_start = Instant::now();
1364            let Some(tx_result) = transactions.next() else { break };
1365            self.metrics.record_transaction_wait(wait_start.elapsed());
1366
1367            let tx = tx_result.map_err(BlockExecutionError::other)?;
1368            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
1369
1370            senders.push(tx_signer);
1371
1372            let _enter = tracing::enabled!(target: "engine::tree", Level::TRACE).then(|| {
1373                tracing::trace_span!(
1374                    target: "engine::tree",
1375                    "execute tx",
1376                    tx_index = senders.len() - 1,
1377                )
1378                .entered()
1379            });
1380            if tracing::enabled!(target: "engine::tree", Level::TRACE) {
1381                trace!(target: "engine::tree", "Executing transaction");
1382            }
1383
1384            let tx_start = Instant::now();
1385            executor.execute_transaction(tx)?;
1386            self.metrics.record_transaction_execution(tx_start.elapsed());
1387
1388            // advance the shared counter so prewarm workers skip already-executed txs
1389            executed_tx_index.store(senders.len(), Ordering::Relaxed);
1390
1391            let current_len = executor.receipts().len();
1392            if current_len > last_sent_len {
1393                last_sent_len = current_len;
1394                // Send the latest receipt to the background task for incremental root computation.
1395                if let Some(receipt) = executor.receipts().last() {
1396                    let tx_index = current_len - 1;
1397                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
1398                }
1399            }
1400            // Bump BAL index after each transaction (EIP-7928)
1401            if has_bal {
1402                executor.evm_mut().db_mut().bump_bal_index();
1403            }
1404        }
1405
1406        drop(exec_span);
1407
1408        Ok((executor, senders))
1409    }
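
    /*
    A minimal, standalone sketch of the receipt-forwarding guard used in the loop above: a
    receipt is sent only when the receipt list has grown since the last send. The function
    name, the `bool` step flags, and the `u64` receipts are hypothetical stand-ins, and
    `std::sync::mpsc` replaces the crossbeam channel to keep the example self-contained.

```rust
use std::sync::mpsc;

/// Runs hypothetical execution steps; `steps[i]` says whether step `i` appends a receipt.
/// Each newly appended receipt is sent once, tagged with its index in the receipt list.
fn forward_new_receipts(steps: &[bool], tx: &mpsc::Sender<(usize, u64)>) {
    let mut receipts: Vec<u64> = Vec::new();
    let mut last_sent_len = 0usize;

    for (i, appends) in steps.iter().enumerate() {
        if *appends {
            // Stand-in for a receipt produced by executing step `i`.
            receipts.push(i as u64 * 100);
        }

        // Send only if this step grew the receipt list; otherwise the previous
        // receipt would be resent under the same index.
        let current_len = receipts.len();
        if current_len > last_sent_len {
            last_sent_len = current_len;
            if let Some(receipt) = receipts.last() {
                let _ = tx.send((current_len - 1, *receipt));
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    forward_new_receipts(&[true, false, true], &tx);
    drop(tx);

    let sent: Vec<(usize, u64)> = rx.iter().collect();
    assert_eq!(sent, vec![(0, 0), (1, 200)]);
}
```

    With steps `[true, false, true]`, the second step appends nothing, so exactly two receipts
    are sent and their indices stay contiguous.
    */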

    /// Compute state root for the given hashed post state in parallel.
    ///
    /// Uses an overlay factory which provides the state of the parent block, along with the
    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
    /// trie updates for this block.
    ///
    /// # Returns
    ///
    /// Returns the state root and trie updates on success, or a [`ParallelStateRootError`] if
    /// the computation fails.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    fn compute_state_root_parallel(
        &self,
        provider_factory: P,
        overlay_builder: OverlayBuilder<N>,
        hashed_state: &LazyHashedPostState,
    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
        let hashed_state = hashed_state.get();
        // The `hashed_state` argument will be taken into account as part of the overlay, but we
        // need to use the prefix sets which were generated from it to indicate to the
        // ParallelStateRoot which parts of the trie need to be recomputed.
        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
        let overlay_builder =
            overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
        let overlay_factory = OverlayStateProviderFactory::new(provider_factory, overlay_builder);
        ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
            .incremental_root_with_updates()
    }

    /// Compute state root for the given hashed post state in serial.
    ///
    /// Uses the same provider construction path as main execution and computes the state root and
    /// trie updates for this block directly via
    /// [`reth_provider::StateRootProvider::state_root_with_updates`].
    fn compute_state_root_serial(
        state_provider: StateProviderBox,
        hashed_state: &LazyHashedPostState,
    ) -> ProviderResult<(B256, TrieUpdates)> {
        state_provider.state_root_with_updates(hashed_state.get().as_ref().clone())
    }

    /// Awaits the state root from the background task, with an optional timeout fallback.
    ///
    /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
    /// state root task up to the timeout duration. If the task doesn't complete in time, a
    /// sequential state root computation is spawned as a blocking task. Both computations
    /// then race: the calling thread polls the task receiver and the sequential result channel
    /// in a loop (every 10 ms), returning whichever finishes first. When the sequential fallback
    /// wins, the hashed post state it computed is returned alongside the outcome; otherwise
    /// that slot is `None`.
    ///
    /// If no timeout is configured, this simply awaits the state root task without any fallback.
    ///
    /// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
    /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
    /// `Result` captures parallel state root task errors that can still fall back to serial.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        name = "await_state_root",
        skip_all
    )]
    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
        &self,
        handle: &mut PayloadHandle<Tx, Err, R>,
        state_provider_builder: StateProviderBuilder<N, P>,
        output: Arc<BlockExecutionOutput<R>>,
    ) -> ProviderResult<
        Result<(StateRootComputeOutcome, Option<LazyHashedPostState>), ParallelStateRootError>,
    > {
        let Some(timeout) = self.config.state_root_task_timeout() else {
            return Ok(handle.state_root().map(|outcome| (outcome, None)));
        };

        let task_rx = handle.take_state_root_rx();

        match task_rx.recv_timeout(timeout) {
            Ok(result) => Ok(result.map(|outcome| (outcome, None))),
            Err(RecvTimeoutError::Disconnected) => {
                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
            }
            Err(RecvTimeoutError::Timeout) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    ?timeout,
                    "State root task timed out, spawning sequential fallback"
                );
                self.metrics.block_validation.state_root_task_timeout_total.increment(1);

                let (seq_tx, seq_rx) = std::sync::mpsc::channel();

                self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
                    let result = state_provider_builder.build().and_then(|provider| {
                        let hashed_state =
                            LazyHandle::ready(Arc::new(provider.hashed_post_state(&output.state)));
                        let (state_root, trie_updates) =
                            Self::compute_state_root_serial(provider, &hashed_state)?;

                        Ok((state_root, trie_updates, hashed_state))
                    });
                    let _ = seq_tx.send(result);
                });

                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

                loop {
                    match task_rx.recv_timeout(POLL_INTERVAL) {
                        Ok(result) => {
                            debug!(
                                target: "engine::tree::payload_validator",
                                source = "task",
                                "State root timeout race won"
                            );
                            return Ok(result.map(|outcome| (outcome, None)));
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            debug!(
                                target: "engine::tree::payload_validator",
                                "State root task dropped, waiting for sequential fallback"
                            );
                            let result = seq_rx.recv().map_err(|_| {
                                ProviderError::other(std::io::Error::other(
                                    "both state root computations failed",
                                ))
                            })?;
                            let (state_root, trie_updates, hashed_state) = result?;
                            return Ok(Ok((
                                StateRootComputeOutcome {
                                    state_root,
                                    trie_updates: Arc::new(trie_updates),
                                    #[cfg(feature = "trie-debug")]
                                    debug_recorders: Vec::new(),
                                },
                                Some(hashed_state),
                            )));
                        }
                        Err(RecvTimeoutError::Timeout) => {}
                    }

                    if let Ok(result) = seq_rx.try_recv() {
                        debug!(
                            target: "engine::tree::payload_validator",
                            source = "sequential",
                            "State root timeout race won"
                        );
                        let (state_root, trie_updates, hashed_state) = result?;
                        return Ok(Ok((
                            StateRootComputeOutcome {
                                state_root,
                                trie_updates: Arc::new(trie_updates),
                                #[cfg(feature = "trie-debug")]
                                debug_recorders: Vec::new(),
                            },
                            Some(hashed_state),
                        )));
                    }
                }
            }
        }
    }
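
    /*
    A minimal, standalone sketch of the timeout race described above, reduced to plain
    `std::sync::mpsc` channels and `std::thread`. `Winner`, `race_with_fallback`, and the `u64`
    payloads are hypothetical names for illustration only. Unlike the method above, the sketch
    returns `None` instead of an error when the primary channel disconnects before the timeout.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Winner {
    Task(u64),
    Fallback(u64),
}

/// Waits for the primary task up to `timeout`. On timeout, starts `fallback` on a
/// separate thread and polls both result channels until one of them yields a value.
fn race_with_fallback(
    task_rx: Receiver<u64>,
    timeout: Duration,
    fallback: impl FnOnce() -> u64 + Send + 'static,
) -> Option<Winner> {
    match task_rx.recv_timeout(timeout) {
        Ok(value) => return Some(Winner::Task(value)),
        Err(RecvTimeoutError::Disconnected) => return None,
        Err(RecvTimeoutError::Timeout) => {}
    }

    let (seq_tx, seq_rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = seq_tx.send(fallback());
    });

    const POLL_INTERVAL: Duration = Duration::from_millis(10);
    loop {
        match task_rx.recv_timeout(POLL_INTERVAL) {
            Ok(value) => return Some(Winner::Task(value)),
            // The primary task is gone, so block on the fallback instead of polling.
            Err(RecvTimeoutError::Disconnected) => {
                return seq_rx.recv().ok().map(Winner::Fallback);
            }
            Err(RecvTimeoutError::Timeout) => {}
        }
        if let Ok(value) = seq_rx.try_recv() {
            return Some(Winner::Fallback(value));
        }
    }
}

fn main() {
    // The primary result is already available, so the fallback never starts.
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    let out = race_with_fallback(rx, Duration::from_millis(50), || 2);
    assert_eq!(out, Some(Winner::Task(1)));

    // The primary never answers while its sender stays alive, so the fallback wins.
    let (tx, rx) = mpsc::channel::<u64>();
    let out = race_with_fallback(rx, Duration::from_millis(20), || 2);
    assert_eq!(out, Some(Winner::Fallback(2)));
    drop(tx);
}
```

    Polling the primary channel with a short `recv_timeout` and the fallback channel with
    `try_recv` lets a single thread watch both receivers without extra synchronization.
    */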

    /// Compares trie updates from the state root task with serial state root computation.
    ///
    /// This is used for debugging and validating the correctness of the parallel state root
    /// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
    /// method runs a separate serial state root computation and compares the resulting trie
    /// updates.
    ///
    /// Returns `true` if the comparison reports a difference or itself fails. Returns `false`
    /// if no difference is found, or if the serial state root or the database provider could
    /// not be obtained.
    fn compare_trie_updates_with_serial(
        &self,
        state_provider_builder: StateProviderBuilder<N, P>,
        provider_factory: P,
        overlay_builder: OverlayBuilder<N>,
        output: &BlockExecutionOutput<N::Receipt>,
        task_trie_updates: TrieUpdates,
    ) -> bool {
        debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");

        match state_provider_builder.build().and_then(|provider| {
            let hashed_state = Arc::new(provider.hashed_post_state(&output.state));
            Self::compute_state_root_serial(provider, &LazyHandle::ready(hashed_state))
        }) {
            Ok((serial_root, serial_trie_updates)) => {
                debug!(
                    target: "engine::tree::payload_validator",
                    ?serial_root,
                    "Serial state root computation finished for comparison"
                );

                // Get a database provider to use as trie cursor factory
                let overlay_factory =
                    OverlayStateProviderFactory::new(provider_factory, overlay_builder);
                match overlay_factory.database_provider_ro() {
                    Ok(provider) => {
                        match super::trie_updates::compare_trie_updates(
                            &provider,
                            task_trie_updates,
                            serial_trie_updates,
                        ) {
                            Ok(has_diff) => return has_diff,
                            Err(err) => {
                                warn!(
                                    target: "engine::tree::payload_validator",
                                    %err,
                                    "Error comparing trie updates"
                                );
                                return true;
                            }
                        }
                    }
                    Err(err) => {
                        warn!(
                            target: "engine::tree::payload_validator",
                            %err,
                            "Failed to get database provider for trie update comparison"
                        );
                    }
                }
            }
            Err(err) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    %err,
                    "Failed to compute serial state root for comparison"
                );
            }
        }
        false
    }

    /// Writes trie debug recorders to a JSON file for the given block number.
    ///
    /// The file is written to the current working directory as
    /// `trie_debug_block_{block_number}.json`.
    #[cfg(feature = "trie-debug")]
    fn write_trie_debug_recorders(
        block_number: u64,
        recorders: &[(Option<B256>, TrieDebugRecorder)],
    ) {
        let path = format!("trie_debug_block_{block_number}.json");
        match serde_json::to_string_pretty(recorders) {
            Ok(json) => match std::fs::write(&path, json) {
                Ok(()) => {
                    warn!(
                        target: "engine::tree::payload_validator",
                        %path,
                        "Wrote trie debug recorders to file"
                    );
                }
                Err(err) => {
                    warn!(
                        target: "engine::tree::payload_validator",
                        %err,
                        %path,
                        "Failed to write trie debug recorders"
                    );
                }
            },
            Err(err) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    %err,
                    "Failed to serialize trie debug recorders"
                );
            }
        }
    }

    /// Validates the block after execution.
    ///
    /// This runs post-execution consensus validation. If validation fails, the invalid-block
    /// hook is invoked before the error is returned. On success, the validation duration is
    /// recorded.
    ///
    /// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
    /// and logs bloom from the receipts.
    ///
    /// If `built_bal` is provided, its hash is computed and passed to consensus validation.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        block: &RecoveredBlock<N::Block>,
        parent_block: &SealedHeader<N::BlockHeader>,
        output: &BlockExecutionOutput<N::Receipt>,
        ctx: &mut TreeCtx<'_, N>,
        receipt_root_bloom: Option<ReceiptRootBloom>,
        built_bal: Option<BlockAccessList>,
    ) -> Result<(), InsertBlockErrorKind>
    where
        V: PayloadValidator<T, Block = N::Block>,
    {
        let start = Instant::now();

        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");

        // Validate block post-execution rules
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
                .entered();
        let block_access_list_hash =
            built_bal.as_ref().map(|bal| compute_block_access_list_hash(bal));

        if let Err(err) = self.consensus.validate_block_post_execution(
            block,
            output,
            receipt_root_bloom,
            block_access_list_hash,
        ) {
            // Notify the invalid-block hook
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }
        drop(_enter);

        // Record post-execution validation duration
        self.metrics
            .block_validation
            .post_execution_validation_duration
            .record(start.elapsed().as_secs_f64());

        Ok(())
    }

    /// Spawns a payload processor task based on the state root strategy.
    ///
    /// Every strategy prepares transaction conversion and cache prewarming. The strategy decides
    /// what else is started:
    /// - `StateRootTask`: also starts the state root task, using `overlay_factory` for
    ///   multiproof generation
    /// - `Parallel`, `Synchronous`, `Custom`: spawns only the cache-exclusive path
    ///   (`spawn_cache_exclusive`); the state root is resolved later in the validation flow
    ///
    /// This method does not compute the state root or handle strategy fallbacks itself.
    ///
    /// # Arguments
    ///
    /// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
    ///   (`StateRootTask`)
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        skip_all,
        fields(?strategy, parallel_bal_execution)
    )]
    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        txs: T,
        provider_builder: StateProviderBuilder<N, P>,
        overlay_factory: OverlayStateProviderFactory<P, N>,
        strategy: &StateRootStrategy<N>,
        parallel_bal_execution: bool,
    ) -> Result<
        PayloadHandle<
            impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
            impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
            N::Receipt,
        >,
        InsertBlockErrorKind,
    > {
        match strategy {
            StateRootStrategy::StateRootTask => {
                let spawn_start = Instant::now();

                // Use the pre-computed overlay factory for multiproofs
                let handle = self.payload_processor.spawn(
                    env,
                    txs,
                    provider_builder,
                    overlay_factory,
                    &self.config,
                    parallel_bal_execution,
                );

                // Record prewarming initialization duration
                self.metrics
                    .block_validation
                    .spawn_payload_processor
                    .record(spawn_start.elapsed().as_secs_f64());

                Ok(handle)
            }
            StateRootStrategy::Parallel |
            StateRootStrategy::Synchronous |
            StateRootStrategy::Custom(_) => {
                let start = Instant::now();
                let handle = self.payload_processor.spawn_cache_exclusive(
                    env,
                    txs,
                    provider_builder,
                    parallel_bal_execution,
                );

                // Record prewarming initialization duration
                self.metrics
                    .block_validation
                    .spawn_payload_processor
                    .record(start.elapsed().as_secs_f64());

                Ok(handle)
            }
        }
    }

    /// Creates a `StateProviderBuilder` for the given parent hash.
    ///
    /// This method checks if the parent is in the tree state (in-memory) or persisted to disk,
    /// and creates the appropriate provider builder.
    fn state_provider_builder(
        &self,
        hash: B256,
        state: &EngineApiTreeState<N>,
    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
        if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
            debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
            // the block leads back to the canonical chain
            return Ok(Some(StateProviderBuilder::new(
                self.provider.clone(),
                historical,
                Some(blocks),
            )))
        }

        // Check if the block is persisted
        if let Some(header) = self.provider.header(hash)? {
            debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
            // For persisted blocks, we create a builder that will fetch state directly from the
            // database
            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
        }

        debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
        Ok(None)
    }

    /// Determines the state root computation strategy based on configuration.
    ///
    /// Strategies are checked in order of precedence: a configured custom state root, then
    /// `Synchronous` when `state_root_fallback` is set, then `StateRootTask` when
    /// `use_state_root_task` is set, and `Parallel` otherwise.
    ///
    /// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
    /// too expensive because it requires walking all paths in every proof.
    fn plan_state_root_computation(&self) -> StateRootStrategy<N> {
        if let Some(custom_state_root) = &self.custom_state_root {
            StateRootStrategy::Custom(custom_state_root.clone())
        } else if self.config.state_root_fallback() {
            StateRootStrategy::Synchronous
        } else if self.config.use_state_root_task() {
            StateRootStrategy::StateRootTask
        } else {
            StateRootStrategy::Parallel
        }
    }
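
    /*
    A minimal, standalone sketch of the precedence applied by `plan_state_root_computation`.
    `Strategy`, `Config`, and `plan` are hypothetical stand-ins with plain `bool` fields; the
    real method reads the engine configuration and an optional custom state root instead.

```rust
#[derive(Debug, PartialEq, Eq)]
enum Strategy {
    Custom,
    Synchronous,
    StateRootTask,
    Parallel,
}

/// Hypothetical flattened view of the inputs consulted when choosing a strategy.
struct Config {
    has_custom: bool,
    state_root_fallback: bool,
    use_state_root_task: bool,
}

/// Checks the inputs in order of precedence; earlier checks override later ones.
fn plan(cfg: &Config) -> Strategy {
    if cfg.has_custom {
        Strategy::Custom
    } else if cfg.state_root_fallback {
        Strategy::Synchronous
    } else if cfg.use_state_root_task {
        Strategy::StateRootTask
    } else {
        Strategy::Parallel
    }
}

fn main() {
    // The fallback flag wins over the state root task flag.
    let cfg = Config { has_custom: false, state_root_fallback: true, use_state_root_task: true };
    assert_eq!(plan(&cfg), Strategy::Synchronous);

    // With nothing set, the parallel strategy is the default.
    let cfg = Config { has_custom: false, state_root_fallback: false, use_state_root_task: false };
    assert_eq!(plan(&cfg), Strategy::Parallel);
}
```
    */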

    /// Called when an invalid block is encountered during validation.
    fn on_invalid_block(
        &self,
        parent_header: &SealedHeader<N::BlockHeader>,
        block: &RecoveredBlock<N::Block>,
        output: &BlockExecutionOutput<N::Receipt>,
        trie_updates: Option<(&TrieUpdates, B256)>,
        state: &mut EngineApiTreeState<N>,
    ) {
        if state.invalid_headers.get(&block.hash()).is_some() {
            // we already marked this block as invalid
            return
        }
        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
    }

    /// Returns an overlay builder configured for a payload parent.
    fn overlay_builder_for_parent(
        parent_hash: B256,
        state: &EngineApiTreeState<N>,
        changeset_cache: ChangesetCache,
    ) -> OverlayBuilder<N> {
        OverlayBuilder::new(parent_hash, changeset_cache)
            .with_state_trie_overlay_manager(state.tree_state.state_trie_overlays.clone())
    }

    /// Spawns a background task to compute and sort trie data for the executed block.
    ///
    /// This function creates a [`DeferredTrieData`] handle and spawns a blocking task that:
    /// 1. Sorts the block's hashed state and trie updates
    /// 2. Publishes the result so subsequent calls return immediately
    /// 3. Computes the trie changesets for the block and resolves the pending changeset entry
    ///
    /// If the background task hasn't completed when `trie_data()` is called, callers wait for the
    /// publishing task instead of computing synchronously.
    ///
    /// The validation hot path can return immediately after state root verification,
    /// while consumers (DB writes, overlay providers, proofs) get trie data from the completed
    /// task.
    fn spawn_deferred_trie_task(
        &self,
        block: Arc<RecoveredBlock<N::Block>>,
        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
        hashed_state: LazyHashedPostState,
        trie_output: Arc<TrieUpdates>,
        changeset_provider: impl TrieCursorFactory + Send + 'static,
    ) -> ExecutedBlock<N> {
        // Create deferred handle and task that owns the unsorted inputs.
        // Resolve the lazy handle into Arc<HashedPostState>. By this point the hashed state has
        // already been computed and used for state root verification, so .get() returns instantly.
        let hashed_state = match hashed_state.try_into_inner() {
            Ok(state) => state,
            Err(handle) => handle.get().clone(),
        };
        let (deferred_trie_data, deferred_trie_task) =
            DeferredTrieData::pending(hashed_state, trie_output);
        let block_validation_metrics = self.metrics.block_validation.clone();

        // Capture block info and cache handle for changeset computation
        let block_hash = block.hash();
        let block_number = block.number();

        // Register a pending changeset entry so that concurrent readers will wait for
        // this computation to finish rather than falling back to the expensive DB path.
        // The guard ensures the pending entry is cancelled if the task panics.
        let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);

        // Spawn background task to compute trie data. The task publishes the sorted result before
        // computing changesets, so trie data waiters do not block on changeset computation.
        let compute_trie_input_task = move || {
            let _span = debug_span!(
                target: "engine::tree::payload_validator",
                "compute_trie_input_task",
                block_number
            )
            .entered();

            let result = panic::catch_unwind(AssertUnwindSafe(|| {
                let compute_start = Instant::now();
                let computed = deferred_trie_task.compute_and_publish();
                block_validation_metrics
                    .deferred_trie_compute_duration
                    .record(compute_start.elapsed().as_secs_f64());

                // Record sizes of the computed trie data
                block_validation_metrics
                    .hashed_post_state_size
                    .record(computed.hashed_state.total_len() as f64);
                block_validation_metrics
                    .trie_updates_sorted_size
                    .record(computed.trie_updates.total_len() as f64);
                // Compute and cache changesets using the computed trie_updates.
                // Use the pre-created provider to avoid races with changeset cache
                // eviction that can happen between task spawn and execution.
                let changeset_start = Instant::now();

                match reth_trie::changesets::compute_trie_changesets(
                    &changeset_provider,
                    &computed.trie_updates,
                ) {
                    Ok(changesets) => {
                        debug!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            elapsed = ?changeset_start.elapsed(),
                            "Computed and caching changesets"
                        );

                        pending_changeset_guard.resolve(block_number, Arc::new(changesets));
                    }
                    Err(e) => {
                        warn!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            ?e,
                            "Failed to compute changesets for deferred trie producer"
                        );
                    }
                }
            }));

            if result.is_err() {
                error!(
                    target: "engine::tree::payload_validator",
                    "Deferred trie task panicked"
                );
            }
        };

        // Spawn task that computes trie data asynchronously.
        self.payload_processor
            .executor()
            .spawn_blocking_named(DEFERRED_TRIE_WORKER_NAME, compute_trie_input_task);

        ExecutedBlock::with_deferred_trie_data(block, execution_outcome, deferred_trie_data)
    }
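
    /*
    A minimal, standalone sketch of the guard pattern the task above relies on: a pending entry
    is registered up front, resolved on success, and cancelled from `Drop` if the work panics
    inside `catch_unwind`. `Entry`, `PendingGuard`, and `run_task` are hypothetical names; the
    changeset cache's actual guard type is not shown in this file and may behave differently.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum Entry {
    Pending,
    Resolved(u64),
    Cancelled,
}

/// Marks the shared slot as pending and cancels it on drop unless resolved first.
struct PendingGuard {
    slot: Arc<Mutex<Entry>>,
    resolved: bool,
}

impl PendingGuard {
    fn register(slot: Arc<Mutex<Entry>>) -> Self {
        *slot.lock().unwrap() = Entry::Pending;
        Self { slot, resolved: false }
    }

    /// Publishes the value; the guard is consumed, so `Drop` sees `resolved == true`.
    fn resolve(mut self, value: u64) {
        *self.slot.lock().unwrap() = Entry::Resolved(value);
        self.resolved = true;
    }
}

impl Drop for PendingGuard {
    fn drop(&mut self) {
        if !self.resolved {
            *self.slot.lock().unwrap() = Entry::Cancelled;
        }
    }
}

/// Runs the work under `catch_unwind`; returns whether it completed without panicking.
fn run_task(slot: Arc<Mutex<Entry>>, should_panic: bool) -> bool {
    let guard = PendingGuard::register(slot);
    panic::catch_unwind(AssertUnwindSafe(move || {
        if should_panic {
            panic!("simulated failure");
        }
        guard.resolve(42);
    }))
    .is_ok()
}

fn main() {
    let slot = Arc::new(Mutex::new(Entry::Pending));

    assert!(run_task(slot.clone(), false));
    assert_eq!(*slot.lock().unwrap(), Entry::Resolved(42));

    // The guard is dropped while unwinding, which cancels the pending entry.
    assert!(!run_task(slot.clone(), true));
    assert_eq!(*slot.lock().unwrap(), Entry::Cancelled);
}
```

    Moving the guard into the closure ties its lifetime to the work itself, so readers waiting
    on the pending entry are released whether the closure returns or unwinds.
    */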

    /// Builds the [`ExecutionTimingStats`] for an executed block.
    ///
    /// Read counts and fetch latencies come from `provider_stats`; write counts are derived
    /// from the bundle state in `output`.
    fn calculate_timing_stats(
        &self,
        block: &RecoveredBlock<N::Block>,
        provider_stats: Arc<StateProviderStats>,
        cache_stats: Option<Arc<CacheStats>>,
        output: &BlockExecutionOutput<N::Receipt>,
        execution_duration: Duration,
        state_hash_duration: Duration,
    ) -> Box<ExecutionTimingStats> {
        let accounts_read = provider_stats.total_account_fetches();
        let storage_read = provider_stats.total_storage_fetches();
        let code_read = provider_stats.total_code_fetches();
        let code_bytes_read = provider_stats.total_code_fetched_bytes();

        // Write stats from BundleState (final state changes)
        let accounts_changed = output.state.state.len();
        let accounts_deleted =
            output.state.state.values().filter(|acc| acc.was_destroyed()).count();
        let storage_slots_changed =
            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
        let storage_slots_deleted = output
            .state
            .state
            .values()
            .flat_map(|account| account.storage.values())
            .filter(|slot| {
                slot.present_value.is_zero() && !slot.previous_or_original_value.is_zero()
            })
            .count();

        // Helper: check if account represents a new contract deployment
        let is_new_deployment = |acc: &BundleAccount| -> bool {
            let has_code_now = acc.info.as_ref().is_some_and(|info| info.code_hash != KECCAK_EMPTY);
            let had_no_code_before = acc
                .original_info
                .as_ref()
                .map(|info| info.code_hash == KECCAK_EMPTY)
                .unwrap_or(true);
            has_code_now && had_no_code_before
        };

        let bytecodes_changed =
            output.state.state.values().filter(|acc| is_new_deployment(acc)).count();

        // Unique new code hashes to count actual bytes persisted (deduplicated)
        let unique_new_code_hashes: B256Set = output
            .state
            .state
            .values()
            .filter(|acc| is_new_deployment(acc))
            .filter_map(|acc| acc.info.as_ref().map(|info| info.code_hash))
            .collect();
        let code_bytes_written: usize = unique_new_code_hashes
            .iter()
            .filter_map(|hash| {
                output.state.contracts.get(hash).map(|bytecode| bytecode.original_bytes().len())
            })
            .sum();

        // Total time spent fetching state during execution
        let state_read_duration = provider_stats.total_account_fetch_latency() +
            provider_stats.total_storage_fetch_latency() +
            provider_stats.total_code_fetch_latency();

        // EIP-7702 delegation tracking from bytecode changes
        // Count new EIP-7702 bytecodes as delegations set
        let eip7702_delegations_set =
            output.state.contracts.values().filter(|bytecode| bytecode.is_eip7702()).count();
        // Delegations cleared: accounts where bytecode changed FROM EIP-7702 TO empty
        // This detects when an EIP-7702 delegation is removed by setting code to empty
        // Note: Clearing a delegation does NOT destroy the account - it just empties the
        // bytecode
        let eip7702_delegations_cleared = output
            .state
            .state
            .values()
            .filter(|acc| {
                // Check if original bytecode was EIP-7702
                let original_was_eip7702 = acc
                    .original_info
                    .as_ref()
                    .and_then(|info| info.code.as_ref())
                    .map(|bytecode| bytecode.is_eip7702())
                    .unwrap_or(false);

                // Check if current code is empty (delegation cleared)
2082                let code_now_empty =
2083                    acc.info.as_ref().map(|info| info.code_hash == KECCAK_EMPTY).unwrap_or(false);
2084
2085                original_was_eip7702 && code_now_empty
2086            })
2087            .count();
2088
2089        // Get cache statistics for detailed block logging
2090        let (account_cache_hits, account_cache_misses) = cache_stats
2091            .as_ref()
2092            .map(|s| (s.account_hits(), s.account_misses()))
2093            .unwrap_or_default();
2094        let (storage_cache_hits, storage_cache_misses) = cache_stats
2095            .as_ref()
2096            .map(|s| (s.storage_hits(), s.storage_misses()))
2097            .unwrap_or_default();
2098        let (code_cache_hits, code_cache_misses) =
2099            cache_stats.as_ref().map(|s| (s.code_hits(), s.code_misses())).unwrap_or_default();
2100
2101        // Build execution timing stats for detailed block logging
2102        Box::new(ExecutionTimingStats {
2103            block_number: block.number(),
2104            block_hash: block.hash(),
2105            gas_used: output.result.gas_used,
2106            tx_count: block.transaction_count(),
2107            execution_duration,
2108            state_read_duration,
2109            state_hash_duration,
2110            accounts_read,
2111            storage_read,
2112            code_read,
2113            code_bytes_read,
2114            accounts_changed,
2115            accounts_deleted,
2116            storage_slots_changed,
2117            storage_slots_deleted,
2118            bytecodes_changed,
2119            code_bytes_written,
2120            eip7702_delegations_set,
2121            eip7702_delegations_cleared,
2122            account_cache_hits,
2123            account_cache_misses,
2124            storage_cache_hits,
2125            storage_cache_misses,
2126            code_cache_hits,
2127            code_cache_misses,
2128        })
2129    }
2130}
2131
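/*
Illustrative sketch, not part of this module: the write-side counters computed above reduce to a
few iterator passes over the changed accounts. `Slot` and `Account` are simplified, hypothetical
stand-ins for the bundle-state types, with `u64` slot values and two booleans in place of the
code-hash comparison against `KECCAK_EMPTY`.

```rust
/// Hypothetical stand-in for a changed storage slot.
struct Slot {
    original: u64,
    present: u64,
}

/// Hypothetical stand-in for a changed account.
struct Account {
    /// `true` if the account held code before the block.
    had_code: bool,
    /// `true` if the account holds code after the block.
    has_code: bool,
    slots: Vec<Slot>,
}

/// A slot counts as deleted when it went from a non-zero value to zero.
fn storage_slots_deleted(accounts: &[Account]) -> usize {
    accounts
        .iter()
        .flat_map(|acc| acc.slots.iter())
        .filter(|slot| slot.present == 0 && slot.original != 0)
        .count()
}

/// An account counts as a new deployment when it gained code it did not have before.
fn new_deployments(accounts: &[Account]) -> usize {
    accounts.iter().filter(|acc| acc.has_code && !acc.had_code).count()
}

fn main() {
    let accounts = vec![
        Account { had_code: false, has_code: true, slots: vec![Slot { original: 0, present: 7 }] },
        Account {
            had_code: true,
            has_code: true,
            slots: vec![Slot { original: 5, present: 0 }, Slot { original: 0, present: 0 }],
        },
    ];
    println!(
        "deleted={} deployed={}",
        storage_slots_deleted(&accounts),
        new_deployments(&accounts)
    );
}
```

Deduplicating new code by hash, as done above for `code_bytes_written`, is left out of the sketch.
*/
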
/// Strategy describing how to compute the state root.
#[derive(derive_more::Debug, Clone)]
enum StateRootStrategy<N: NodePrimitives> {
    /// Use the state root task (background sparse trie computation).
    StateRootTask,
    /// Run the parallel state root computation on the calling thread.
    Parallel,
    /// Fall back to synchronous computation via the state provider.
    Synchronous,
    /// Custom state root computation strategy.
    Custom(#[debug(skip)] CustomStateRoot<N>),
}

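/*
Illustrative sketch, not part of this module: the module docs describe falling back to serial
computation when a non-custom parallel path fails to produce a usable root. The shape of that
decision can be sketched as below. `Strategy`, `resolve_root`, the `u64` root, and the two
closures are hypothetical simplifications, and the custom strategy is omitted.

```rust
/// Hypothetical, simplified mirror of the non-custom strategies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Strategy {
    Task,
    Parallel,
    Synchronous,
}

/// Returns the root and whether the serial fallback was needed.
fn resolve_root(
    strategy: Strategy,
    fast_path: impl FnOnce() -> Option<u64>,
    serial: impl FnOnce() -> u64,
) -> (u64, bool) {
    match strategy {
        // The synchronous strategy goes straight to the serial computation.
        Strategy::Synchronous => (serial(), false),
        // Both concurrent strategies try their fast path first.
        Strategy::Task | Strategy::Parallel => match fast_path() {
            Some(root) => (root, false),
            None => (serial(), true),
        },
    }
}

fn main() {
    // The fast path yields nothing here, so the serial closure supplies the root.
    let (root, fell_back) = resolve_root(Strategy::Task, || None, || 22);
    println!("root={root} fell_back={fell_back}");
}
```
*/
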
/// Type that validates the payloads processed by the engine.
///
/// This provides the necessary functions for validating/executing payloads/blocks.
pub trait EngineValidator<
    Types: PayloadTypes,
    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
>: Send + Sync + 'static
{
    /// Validates the payload attributes with respect to the header.
    ///
    /// By default, this enforces that the payload attributes' timestamp is greater than the
    /// header's timestamp, per the engine API specification:
    ///   > 7. Client software MUST ensure that payloadAttributes.timestamp is greater than
    ///   > timestamp of a block referenced by forkchoiceState.headBlockHash.
    ///
    /// See also: <https://github.com/ethereum/execution-apis/blob/main/src/engine/common.md#specification-1>
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError>;

    /// Ensures that the given payload does not violate any consensus rules that concern the block's
    /// layout.
    ///
    /// This function must convert the payload into the executable block and pre-validate its
    /// fields.
    ///
    /// Implementers should ensure that the checks are done in the order that conforms with the
    /// engine-API specification.
    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;

    /// Validates a payload received from engine API.
    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Validates a block downloaded from the network.
    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Hook called after an executed block is inserted directly into the tree.
    ///
    /// This is invoked when blocks are inserted via `InsertExecutedBlock` (e.g., locally built
    /// blocks by sequencers) to allow implementations to update internal state such as caches.
    fn on_inserted_executed_block(
        &self,
        block: BuiltPayloadExecutedBlock<N>,
        state: &EngineApiTreeState<N>,
    ) -> ProviderResult<ExecutedBlock<N>>;

    /// Returns [`SavedCache`] for the given block hash.
    fn cache_for(&self, _block_hash: B256) -> Option<SavedCache>;

    /// Spawns a sparse trie pipeline and returns a handle for the payload builder.
    fn sparse_trie_handle_for(
        &self,
        parent_hash: B256,
        parent_state_root: B256,
        state: &EngineApiTreeState<N>,
    ) -> Option<StateRootHandle>;
}

impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
where
    P: DatabaseProviderFactory<
            Provider: BlockReader
                          + StageCheckpointReader
                          + PruneCheckpointReader
                          + ChangeSetReader
                          + StorageChangeSetReader
                          + BlockNumReader
                          + StorageSettingsCache,
        > + BlockReader<Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader
        + ChangeSetReader
        + BlockNumReader
        + HashedPostStateProvider
        + Clone
        + 'static,
    N: NodePrimitives,
    V: PayloadValidator<Types, Block = N::Block> + Clone,
    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
{
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError> {
        self.validator.validate_payload_attributes_against_header(attr, header)
    }

    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
        let block = self.validator.convert_payload_to_block(payload)?;
        Ok(block)
    }

    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N> {
        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
    }

    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N> {
        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
    }

    fn on_inserted_executed_block(
        &self,
        block: BuiltPayloadExecutedBlock<N>,
        state: &EngineApiTreeState<N>,
    ) -> ProviderResult<ExecutedBlock<N>> {
        self.payload_processor.on_inserted_executed_block(
            block.recovered_block.block_with_parent(),
            &block.execution_output.state,
        );

        let overlay_factory = OverlayStateProviderFactory::new(
            self.provider.clone(),
            Self::overlay_builder_for_parent(
                block.recovered_block.parent_hash(),
                state,
                self.changeset_cache.clone(),
            ),
        );
        let changeset_provider = overlay_factory.database_provider_ro()?;

        Ok(self.spawn_deferred_trie_task(
            block.recovered_block,
            block.execution_output,
            LazyHashedPostState::ready(block.hashed_state),
            block.trie_updates,
            changeset_provider,
        ))
    }

    fn cache_for(&self, block_hash: B256) -> Option<SavedCache> {
        Some(self.payload_processor.cache_for(block_hash))
    }

    fn sparse_trie_handle_for(
        &self,
        parent_hash: B256,
        parent_state_root: B256,
        state: &EngineApiTreeState<N>,
    ) -> Option<StateRootHandle> {
        let overlay_factory = OverlayStateProviderFactory::new(
            self.provider.clone(),
            Self::overlay_builder_for_parent(parent_hash, state, self.changeset_cache.clone()),
        );

        Some(self.payload_processor.spawn_state_root(
            overlay_factory,
            parent_state_root,
            // Full proof workers: tx count unknown at FCU time (block built incrementally)
            false,
            &self.config,
        ))
    }
}

impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    fn wait_for_caches(&self) -> CacheWaitDurations {
        self.payload_processor.wait_for_caches()
    }
}

/// Enum representing either block or payload being validated.
#[derive(Debug, Clone)]
pub enum BlockOrPayload<T: PayloadTypes> {
    /// Payload.
    Payload(T::ExecutionData),
    /// Block.
    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
}

impl<T: PayloadTypes> BlockOrPayload<T> {
    /// Returns the hash of the block.
    pub fn hash(&self) -> B256 {
        match self {
            Self::Payload(payload) => payload.block_hash(),
            Self::Block(block) => block.hash(),
        }
    }

    /// Returns the number and hash of the block.
    pub fn num_hash(&self) -> NumHash {
        match self {
            Self::Payload(payload) => payload.num_hash(),
            Self::Block(block) => block.num_hash(),
        }
    }

    /// Returns the parent hash of the block.
    pub fn parent_hash(&self) -> B256 {
        match self {
            Self::Payload(payload) => payload.parent_hash(),
            Self::Block(block) => block.parent_hash(),
        }
    }

    /// Returns [`BlockWithParent`] for the block.
    pub fn block_with_parent(&self) -> BlockWithParent {
        match self {
            Self::Payload(payload) => payload.block_with_parent(),
            Self::Block(block) => block.block_with_parent(),
        }
    }

    /// Returns a string indicating whether this is a block or a payload.
    pub const fn type_name(&self) -> &'static str {
        match self {
            Self::Payload(_) => "payload",
            Self::Block(_) => "block",
        }
    }

    /// Returns true if this is a payload.
    pub const fn is_payload(&self) -> bool {
        matches!(self, Self::Payload(_))
    }

    /// Returns true if this is a block.
    pub const fn is_block(&self) -> bool {
        matches!(self, Self::Block(_))
    }

    /// Decodes the block access list carried by a payload.
    ///
    /// Returns `Ok(None)` if the payload has no block access list, and an error if decoding
    /// fails. Blocks always yield `Ok(None)`.
    pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
        match self {
            Self::Payload(payload) => payload
                .block_access_list()
                .map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
                .transpose(),
            Self::Block(_) => Ok(None),
        }
    }

    /// Returns the number of transactions in the payload or block.
    pub fn transaction_count(&self) -> usize
    where
        T::ExecutionData: ExecutionPayload,
    {
        match self {
            Self::Payload(payload) => payload.transaction_count(),
            Self::Block(block) => block.transaction_count(),
        }
    }

    /// Returns the withdrawals from the payload or block.
    pub fn withdrawals(&self) -> Option<&[Withdrawal]>
    where
        T::ExecutionData: ExecutionPayload,
    {
        match self {
            Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
            Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
        }
    }

    /// Returns the total gas used by the block.
    pub fn gas_used(&self) -> u64
    where
        T::ExecutionData: ExecutionPayload,
    {
        match self {
            Self::Payload(payload) => payload.gas_used(),
            Self::Block(block) => block.gas_used(),
        }
    }

    /// Returns the gas limit of the block.
    pub fn gas_limit(&self) -> u64
    where
        T::ExecutionData: ExecutionPayload,
    {
        match self {
            Self::Payload(payload) => payload.gas_limit(),
            Self::Block(block) => block.gas_limit(),
        }
    }
}

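/*
Illustrative sketch, not part of this module: `try_decoded_access_list` relies on
`Option::transpose` to turn an optional, fallible decode into `Result<Option<_>, _>`. The same
pattern is shown here with only the standard library. `decode_optional` is a hypothetical helper
that parses an integer in place of decoding a block access list.

```rust
use std::num::ParseIntError;

/// Parses the value if one is present.
///
/// - `None` input yields `Ok(None)`.
/// - A present, valid value yields `Ok(Some(_))`.
/// - A present, invalid value yields `Err(_)`.
fn decode_optional(raw: Option<&str>) -> Result<Option<u32>, ParseIntError> {
    // `map` produces `Option<Result<u32, _>>`; `transpose` flips it to `Result<Option<u32>, _>`.
    raw.map(|s| s.parse::<u32>()).transpose()
}

fn main() {
    println!("{:?}", decode_optional(None));
    println!("{:?}", decode_optional(Some("42")));
    println!("{:?}", decode_optional(Some("not a number")).is_err());
}
```

This keeps "absent" and "present but malformed" distinct for the caller without nested matching.
*/
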
/// Input for [`CustomStateRoot`].
#[derive(Debug, Clone)]
pub struct CustomStateRootInput<'a, N: NodePrimitives> {
    /// The block being validated.
    pub block: &'a SealedBlock<N::Block>,
    /// The parent block.
    pub parent_block: &'a SealedHeader<N::BlockHeader>,
    /// The execution output.
    pub output: &'a BlockExecutionOutput<N::Receipt>,
    /// The hashed state.
    pub hashed_state: &'a LazyHashedPostState,
}

/// A custom state root computation handler.
pub type CustomStateRoot<N> = Arc<
    dyn Fn(CustomStateRootInput<'_, N>) -> ProviderResult<(B256, TrieUpdates)>
        + Send
        + Sync
        + 'static,
>;
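
/*
Illustrative sketch, not part of this module: a handler of the `CustomStateRoot` shape is a shared,
thread-safe function object that receives a borrowed input struct. The pattern is shown below with
hypothetical stand-ins: `RootInput`, `RootFn`, and `xor_root` are invented for illustration, the
root is a `u64`, and the "trie updates" are reduced to a count.

```rust
use std::sync::Arc;

/// Hypothetical borrowed input, analogous in shape to `CustomStateRootInput`.
struct RootInput<'a> {
    block_number: u64,
    changed_keys: &'a [u64],
}

/// Hypothetical handler alias: callable with an input of any lifetime, shareable across threads.
type RootFn = Arc<dyn Fn(RootInput<'_>) -> Result<(u64, usize), String> + Send + Sync + 'static>;

/// Toy "root": XOR of the block number and all changed keys.
fn xor_root(input: RootInput<'_>) -> Result<(u64, usize), String> {
    if input.changed_keys.is_empty() {
        return Err("no changed keys".to_string());
    }
    let root = input.changed_keys.iter().fold(input.block_number, |acc, key| acc ^ key);
    Ok((root, input.changed_keys.len()))
}

fn main() {
    let handler: RootFn = Arc::new(xor_root);
    // Cloning the `Arc` is cheap, and the `Send + Sync` bounds allow use from another thread.
    let shared = Arc::clone(&handler);
    let worker = std::thread::spawn(move || {
        shared(RootInput { block_number: 1, changed_keys: &[2, 4] })
    });
    println!("{:?}", worker.join().unwrap());
}
```
*/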