// reth_engine_tree/tree/payload_validator.rs
1//! Types and traits for validating blocks and payloads.
2//!
3//! # Validation pipeline
4//!
5//! When the engine processes a new payload (`engine_newPayload`), validation happens in phases:
6//!
7//! ## Phase 1 – Payload conversion
8//! [`PayloadValidator::convert_payload_to_block`] decodes the execution payload (RLP, hashing)
9//! into a [`SealedBlock`]. This runs on a background thread concurrently with state setup.
10//!
11//! ## Phase 2 – Pre-execution consensus
12//! - [`HeaderValidator::validate_header`] — standalone header checks (hash, gas, base fee,
13//!   fork-specific fields)
14//! - [`Consensus::validate_block_pre_execution`] — body vs header (tx root, ommer hash, withdrawals
15//!   root)
16//! - [`HeaderValidator::validate_header_against_parent`] — sequential checks (block number,
17//!   timestamp, gas limit, base fee vs parent)
18//!
19//! ## Phase 3 – Execution
20//! Block transactions are executed via the EVM. Receipt roots are computed incrementally.
21//!
22//! ## Phase 4 – Post-execution consensus
23//! - [`FullConsensus::validate_block_post_execution`] — gas used, receipt root, logs bloom,
24//!   requests hash
25//! - [`PayloadValidator::validate_block_post_execution_with_hashed_state`] — network-specific
26//!   (no-op on L1, used by OP Stack)
27//!
28//! ## Payload attributes validation (`engine_forkchoiceUpdated`)
29//! When the CL provides payload attributes to start building a block:
30//! - [`PayloadValidator::validate_payload_attributes_against_header`] — ensures timestamp ordering
31//!
32//! If validation passes, a payload build job is started. If it fails,
33//! `INVALID_PAYLOAD_ATTRIBUTES` is returned without rolling back the forkchoice update.
34//!
35//! [`HeaderValidator::validate_header`]: reth_consensus::HeaderValidator::validate_header
36//! [`Consensus::validate_block_pre_execution`]: reth_consensus::Consensus::validate_block_pre_execution
37//! [`HeaderValidator::validate_header_against_parent`]: reth_consensus::HeaderValidator::validate_header_against_parent
38//! [`FullConsensus::validate_block_post_execution`]: reth_consensus::FullConsensus::validate_block_post_execution
39//! [`SealedBlock`]: reth_primitives_traits::SealedBlock
40
41use crate::tree::{
42    error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
43    instrumented_state::{InstrumentedStateProvider, StateProviderStats},
44    multiproof::{StateRootComputeOutcome, StateRootHandle},
45    payload_processor::PayloadProcessor,
46    precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
47    types::{InsertPayloadResult, ValidationOutput},
48    CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
49    PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
50};
51use alloy_consensus::transaction::{Either, TxHashRef};
52use alloy_eip7928::bal::DecodedBal;
53use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
54use alloy_evm::Evm;
55use alloy_primitives::{map::B256Set, B256};
56use reth_tasks::LazyHandle;
57#[cfg(feature = "trie-debug")]
58use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
59
60use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
61use reth_chain_state::{
62    CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, ExecutionTimingStats, LazyOverlay,
63};
64use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
65use reth_engine_primitives::{
66    ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
67};
68use reth_errors::{BlockExecutionError, ProviderResult};
69use reth_evm::{
70    block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
71    OnStateHook, SpecFor,
72};
73use reth_execution_cache::{CacheStats, SavedCache};
74use reth_payload_primitives::{
75    BuiltPayload, BuiltPayloadExecutedBlock, InvalidPayloadAttributesError, NewPayloadError,
76    PayloadTypes,
77};
78use reth_primitives_traits::{
79    AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
80    RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
81};
82use reth_provider::{
83    providers::{OverlayBuilder, OverlayStateProviderFactory},
84    BlockExecutionOutput, BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory,
85    DatabaseProviderROFactory, HashedPostStateProvider, ProviderError, PruneCheckpointReader,
86    StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
87    StorageChangeSetReader, StorageSettingsCache,
88};
89use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
90use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState};
91use reth_trie_db::ChangesetCache;
92use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
93use revm_primitives::{Address, KECCAK_EMPTY};
94use std::{
95    collections::HashMap,
96    panic::{self, AssertUnwindSafe},
97    sync::{
98        atomic::{AtomicUsize, Ordering},
99        mpsc::RecvTimeoutError,
100        Arc,
101    },
102    time::Duration,
103};
104use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
105
pub use crate::tree::types::ValidationOutcome;

/// Handle to a [`HashedPostState`] computed on a background thread.
///
/// The hashing work (keccak256 of changed addresses and storage slots) is spawned so it can
/// overlap other post-execution work; presumably awaiting the handle blocks until it is ready —
/// confirm against `reth_tasks::LazyHandle`.
type LazyHashedPostState = reth_tasks::LazyHandle<Arc<HashedPostState>>;

/// Multiplier over the parent's gas limit beyond which a block's claimed gas usage cannot be
/// legitimate. Gas limit can change by at most 1/1024 per block, so anything over this is rejected
/// without entering execution.
const MAX_EXPECTED_GAS_USAGE_MULTIPLIER: u64 = 2;

/// Sender used to stream receipts (tagged with their index) to the background receipt-root task.
type ReceiptRootSender<N> =
    crossbeam_channel::Sender<IndexedReceipt<<N as NodePrimitives>::Receipt>>;
/// One-shot receiver yielding the computed receipt root together with the logs bloom.
type ReceiptRootReceiver = tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>;
119
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
/// internals.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// The engine API tree state (held mutably so validation can record changes to the tree).
    state: &'a mut EngineApiTreeState<N>,
    /// Reference to the canonical in-memory state.
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}
130
impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TreeCtx")
            // `state` is rendered as a placeholder string rather than its contents —
            // presumably `EngineApiTreeState` has no (cheap) `Debug` impl; confirm.
            .field("state", &"EngineApiTreeState")
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .finish()
    }
}
139
140impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
141    /// Creates a new tree context
142    pub const fn new(
143        state: &'a mut EngineApiTreeState<N>,
144        canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
145    ) -> Self {
146        Self { state, canonical_in_memory_state }
147    }
148}
149
150impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
151    /// Returns a reference to the engine tree state
152    pub const fn state(&self) -> &EngineApiTreeState<N> {
153        &*self.state
154    }
155
156    /// Returns a mutable reference to the engine tree state
157    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
158        self.state
159    }
160
161    /// Returns a reference to the canonical in-memory state
162    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
163        self.canonical_in_memory_state
164    }
165}
166
167/// A helper type that provides reusable payload validation logic for network-specific validators.
168///
169/// This type satisfies [`EngineValidator`] and is responsible for executing blocks/payloads.
170///
171/// This type contains common validation, execution, and state root computation logic that can be
172/// used by network-specific payload validators (e.g., Ethereum, Optimism). It is not meant to be
173/// used as a standalone component, but rather as a building block for concrete implementations.
/// A helper type that provides reusable payload validation logic for network-specific validators.
///
/// This type satisfies [`EngineValidator`] and is responsible for executing blocks/payloads.
///
/// This type contains common validation, execution, and state root computation logic that can be
/// used by network-specific payload validators (e.g., Ethereum, Optimism). It is not meant to be
/// used as a standalone component, but rather as a building block for concrete implementations.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Provider for database access.
    provider: P,
    /// Consensus implementation for validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
    /// EVM configuration.
    evm_config: Evm,
    /// Configuration for the tree.
    config: TreeConfig,
    /// Payload processor for state root computation.
    payload_processor: PayloadProcessor<Evm>,
    /// Precompile cache map, shared with the payload processor (see `new`).
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Per-address precompile cache metrics.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook to call when invalid blocks are encountered.
    // Skipped in `Debug` output: a boxed trait object has no useful representation.
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// Validator for the payload.
    validator: V,
    /// Changeset cache for in-memory trie changesets.
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel work.
    runtime: reth_tasks::Runtime,
    /// Custom state root computation function; when set, used via
    /// `StateRootStrategy::Custom` instead of the built-in strategies.
    custom_state_root: Option<CustomStateRoot<Evm::Primitives>>,
}
207
208impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
209where
210    N: NodePrimitives,
211    P: DatabaseProviderFactory<
212            Provider: BlockReader
213                          + StageCheckpointReader
214                          + PruneCheckpointReader
215                          + ChangeSetReader
216                          + StorageChangeSetReader
217                          + BlockNumReader
218                          + StorageSettingsCache,
219        > + BlockReader<Header = N::BlockHeader>
220        + ChangeSetReader
221        + BlockNumReader
222        + StateProviderFactory
223        + StateReader
224        + HashedPostStateProvider
225        + Clone
226        + 'static,
227    Evm: ConfigureEvm<Primitives = N> + 'static,
228{
    /// Creates a new `BasicEngineValidator`.
    ///
    /// Constructs the internal [`PayloadProcessor`] from the shared runtime, EVM config, tree
    /// config and a freshly-created precompile cache map; all other components are stored as
    /// provided. No custom state root handler is set (see `with_custom_state_root`).
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N>>,
        evm_config: Evm,
        validator: V,
        config: TreeConfig,
        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> Self {
        // The cache map is cloned into the payload processor and also kept on `self`,
        // so cached precompile results are shared between the two.
        let precompile_cache_map = PrecompileCacheMap::default();
        let payload_processor = PayloadProcessor::new(
            runtime.clone(),
            evm_config.clone(),
            &config,
            precompile_cache_map.clone(),
        );
        Self {
            provider,
            consensus,
            evm_config,
            payload_processor,
            precompile_cache_map,
            precompile_cache_metrics: HashMap::new(),
            config,
            invalid_block_hook,
            metrics: EngineApiMetrics::default(),
            validator,
            changeset_cache,
            runtime,
            custom_state_root: None,
        }
    }
264
265    /// Sets a custom state root computation handler.
266    pub fn with_custom_state_root(mut self, custom_state_root: CustomStateRoot<N>) -> Self {
267        self.custom_state_root = Some(custom_state_root);
268        self
269    }
270
    /// Converts a [`BlockOrPayload`] into a sealed block.
    ///
    /// Blocks pass through unchanged; payloads are decoded (RLP, hashing) by the inner
    /// [`PayloadValidator::convert_payload_to_block`].
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        input: BlockOrPayload<T>,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError>
    where
        V: PayloadValidator<T, Block = N::Block>,
    {
        match input {
            BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
            BlockOrPayload::Block(block) => Ok(block),
        }
    }
285
    /// Returns the EVM environment for the given payload or block.
    ///
    /// For payloads the environment is derived from the payload itself; for blocks it is
    /// derived from the block header.
    pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        input: &BlockOrPayload<T>,
    ) -> Result<EvmEnvFor<Evm>, Evm::Error>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        match input {
            BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
            BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
        }
    }
300
301    /// Returns [`ExecutableTxIterator`] for the given payload or block.
302    pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
303        &'a self,
304        input: &'a BlockOrPayload<T>,
305    ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
306    where
307        V: PayloadValidator<T, Block = N::Block>,
308        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
309    {
310        Ok(match input {
311            BlockOrPayload::Payload(payload) => {
312                let iter = self
313                    .evm_config
314                    .tx_iterator_for_payload(payload)
315                    .map_err(NewPayloadError::other)?;
316                Either::Left(iter)
317            }
318            BlockOrPayload::Block(block) => {
319                let txs = block.body().clone_transactions();
320                let convert = |tx: N::SignedTx| tx.try_into_recovered();
321                Either::Right((txs, convert))
322            }
323        })
324    }
325
    /// Returns an [`ExecutionCtxFor`] for the given payload or block.
    ///
    /// The returned context borrows from `input` for `'a`, so it must not outlive the
    /// payload or block it was created from.
    pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        input: &'a BlockOrPayload<T>,
    ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        match input {
            BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
            BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
        }
    }
340
341    /// Validates a block that has already been converted from a payload.
342    ///
343    /// This method performs:
344    /// - Consensus validation
345    /// - Block execution
346    /// - State root computation
347    /// - Fork detection
348    #[instrument(
349        level = "debug",
350        target = "engine::tree::payload_validator",
351        skip_all,
352        fields(
353            parent = ?input.parent_hash(),
354            type_name = ?input.type_name(),
355        )
356    )]
357    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
358        &mut self,
359        input: BlockOrPayload<T>,
360        mut ctx: TreeCtx<'_, N>,
361    ) -> InsertPayloadResult<N>
362    where
363        V: PayloadValidator<T, Block = N::Block> + Clone,
364        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
365    {
366        let parent_hash = input.parent_hash();
367
368        // Fetch parent block. This goes to memory most of the time unless the parent block is
369        // beyond the in-memory buffer.
370        let parent_block = match self.sealed_header_by_hash(parent_hash, ctx.state()) {
371            Ok(Some(parent_block)) => parent_block,
372            Ok(None) => {
373                return Err(InsertBlockError::new(
374                    self.convert_to_block(input)?,
375                    ProviderError::HeaderNotFound(parent_hash.into()).into(),
376                )
377                .into())
378            }
379            Err(e) => {
380                return Err(InsertBlockError::new(self.convert_to_block(input)?, e.into()).into())
381            }
382        };
383
384        // Spawn payload conversion and basic validation on a background thread so it runs
385        // concurrently with the rest of the function (setup + execution). For payloads this
386        // overlaps the cost of RLP decoding + header hashing.
387        let validated_block = self.spawn_convert_and_validate(&input, parent_block.clone());
388
389        /// A helper macro that returns the block in case there was an error
390        /// This macro is used for early returns before block conversion
391        macro_rules! ensure_ok {
392            ($expr:expr) => {
393                match $expr {
394                    Ok(val) => val,
395                    Err(e) => {
396                        let block = validated_block.try_into_inner().expect("sole handle")?;
397                        return Err(InsertBlockError::new(block, e.into()).into())
398                    }
399                }
400            };
401        }
402
403        /// A helper macro for handling errors after the input has been converted to a block
404        macro_rules! ensure_ok_post_block {
405            ($expr:expr, $block:expr) => {
406                match $expr {
407                    Ok(val) => val,
408                    Err(e) => {
409                        return Err(
410                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
411                        )
412                    }
413                }
414            };
415        }
416
417        // If the gas usage is suspiciously high (multiple times higher than parent's gas limit), be
418        // cautious and block on pre-execution checks of the block.
419        if input.gas_used() > parent_block.gas_limit() * MAX_EXPECTED_GAS_USAGE_MULTIPLIER {
420            // Call `.get()` to await the pre-execution checks and exit early if they fail.
421            if validated_block.get().is_err() {
422                return Err(validated_block
423                    .try_into_inner()
424                    .expect("sole handle")
425                    .expect_err("Err result checked"))
426            }
427        }
428
429        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
430        let _enter =
431            debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
432        let Some(provider_builder) =
433            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
434        else {
435            // this is pre-validated in the tree
436            return Err(InsertBlockError::new(
437                validated_block.try_into_inner().expect("sole handle")?,
438                ProviderError::HeaderNotFound(parent_hash.into()).into(),
439            )
440            .into())
441        };
442        let mut state_provider = ensure_ok!(provider_builder.build());
443        drop(_enter);
444
445        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
446            .in_scope(|| self.evm_env_for(&input))
447            .map_err(NewPayloadError::other)?;
448
449        // Extract the decoded BAL, if valid and available.
450        let decoded_bal = ensure_ok!(input
451            .try_decoded_access_list()
452            .map_err(|err| { Box::<dyn std::error::Error + Send + Sync>::from(err) }))
453        .map(Arc::new);
454
455        let env = ExecutionEnv {
456            evm_env,
457            hash: input.hash(),
458            parent_hash: input.parent_hash(),
459            parent_state_root: parent_block.state_root(),
460            transaction_count: input.transaction_count(),
461            gas_used: input.gas_used(),
462            withdrawals: input.withdrawals().map(|w| w.to_vec()),
463            decoded_bal,
464        };
465
466        // Plan the strategy used for state root computation.
467        let strategy = self.plan_state_root_computation();
468
469        debug!(
470            target: "engine::tree::payload_validator",
471            ?strategy,
472            "Decided which state root algorithm to run"
473        );
474
475        // Get an iterator over the transactions in the payload
476        let txs = self.tx_iterator_for(&input)?;
477
478        // Create lazy overlay from ancestors - this doesn't block, allowing execution to start
479        // before the trie data is ready. The overlay will be computed on first access.
480        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
481
482        // Create overlay factory for payload processor (StateRootTask path needs it for
483        // multiproofs)
484        let provider_factory = self.provider.clone();
485        let overlay_builder = OverlayBuilder::<N>::new(anchor_hash, self.changeset_cache.clone())
486            .with_lazy_overlay(lazy_overlay);
487        let overlay_factory =
488            OverlayStateProviderFactory::new(provider_factory.clone(), overlay_builder.clone());
489
490        // BAL execute path eligibility. Computed up front because the BAL arm needs a clone of
491        // `provider_builder` (consumed by `spawn_payload_processor` below).
492        let bal_eligible = ensure_ok!(self.bal_path_eligible(env.decoded_bal.as_deref()));
493        let bal_provider_builder = bal_eligible.then(|| provider_builder.clone());
494
495        // Spawn the appropriate processor based on strategy
496        let mut handle = ensure_ok!(self.spawn_payload_processor(
497            env.clone(),
498            txs,
499            provider_builder.clone(),
500            overlay_factory.clone(),
501            &strategy,
502        ));
503
504        // Create optional cache stats for detailed block logging
505        let slow_block_enabled = self.config.slow_block_threshold().is_some();
506        let cache_stats = slow_block_enabled.then(|| Arc::new(CacheStats::default()));
507
508        // Use cached state provider before executing, used in execution after prewarming threads
509        // complete
510        if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
511            state_provider = Box::new(
512                CachedStateProvider::new(state_provider, caches, cache_metrics)
513                    .with_cache_stats(cache_stats.clone()),
514            );
515        };
516
517        let state_provider_stats = if slow_block_enabled || self.config.state_provider_metrics() {
518            let instrumented_state_provider =
519                InstrumentedStateProvider::new(state_provider, "engine");
520            let stats = slow_block_enabled.then(|| instrumented_state_provider.stats());
521            state_provider = Box::new(instrumented_state_provider);
522            stats
523        } else {
524            None
525        };
526
527        // Execute the block and handle any execution errors.
528        // The receipt root task is spawned before execution and receives receipts incrementally
529        // as transactions complete, allowing parallel computation during execution.
530        let execute_block_start = Instant::now();
531        let decoded_bal = env.decoded_bal.clone();
532        let (output, senders, receipt_root_rx) = if bal_eligible {
533            let provider_builder =
534                bal_provider_builder.expect("eligibility implies builder was cloned");
535            ensure_ok!(self.execute_block_bal(
536                state_provider,
537                env,
538                &input,
539                &handle,
540                provider_builder
541            ))
542        } else {
543            ensure_ok!(self.execute_block(state_provider, env, &input, &mut handle))
544        };
545        let block_access_list_hash = decoded_bal.as_ref().map(|decoded_bal| decoded_bal.hash());
546        let execution_duration = execute_block_start.elapsed();
547
548        // After executing the block we can stop prewarming transactions
549        handle.stop_prewarming_execution();
550
551        // Create ExecutionOutcome early so we can terminate caching before validation and state
552        // root computation. Using Arc allows sharing with both the caching task and the deferred
553        // trie task without cloning the expensive BundleState.
554        let output = Arc::new(output);
555
556        // Terminate caching task early since execution is complete and caching is no longer
557        // needed. This frees up resources while state root computation continues.
558        let valid_block_tx = handle.terminate_caching(Some(output.clone()));
559
560        // Spawn hashed post state computation in background so it runs concurrently with
561        // block conversion and receipt root computation. This is a pure CPU-bound task
562        // (keccak256 hashing of all changed addresses and storage slots).
563        let hashed_state_output = output.clone();
564        let hashed_state_provider = self.provider.clone();
565        let hashed_state: LazyHashedPostState =
566            self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
567                let _span = debug_span!(
568                    target: "engine::tree::payload_validator",
569                    "hashed_post_state",
570                )
571                .entered();
572                Arc::new(hashed_state_provider.hashed_post_state(&hashed_state_output.state))
573            });
574
575        let block = validated_block.try_into_inner().expect("sole handle")?;
576        let block = block.with_senders(senders);
577
578        // Wait for the receipt root computation to complete.
579        let receipt_root_bloom = {
580            let _enter = debug_span!(
581                target: "engine::tree::payload_validator",
582                "wait_receipt_root",
583            )
584            .entered();
585
586            receipt_root_rx
587                .blocking_recv()
588                .inspect_err(|_| {
589                    tracing::error!(
590                        target: "engine::tree::payload_validator",
591                        "Receipt root task dropped sender without result, receipt root calculation likely aborted"
592                    );
593                })
594                .ok()
595        };
596
597        let hashed_state = ensure_ok_post_block!(
598            self.validate_post_execution(
599                &block,
600                &parent_block,
601                &output,
602                &mut ctx,
603                receipt_root_bloom,
604                hashed_state,
605                block_access_list_hash
606            ),
607            block
608        );
609
610        let root_time = Instant::now();
611        let mut maybe_state_root = None;
612        let mut state_root_task_failed = false;
613        #[cfg(feature = "trie-debug")]
614        let mut trie_debug_recorders = Vec::new();
615
616        match strategy {
617            StateRootStrategy::StateRootTask => {
618                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
619
620                let task_result = ensure_ok_post_block!(
621                    self.await_state_root_with_timeout(
622                        &mut handle,
623                        provider_builder.clone(),
624                        &hashed_state,
625                    ),
626                    block
627                );
628
629                match task_result {
630                    Ok(StateRootComputeOutcome {
631                        state_root,
632                        trie_updates,
633                        #[cfg(feature = "trie-debug")]
634                        debug_recorders,
635                    }) => {
636                        let elapsed = root_time.elapsed();
637                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
638
639                        #[cfg(feature = "trie-debug")]
640                        {
641                            trie_debug_recorders = debug_recorders;
642                        }
643
644                        // Compare trie updates with serial computation if configured
645                        if self.config.always_compare_trie_updates() {
646                            let _has_diff = self.compare_trie_updates_with_serial(
647                                provider_builder.clone(),
648                                provider_factory,
649                                overlay_builder,
650                                &hashed_state,
651                                trie_updates.as_ref().clone(),
652                            );
653                            #[cfg(feature = "trie-debug")]
654                            if _has_diff {
655                                Self::write_trie_debug_recorders(
656                                    block.header().number(),
657                                    &trie_debug_recorders,
658                                );
659                            }
660                        }
661
662                        // we double check the state root here for good measure
663                        if state_root == block.header().state_root() {
664                            maybe_state_root = Some((state_root, trie_updates, elapsed))
665                        } else {
666                            warn!(
667                                target: "engine::tree::payload_validator",
668                                ?state_root,
669                                block_state_root = ?block.header().state_root(),
670                                "State root task returned incorrect state root"
671                            );
672                            #[cfg(feature = "trie-debug")]
673                            Self::write_trie_debug_recorders(
674                                block.header().number(),
675                                &trie_debug_recorders,
676                            );
677                            state_root_task_failed = true;
678                        }
679                    }
680                    Err(error) => {
681                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
682                        state_root_task_failed = true;
683                    }
684                }
685            }
686            StateRootStrategy::Parallel => {
687                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
688                match self.compute_state_root_parallel(
689                    provider_factory,
690                    overlay_builder,
691                    &hashed_state,
692                ) {
693                    Ok(result) => {
694                        let elapsed = root_time.elapsed();
695                        info!(
696                            target: "engine::tree::payload_validator",
697                            regular_state_root = ?result.0,
698                            ?elapsed,
699                            "Regular root task finished"
700                        );
701                        maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
702                    }
703                    Err(error) => {
704                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
705                    }
706                }
707            }
708            StateRootStrategy::Synchronous => {}
709            StateRootStrategy::Custom(custom) => {
710                let (state_root, trie_updates) = ensure_ok_post_block!(
711                    custom(CustomStateRootInput {
712                        block: &block,
713                        parent_block: &parent_block,
714                        output: &output,
715                        hashed_state: &hashed_state,
716                    }),
717                    block
718                );
719                maybe_state_root = Some((state_root, Arc::new(trie_updates), root_time.elapsed()));
720            }
721        }
722
723        // Determine the state root.
724        // If the state root was computed in parallel, we use it.
725        // Otherwise, we fall back to computing it synchronously.
726        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
727            maybe_state_root
728        {
729            maybe_state_root
730        } else {
731            // fallback is to compute the state root regularly in sync
732            if self.config.state_root_fallback() {
733                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
734            } else {
735                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
736                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
737            }
738
739            let (root, updates) = ensure_ok_post_block!(
740                provider_builder
741                    .build()
742                    .and_then(|provider| Self::compute_state_root_serial(provider, &hashed_state)),
743                block
744            );
745
746            if state_root_task_failed {
747                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
748            }
749
750            (root, Arc::new(updates), root_time.elapsed())
751        };
752
753        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
754        self.metrics
755            .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
756        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
757
758        // ensure state root matches
759        if state_root != block.header().state_root() {
760            #[cfg(feature = "trie-debug")]
761            Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);
762
763            // call post-block hook
764            self.on_invalid_block(
765                &parent_block,
766                &block,
767                &output,
768                Some((&trie_output, state_root)),
769                ctx.state_mut(),
770            );
771            let block_state_root = block.header().state_root();
772            return Err(InsertBlockError::new(
773                block.into_sealed_block(),
774                ConsensusError::BodyStateRootDiff(
775                    GotExpected { got: state_root, expected: block_state_root }.into(),
776                )
777                .into(),
778            )
779            .into())
780        }
781
782        let timing_stats = state_provider_stats.map(|stats| {
783            self.calculate_timing_stats(
784                &block,
785                stats,
786                cache_stats,
787                &output,
788                execution_duration,
789                root_elapsed,
790            )
791        });
792
793        if let Some(valid_block_tx) = valid_block_tx {
794            let _ = valid_block_tx.send(());
795        }
796
797        // Create the overlay provider NOW, while we're on the engine loop thread and trie changeset
798        // eviction cannot race with us. If we deferred this to the background task, persistence
799        // could advance and evict changeset cache entries between factory creation and the task
800        // actually running, causing expensive DB fallback computations when building the overlay.
801        let changeset_provider =
802            ensure_ok_post_block!(overlay_factory.database_provider_ro(), block);
803
804        let executed_block = self.spawn_deferred_trie_task(
805            Arc::new(block),
806            output,
807            ctx.state(),
808            hashed_state,
809            trie_output,
810            changeset_provider,
811        );
812        Ok(ValidationOutput::new(executed_block, timing_stats))
813    }
814
    /// Spawns a background task to convert a [`BlockOrPayload`] into a [`SealedBlock`] and perform
    /// basic consensus validations on it.
    ///
    /// The returned [`LazyHandle`] resolves to the sealed block on success, or an
    /// [`InsertPayloadError`] if conversion or any consensus check fails.
    ///
    /// Work performed on the background thread, in order:
    /// 1. Payload → block conversion via the validator (skipped when `input` is already a block)
    /// 2. Standalone header validation
    /// 3. Header validation against `parent`
    /// 4. Pre-execution block validation (the tx root argument is `None` here — presumably
    ///    computed/checked elsewhere; TODO confirm against `validate_block_pre_execution_with_tx_root`)
    #[expect(clippy::type_complexity)]
    pub fn spawn_convert_and_validate<T>(
        &self,
        input: &BlockOrPayload<T>,
        parent: SealedHeader<N::BlockHeader>,
    ) -> LazyHandle<Result<SealedBlock<N::Block>, InsertPayloadError<N::Block>>>
    where
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        V: PayloadValidator<T, Block = N::Block> + Clone,
    {
        // Clone everything the spawned closure needs: it must be self-contained ('static).
        let input = input.clone();
        let validator = self.validator.clone();
        let consensus = self.consensus.clone();
        // Capture the caller's span so the background work is parented correctly in traces.
        let parent_span = Span::current();
        self.payload_processor.executor().spawn_blocking_named("payload-convert", move || {
            let _span = debug_span!(
                target: "engine::tree::payload_validator",
                parent: parent_span,
                "convert_and_validate",
            )
            .entered();
            // Phase 1: decode the payload into a sealed block (no-op when already a block).
            let block = match input {
                BlockOrPayload::Block(block) => block,
                BlockOrPayload::Payload(payload) => {
                    validator.convert_payload_to_block(payload)?
                }
            };

            // Phase 2a: standalone header checks.
            if let Err(e) = consensus.validate_header(block.sealed_header()) {
                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
                return Err(InsertBlockError::consensus_error(e, block).into())
            }

            // now validate against the parent
            // NOTE(review): this failure logs at `warn!` while the other two log at `error!` —
            // verify the asymmetry is intentional.
            let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
            if let Err(e) = consensus.validate_header_against_parent(block.sealed_header(), &parent)
            {
                warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
                return Err(InsertBlockError::consensus_error(e, block).into())
            }
            drop(_enter);

            // Phase 2b: body-vs-header checks, with the transaction-root check deferred (`None`).
            if let Err(e) =
                consensus.validate_block_pre_execution_with_tx_root(&block, None)
            {
                error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
                return Err(InsertBlockError::consensus_error(e, block).into())
            }

            Ok(block)
        })
    }
869
870    /// Return sealed block header from database or in-memory state by hash.
871    fn sealed_header_by_hash(
872        &self,
873        hash: B256,
874        state: &EngineApiTreeState<N>,
875    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
876        // check memory first
877        let header = state.tree_state.sealed_header_by_hash(&hash);
878
879        if header.is_some() {
880            Ok(header)
881        } else {
882            self.provider.sealed_header_by_hash(hash)
883        }
884    }
885
    /// Executes a block with the given state provider.
    ///
    /// This method orchestrates block execution:
    /// 1. Sets up the EVM with state database and precompile caching
    /// 2. Spawns a background task for incremental receipt root computation
    /// 3. Executes transactions with metrics collection via state hooks
    /// 4. Merges state transitions and records execution metrics
    ///
    /// Returns the execution output (receipts + bundle state), the recovered transaction
    /// senders, and the receiver for the asynchronously computed receipt root.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::type_complexity)]
    fn execute_block<S, Err, T>(
        &mut self,
        state_provider: S,
        env: ExecutionEnv<Evm>,
        input: &BlockOrPayload<T>,
        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
    ) -> Result<
        (BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootReceiver),
        InsertBlockErrorKind,
    >
    where
        S: StateProvider + Send,
        Err: core::error::Error + Send + Sync + 'static,
        V: PayloadValidator<T, Block = N::Block>,
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        debug!(target: "engine::tree::payload_validator", "Executing block");

        // Reject a BAL that contains more items than the block's gas limit allows before
        // spending any execution effort on it.
        if let Some(decoded_bal) = &env.decoded_bal {
            decoded_bal
                .as_bal()
                .validate_gas_limit(input.gas_limit())
                .map_err(|e| {
                    debug!(target: "engine::tree::payload_validator", "BAL is invalid since it contains more items than the gas limit allows");
                    InsertBlockErrorKind::Consensus(ConsensusError::from(e))
                })?
        }

        // Only attach the BAL builder to the state when the payload actually carries a BAL.
        let has_bal = env.decoded_bal.is_some();
        let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
            State::builder()
                .with_database(StateProviderDatabase::new(state_provider))
                .with_bundle_update()
                .with_bal_builder_if(has_bal)
                .build()
        });

        // Capture the spec id before `env.evm_env` is consumed by EVM construction; it is
        // needed below to key the precompile cache.
        let (spec_id, mut executor) = {
            let _span = debug_span!(target: "engine::tree", "create_evm").entered();
            let spec_id = *env.evm_env.spec_id();
            let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
            let ctx = self
                .execution_ctx_for(input)
                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
            let executor = self.evm_config.create_executor(evm, ctx);
            (spec_id, executor)
        };

        // Wrap each cacheable precompile with a per-address cache plus per-address metrics
        // (metrics entries are created lazily and reused across blocks).
        if !self.config.precompile_cache_disabled() {
            let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
            executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
                |address, precompile| {
                    let metrics = self
                        .precompile_cache_metrics
                        .entry(*address)
                        .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
                        .clone();
                    CachedPrecompile::wrap(
                        precompile,
                        self.precompile_cache_map.cache_for_address(*address),
                        spec_id,
                        Some(metrics),
                    )
                },
            );
        }

        // Receipts are streamed to a background task that computes the receipt root
        // incrementally while execution is still running.
        let transaction_count = input.transaction_count();
        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(transaction_count);
        // Shared counter advanced per executed tx so prewarm workers can skip done work.
        let executed_tx_index = Arc::clone(handle.executed_tx_index());
        let executor = executor.with_state_hook(
            handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
        );

        let execution_start = Instant::now();

        // Execute all transactions and finalize
        let (executor, senders) = self.execute_transactions(
            executor,
            transaction_count,
            handle.iter_transactions(),
            &receipt_tx,
            &executed_tx_index,
            has_bal,
        )?;
        // Close our end of the receipt channel so the receipt-root task can terminate
        // once it has drained all streamed receipts.
        drop(receipt_tx);

        // Finish execution and get the result
        let post_exec_start = Instant::now();
        // `into_db()` releases the EVM's borrow of `db`; the binding itself is unused.
        let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
            .in_scope(|| executor.finish())
            .map(|(evm, result)| (evm.into_db(), result))?;
        self.metrics.record_post_execution(post_exec_start.elapsed());

        if let Some(decoded_bal) = &env.decoded_bal {
            // Regular execution still handles BAL payloads when the parallel BAL path is
            // disabled. Prove that execution rebuilt the payload-provided BAL before
            // post-execution validation uses `decoded_bal.hash()` as the header commitment.
            crate::tree::payload_processor::bal::validate_bal(&mut db, decoded_bal)?;
        }

        // Merge transitions into bundle state
        debug_span!(target: "engine::tree", "merge_transitions")
            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));

        let output = BlockExecutionOutput { result, state: db.take_bundle() };

        let execution_duration = execution_start.elapsed();
        self.metrics.record_block_execution(&output, execution_duration);
        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");

        Ok((output, senders, result_rx))
    }
1010
1011    /// Returns true when the BAL execute path should be used for this block.
1012    // TODO: extend with stronger gating before enabling on mainnet:
1013    //   - Fork check: `Amsterdam.active_at_timestamp(env.evm_env.timestamp)`. Today a BAL only
1014    //     exists post-Amsterdam, so the BAL-presence check is a sufficient proxy. It is a proxy,
1015    //     not a guarantee.
1016    //   - Tx-count threshold (`bal_execute_path_min_tx_count`): below the parallelism break-even
1017    //     point, provider setup and worker scheduling overhead can exceed the gain. Tune
1018    //     empirically once workers are parallel; meaningless while the commit loop is sequential.
1019    fn bal_path_eligible(&self, bal: Option<&DecodedBal>) -> Result<bool, InsertBlockErrorKind> {
1020        let has_bal = bal.is_some();
1021        let parallel_execution = has_bal &&
1022            !self.config.disable_state_cache() &&
1023            !self.config.disable_bal_parallel_execution();
1024        if parallel_execution && self.config.disable_bal_parallel_state_root() {
1025            return Err(InsertBlockErrorKind::Other(
1026                "disabling parallel state root is impossible when parallel execution is enabled"
1027                    .into(),
1028            ));
1029        }
1030
1031        Ok(parallel_execution)
1032    }
1033
    /// Executes the block on the BAL path. Mirrors the return shape of [`Self::execute_block`]
    /// so the dispatch site stays uniform.
    ///
    /// Inside, this:
    /// 1. Creates a shared parent-state cache handle for provider-backed workers.
    /// 2. Relies on BAL prewarm to stream sparse-trie updates and optional state prefetches.
    /// 3. Spawns the receipt-root task.
    /// 4. Calls [`crate::tree::payload_processor::bal::execute_block`].
    /// 5. Adapts the BAL output to a [`BlockExecutionOutput`].
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::type_complexity)]
    fn execute_block_bal<S, Tx, Err, BalP, T>(
        &self,
        _state_provider: S,
        env: ExecutionEnv<Evm>,
        input: &BlockOrPayload<T>,
        handle: &PayloadHandle<Tx, Err, N::Receipt>,
        provider_builder: StateProviderBuilder<N, BalP>,
    ) -> Result<
        (BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootReceiver),
        InsertBlockErrorKind,
    >
    where
        S: StateProvider + Send,
        Tx: ExecutableTxFor<Evm> + Send,
        Err: core::error::Error + Send + Sync + 'static,
        BalP: BlockReader + StateProviderFactory + StateReader + Clone + Send + Sync + 'static,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        V: PayloadValidator<T, Block = N::Block>,
    {
        debug!(target: "engine::tree::payload_validator", "Executing block via BAL path");

        // The BAL path is only eligible when the state cache is enabled (see
        // `bal_path_eligible`), so a missing cache here is a hard error.
        let cache = handle.caches().ok_or_else(|| {
            InsertBlockErrorKind::Other("BAL execute path: no execution cache available".into())
        })?;
        let cache_metrics = handle.cache_metrics().unwrap_or_default();
        // Pin the cache to the parent block's state so worker databases read consistently.
        let saved_cache = SavedCache::new(env.parent_hash, cache);

        let (receipt_tx, result_rx) = self.spawn_receipt_root_task(env.transaction_count);
        // Likewise, this path is only taken when a decoded BAL exists; absence is a bug.
        let input_bal = env.decoded_bal.ok_or_else(|| {
            InsertBlockErrorKind::Other("BAL execute path: no decoded BAL available".into())
        })?;

        // Builds a cache-backed state database on demand; passed by reference below, so it
        // may be invoked more than once (presumably one DB per worker — TODO confirm).
        let make_db = move || {
            let provider = provider_builder
                .build()
                .map_err(crate::tree::payload_processor::bal::BalExecutionError::Provider)?;
            Ok(StateProviderDatabase::new(CachedStateProvider::new_prewarm(
                provider,
                saved_cache.cache().clone(),
                cache_metrics.clone(),
            )))
        };
        let execution_start = Instant::now();
        let ctx =
            self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
        let (output, senders) = crate::tree::payload_processor::bal::execute_block(
            &self.runtime,
            &self.evm_config,
            &make_db,
            input_bal,
            env.evm_env,
            ctx,
            env.transaction_count,
            handle.clone_transaction_receiver(),
            receipt_tx,
        )?;
        let execution_duration = execution_start.elapsed();

        self.metrics.record_block_execution(&output, execution_duration);
        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
        debug!(
            target: "engine::tree::payload_validator",
            elapsed = ?execution_duration,
            "Executed block via BAL path",
        );

        Ok((output, senders, result_rx))
    }
1114
1115    fn spawn_receipt_root_task(
1116        &self,
1117        receipts_len: usize,
1118    ) -> (ReceiptRootSender<N>, ReceiptRootReceiver) {
1119        // Unbounded channel is used since tx count bounds capacity anyway.
1120        let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
1121        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
1122        let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
1123        self.payload_processor
1124            .executor()
1125            .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
1126
1127        (receipt_tx, result_rx)
1128    }
1129
1130    /// Executes transactions and collects senders, streaming receipts to a background task.
1131    ///
1132    /// This method handles:
1133    /// - Applying pre-execution changes (e.g., beacon root updates)
1134    /// - Executing each transaction with timing metrics
1135    /// - Streaming receipts to the receipt root computation task
1136    /// - Collecting transaction senders for later use
1137    ///
1138    /// Returns the executor (for finalization) and the collected senders.
1139    fn execute_transactions<'a, E, Tx, InnerTx, Err, DB>(
1140        &self,
1141        mut executor: E,
1142        transaction_count: usize,
1143        transactions: impl Iterator<Item = Result<Tx, Err>>,
1144        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
1145        executed_tx_index: &AtomicUsize,
1146        has_bal: bool,
1147    ) -> Result<(E, Vec<Address>), BlockExecutionError>
1148    where
1149        E: BlockExecutor<Receipt = N::Receipt, Evm: alloy_evm::Evm<DB = &'a mut State<DB>>>,
1150        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
1151        InnerTx: TxHashRef,
1152        DB: revm::Database + 'a,
1153        Err: core::error::Error + Send + Sync + 'static,
1154    {
1155        let mut senders = Vec::with_capacity(transaction_count);
1156
1157        // Apply pre-execution changes (e.g., beacon root update)
1158        let pre_exec_start = Instant::now();
1159        debug_span!(target: "engine::tree", "pre_execution")
1160            .in_scope(|| executor.apply_pre_execution_changes())?;
1161        self.metrics.record_pre_execution(pre_exec_start.elapsed());
1162
1163        // Bump BAL index after pre-execution changes (EIP-7928: index 0 is pre-execution)
1164        if has_bal {
1165            executor.evm_mut().db_mut().bump_bal_index();
1166        }
1167
1168        // Execute transactions
1169        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
1170        let mut transactions = transactions.into_iter();
1171        // Some executors may execute transactions that do not append receipts during the
1172        // main loop (e.g., system transactions whose receipts are added during finalization).
1173        // In that case, invoking the callback on every transaction would resend the previous
1174        // receipt with the same index and can panic the ordered root builder.
1175        let mut last_sent_len = 0usize;
1176        loop {
1177            // Measure time spent waiting for next transaction from iterator
1178            // (e.g., parallel signature recovery)
1179            let wait_start = Instant::now();
1180            let Some(tx_result) = transactions.next() else { break };
1181            self.metrics.record_transaction_wait(wait_start.elapsed());
1182
1183            let tx = tx_result.map_err(BlockExecutionError::other)?;
1184            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
1185
1186            senders.push(tx_signer);
1187
1188            let _enter = debug_span!(
1189                target: "engine::tree",
1190                "execute tx",
1191                tx_index = senders.len() - 1,
1192            )
1193            .entered();
1194            trace!(target: "engine::tree", "Executing transaction");
1195
1196            let tx_start = Instant::now();
1197            executor.execute_transaction(tx)?;
1198            self.metrics.record_transaction_execution(tx_start.elapsed());
1199
1200            // advance the shared counter so prewarm workers skip already-executed txs
1201            executed_tx_index.store(senders.len(), Ordering::Relaxed);
1202
1203            let current_len = executor.receipts().len();
1204            if current_len > last_sent_len {
1205                last_sent_len = current_len;
1206                // Send the latest receipt to the background task for incremental root computation.
1207                if let Some(receipt) = executor.receipts().last() {
1208                    let tx_index = current_len - 1;
1209                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
1210                }
1211            }
1212            // Bump BAL index after each transaction (EIP-7928)
1213            if has_bal {
1214                executor.evm_mut().db_mut().bump_bal_index();
1215            }
1216        }
1217
1218        drop(exec_span);
1219
1220        Ok((executor, senders))
1221    }
1222
1223    /// Compute state root for the given hashed post state in parallel.
1224    ///
1225    /// Uses an overlay factory which provides the state of the parent block, along with the
1226    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
1227    /// trie updates for this block.
1228    ///
1229    /// # Returns
1230    ///
1231    /// Returns `Ok(_)` if computed successfully.
1232    /// Returns `Err(_)` if error was encountered during computation.
1233    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1234    fn compute_state_root_parallel(
1235        &self,
1236        provider_factory: P,
1237        overlay_builder: OverlayBuilder<N>,
1238        hashed_state: &LazyHashedPostState,
1239    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
1240        let hashed_state = hashed_state.get();
1241        // The `hashed_state` argument will be taken into account as part of the overlay, but we
1242        // need to use the prefix sets which were generated from it to indicate to the
1243        // ParallelStateRoot which parts of the trie need to be recomputed.
1244        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1245        let overlay_factory = OverlayStateProviderFactory::new(
1246            provider_factory,
1247            overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted()),
1248        );
1249        ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
1250            .incremental_root_with_updates()
1251    }
1252
1253    /// Compute state root for the given hashed post state in serial.
1254    ///
1255    /// Uses the same provider construction path as main execution and computes the state root and
1256    /// trie updates for this block directly via
1257    /// [`reth_provider::StateRootProvider::state_root_with_updates`].
1258    fn compute_state_root_serial(
1259        state_provider: StateProviderBox,
1260        hashed_state: &LazyHashedPostState,
1261    ) -> ProviderResult<(B256, TrieUpdates)> {
1262        state_provider.state_root_with_updates(hashed_state.get().as_ref().clone())
1263    }
1264
    /// Awaits the state root from the background task, with an optional timeout fallback.
    ///
    /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
    /// state root task up to the timeout duration. If the task doesn't complete in time, a
    /// sequential state root computation is spawned via `spawn_blocking`. Both computations
    /// then race: the main thread polls the task receiver and the sequential result channel
    /// in a loop, returning whichever finishes first.
    ///
    /// If no timeout is configured, this simply awaits the state root task without any fallback.
    ///
    /// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
    /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
    /// `Result` captures parallel state root task errors that can still fall back to serial.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        name = "await_state_root",
        skip_all
    )]
    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
        &self,
        handle: &mut PayloadHandle<Tx, Err, R>,
        state_provider_builder: StateProviderBuilder<N, P>,
        hashed_state: &LazyHashedPostState,
    ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
        // No timeout configured: block on the task result with no fallback.
        let Some(timeout) = self.config.state_root_task_timeout() else {
            return Ok(handle.state_root());
        };

        // Take the receiver out of the handle so we can apply timeouts to it directly.
        let task_rx = handle.take_state_root_rx();

        match task_rx.recv_timeout(timeout) {
            Ok(result) => Ok(result),
            // Sender dropped before producing a result: surface as a recoverable inner error
            // so the caller can still fall back to serial computation.
            Err(RecvTimeoutError::Disconnected) => {
                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
            }
            Err(RecvTimeoutError::Timeout) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    ?timeout,
                    "State root task timed out, spawning sequential fallback"
                );
                self.metrics.block_validation.state_root_task_timeout_total.increment(1);

                // Channel carrying the sequential fallback's result back to this thread.
                let (seq_tx, seq_rx) =
                    std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();

                let seq_hashed_state = hashed_state.clone();
                self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
                    let result = state_provider_builder.build().and_then(|provider| {
                        Self::compute_state_root_serial(provider, &seq_hashed_state)
                    });
                    // Receiver may already be gone if the task won the race; ignore send errors.
                    let _ = seq_tx.send(result);
                });

                // Cadence at which we alternate between checking the task and the fallback.
                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

                // Race loop: alternately poll the original task (with a short timeout) and the
                // sequential fallback (non-blocking) until one of them produces a result.
                loop {
                    match task_rx.recv_timeout(POLL_INTERVAL) {
                        Ok(result) => {
                            debug!(
                                target: "engine::tree::payload_validator",
                                source = "task",
                                "State root timeout race won"
                            );
                            return Ok(result);
                        }
                        // Task died mid-race: the sequential fallback is now the only source,
                        // so block on it. If it also fails to deliver, both computations died.
                        Err(RecvTimeoutError::Disconnected) => {
                            debug!(
                                target: "engine::tree::payload_validator",
                                "State root task dropped, waiting for sequential fallback"
                            );
                            let result = seq_rx.recv().map_err(|_| {
                                ProviderError::other(std::io::Error::other(
                                    "both state root computations failed",
                                ))
                            })?;
                            let (state_root, trie_updates) = result?;
                            return Ok(Ok(StateRootComputeOutcome {
                                state_root,
                                trie_updates: Arc::new(trie_updates),
                                #[cfg(feature = "trie-debug")]
                                debug_recorders: Vec::new(),
                            }));
                        }
                        // Task still running; fall through and check the fallback.
                        Err(RecvTimeoutError::Timeout) => {}
                    }

                    // Non-blocking check of the sequential fallback's channel.
                    if let Ok(result) = seq_rx.try_recv() {
                        debug!(
                            target: "engine::tree::payload_validator",
                            source = "sequential",
                            "State root timeout race won"
                        );
                        let (state_root, trie_updates) = result?;
                        return Ok(Ok(StateRootComputeOutcome {
                            state_root,
                            trie_updates: Arc::new(trie_updates),
                            #[cfg(feature = "trie-debug")]
                            debug_recorders: Vec::new(),
                        }));
                    }
                }
            }
        }
    }
1371
1372    /// Compares trie updates from the state root task with serial state root computation.
1373    ///
1374    /// This is used for debugging and validating the correctness of the parallel state root
1375    /// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
1376    /// method runs a separate serial state root computation and compares the resulting trie
1377    /// updates.
1378    fn compare_trie_updates_with_serial(
1379        &self,
1380        state_provider_builder: StateProviderBuilder<N, P>,
1381        provider_factory: P,
1382        overlay_builder: OverlayBuilder<N>,
1383        hashed_state: &LazyHashedPostState,
1384        task_trie_updates: TrieUpdates,
1385    ) -> bool {
1386        debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1387
1388        match state_provider_builder
1389            .build()
1390            .and_then(|provider| Self::compute_state_root_serial(provider, hashed_state))
1391        {
1392            Ok((serial_root, serial_trie_updates)) => {
1393                debug!(
1394                    target: "engine::tree::payload_validator",
1395                    ?serial_root,
1396                    "Serial state root computation finished for comparison"
1397                );
1398
1399                // Get a database provider to use as trie cursor factory
1400                let overlay_factory =
1401                    OverlayStateProviderFactory::new(provider_factory, overlay_builder);
1402                match overlay_factory.database_provider_ro() {
1403                    Ok(provider) => {
1404                        match super::trie_updates::compare_trie_updates(
1405                            &provider,
1406                            task_trie_updates,
1407                            serial_trie_updates,
1408                        ) {
1409                            Ok(has_diff) => return has_diff,
1410                            Err(err) => {
1411                                warn!(
1412                                    target: "engine::tree::payload_validator",
1413                                    %err,
1414                                    "Error comparing trie updates"
1415                                );
1416                                return true;
1417                            }
1418                        }
1419                    }
1420                    Err(err) => {
1421                        warn!(
1422                            target: "engine::tree::payload_validator",
1423                            %err,
1424                            "Failed to get database provider for trie update comparison"
1425                        );
1426                    }
1427                }
1428            }
1429            Err(err) => {
1430                warn!(
1431                    target: "engine::tree::payload_validator",
1432                    %err,
1433                    "Failed to compute serial state root for comparison"
1434                );
1435            }
1436        }
1437        false
1438    }
1439
    /// Writes trie debug recorders to a JSON file for the given block number.
    ///
    /// The file is written to the current working directory as
    /// `trie_debug_block_{block_number}.json`.
    ///
    /// This is a best-effort debugging aid gated behind the `trie-debug` feature:
    /// serialization or write failures are logged and never propagated.
    #[cfg(feature = "trie-debug")]
    fn write_trie_debug_recorders(
        block_number: u64,
        recorders: &[(Option<B256>, TrieDebugRecorder)],
    ) {
        let path = format!("trie_debug_block_{block_number}.json");
        match serde_json::to_string_pretty(recorders) {
            Ok(json) => match std::fs::write(&path, json) {
                Ok(()) => {
                    // NOTE(review): the success path logs at `warn` level, presumably so the
                    // artifact's location is visible under default log filters — confirm
                    // intent before downgrading this to `debug`.
                    warn!(
                        target: "engine::tree::payload_validator",
                        %path,
                        "Wrote trie debug recorders to file"
                    );
                }
                Err(err) => {
                    warn!(
                        target: "engine::tree::payload_validator",
                        %err,
                        %path,
                        "Failed to write trie debug recorders"
                    );
                }
            },
            Err(err) => {
                warn!(
                    target: "engine::tree::payload_validator",
                    %err,
                    "Failed to serialize trie debug recorders"
                );
            }
        }
    }
1477
    /// Validates the block after execution.
    ///
    /// This performs:
    /// - parent header validation
    /// - post-execution consensus validation
    /// - state-root based post-execution validation
    ///
    /// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
    /// and logs bloom from the receipts.
    ///
    /// The `hashed_state` handle wraps the background hashed post state computation.
    ///
    /// On any validation failure the `on_invalid_block` hook is invoked before the error is
    /// returned; on success the (now resolved) `hashed_state` handle is returned to the caller.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::too_many_arguments)]
    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &self,
        block: &RecoveredBlock<N::Block>,
        parent_block: &SealedHeader<N::BlockHeader>,
        output: &BlockExecutionOutput<N::Receipt>,
        ctx: &mut TreeCtx<'_, N>,
        receipt_root_bloom: Option<ReceiptRootBloom>,
        hashed_state: LazyHashedPostState,
        block_access_list_hash: Option<B256>,
    ) -> Result<LazyHashedPostState, InsertBlockErrorKind>
    where
        V: PayloadValidator<T, Block = N::Block>,
    {
        let start = Instant::now();

        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");

        // Validate block post-execution rules
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
                .entered();
        if let Err(err) = self.consensus.validate_block_post_execution(
            block,
            output,
            receipt_root_bloom,
            block_access_list_hash,
        ) {
            // call post-block hook
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }
        // Explicitly close the consensus-validation span before blocking on the
        // hashed-state handle below so the wait is not attributed to it.
        drop(_enter);

        // Wait for the background keccak256 hashing task to complete. This blocks until
        // all changed addresses and storage slots have been hashed.
        let hashed_state_ref =
            debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
                .in_scope(|| hashed_state.get());

        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
        if let Err(err) =
            self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, block)
        {
            // call post-block hook
            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
            return Err(err.into())
        }

        // record post-execution validation duration
        self.metrics
            .block_validation
            .post_execution_validation_duration
            .record(start.elapsed().as_secs_f64());

        Ok(hashed_state)
    }
1547
1548    /// Spawns a payload processor task based on the state root strategy.
1549    ///
1550    /// This method determines how to execute the block and compute its state root based on
1551    /// the selected strategy:
1552    /// - `StateRootTask`: Uses a dedicated task for state root computation with proof generation
1553    /// - `Parallel`: Computes state root in parallel with block execution
1554    /// - `Synchronous`: Falls back to sequential execution and state root computation
1555    ///
1556    /// The method handles strategy fallbacks if the preferred approach fails, ensuring
1557    /// block execution always completes with a valid state root.
1558    ///
1559    /// # Arguments
1560    ///
1561    /// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
1562    ///   (`StateRootTask`)
1563    #[instrument(
1564        level = "debug",
1565        target = "engine::tree::payload_validator",
1566        skip_all,
1567        fields(?strategy)
1568    )]
1569    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
1570        &mut self,
1571        env: ExecutionEnv<Evm>,
1572        txs: T,
1573        provider_builder: StateProviderBuilder<N, P>,
1574        overlay_factory: OverlayStateProviderFactory<P, N>,
1575        strategy: &StateRootStrategy<N>,
1576    ) -> Result<
1577        PayloadHandle<
1578            impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
1579            impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
1580            N::Receipt,
1581        >,
1582        InsertBlockErrorKind,
1583    > {
1584        match strategy {
1585            StateRootStrategy::StateRootTask => {
1586                let spawn_start = Instant::now();
1587
1588                // Use the pre-computed overlay factory for multiproofs
1589                let handle = self.payload_processor.spawn(
1590                    env,
1591                    txs,
1592                    provider_builder,
1593                    overlay_factory,
1594                    &self.config,
1595                );
1596
1597                // record prewarming initialization duration
1598                self.metrics
1599                    .block_validation
1600                    .spawn_payload_processor
1601                    .record(spawn_start.elapsed().as_secs_f64());
1602
1603                Ok(handle)
1604            }
1605            StateRootStrategy::Parallel |
1606            StateRootStrategy::Synchronous |
1607            StateRootStrategy::Custom(_) => {
1608                let start = Instant::now();
1609                let handle =
1610                    self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
1611
1612                // Record prewarming initialization duration
1613                self.metrics
1614                    .block_validation
1615                    .spawn_payload_processor
1616                    .record(start.elapsed().as_secs_f64());
1617
1618                Ok(handle)
1619            }
1620        }
1621    }
1622
1623    /// Creates a `StateProviderBuilder` for the given parent hash.
1624    ///
1625    /// This method checks if the parent is in the tree state (in-memory) or persisted to disk,
1626    /// and creates the appropriate provider builder.
1627    fn state_provider_builder(
1628        &self,
1629        hash: B256,
1630        state: &EngineApiTreeState<N>,
1631    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1632        if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1633            debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1634            // the block leads back to the canonical chain
1635            return Ok(Some(StateProviderBuilder::new(
1636                self.provider.clone(),
1637                historical,
1638                Some(blocks),
1639            )))
1640        }
1641
1642        // Check if the block is persisted
1643        if let Some(header) = self.provider.header(hash)? {
1644            debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1645            // For persisted blocks, we create a builder that will fetch state directly from the
1646            // database
1647            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1648        }
1649
1650        debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1651        Ok(None)
1652    }
1653
1654    /// Determines the state root computation strategy based on configuration.
1655    ///
1656    /// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
1657    /// too expensive because it requires walking all paths in every proof.
1658    fn plan_state_root_computation(&self) -> StateRootStrategy<N> {
1659        if let Some(custom_state_root) = &self.custom_state_root {
1660            StateRootStrategy::Custom(custom_state_root.clone())
1661        } else if self.config.state_root_fallback() {
1662            StateRootStrategy::Synchronous
1663        } else if self.config.use_state_root_task() {
1664            StateRootStrategy::StateRootTask
1665        } else {
1666            StateRootStrategy::Parallel
1667        }
1668    }
1669
1670    /// Called when an invalid block is encountered during validation.
1671    fn on_invalid_block(
1672        &self,
1673        parent_header: &SealedHeader<N::BlockHeader>,
1674        block: &RecoveredBlock<N::Block>,
1675        output: &BlockExecutionOutput<N::Receipt>,
1676        trie_updates: Option<(&TrieUpdates, B256)>,
1677        state: &mut EngineApiTreeState<N>,
1678    ) {
1679        if state.invalid_headers.get(&block.hash()).is_some() {
1680            // we already marked this block as invalid
1681            return
1682        }
1683        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1684    }
1685
1686    /// Creates a [`LazyOverlay`] for the parent block without blocking.
1687    ///
1688    /// Returns a lazy overlay that will compute the trie input on first access, and the anchor
1689    /// block hash (the highest persisted ancestor). This allows execution to start immediately
1690    /// while the trie input computation is deferred until the overlay is actually needed.
1691    ///
1692    /// If parent is on disk (no in-memory blocks), returns `None` for the lazy overlay.
1693    ///
1694    /// Uses a cached overlay if available for the canonical head (the common case).
1695    fn get_parent_lazy_overlay(
1696        parent_hash: B256,
1697        state: &EngineApiTreeState<N>,
1698    ) -> (Option<LazyOverlay<N>>, B256) {
1699        // Get blocks leading to the parent to determine the anchor
1700        let (anchor_hash, blocks) =
1701            state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
1702
1703        if blocks.is_empty() {
1704            debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
1705            return (None, anchor_hash);
1706        }
1707
1708        // Try to use the cached overlay if it matches both parent hash and anchor
1709        if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
1710            debug!(
1711                target: "engine::tree::payload_validator",
1712                %parent_hash,
1713                %anchor_hash,
1714                "Using cached canonical overlay"
1715            );
1716            return (Some(cached.overlay.clone()), cached.anchor_hash);
1717        }
1718
1719        debug!(
1720            target: "engine::tree::payload_validator",
1721            %anchor_hash,
1722            num_blocks = blocks.len(),
1723            "Creating lazy overlay for in-memory blocks"
1724        );
1725
1726        (Some(LazyOverlay::new(blocks)), anchor_hash)
1727    }
1728
    /// Spawns a background task to compute and sort trie data for the executed block.
    ///
    /// This function creates a [`DeferredTrieData`] handle with fallback inputs and spawns a
    /// blocking task that calls `wait_cloned()` to:
    /// 1. Sort the block's hashed state and trie updates
    /// 2. Merge ancestor overlays and extend with the sorted data
    /// 3. Create an [`AnchoredTrieInput`](reth_chain_state::AnchoredTrieInput) for efficient future
    ///    trie computations
    /// 4. Cache the result so subsequent calls return immediately
    ///
    /// If the background task hasn't completed when `trie_data()` is called, `wait_cloned()`
    /// computes from the stored inputs, eliminating deadlock risk and duplicate computation.
    ///
    /// The validation hot path can return immediately after state root verification,
    /// while consumers (DB writes, overlay providers, proofs) get trie data either
    /// from the completed task or via fallback computation.
    fn spawn_deferred_trie_task(
        &self,
        block: Arc<RecoveredBlock<N::Block>>,
        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
        state: &EngineApiTreeState<N>,
        hashed_state: LazyHashedPostState,
        trie_output: Arc<TrieUpdates>,
        changeset_provider: impl TrieCursorFactory + Send + 'static,
    ) -> ExecutedBlock<N> {
        // Capture parent hash and ancestor overlays for deferred trie input construction.
        let (anchor_hash, overlay_blocks) = state
            .tree_state
            .blocks_by_hash(block.parent_hash())
            .unwrap_or_else(|| (block.parent_hash(), Vec::new()));

        // Collect lightweight ancestor trie data handles. We don't call trie_data() here;
        // the merge and any fallback sorting happens in the compute_trie_input_task.
        let ancestors: Vec<DeferredTrieData> =
            overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();

        // Create deferred handle with fallback inputs in case the background task hasn't completed.
        // Resolve the lazy handle into Arc<HashedPostState>. By this point the hashed state has
        // already been computed and used for state root verification, so .get() returns instantly.
        let hashed_state = match hashed_state.try_into_inner() {
            Ok(state) => state,
            Err(handle) => handle.get().clone(),
        };
        let deferred_trie_data =
            DeferredTrieData::pending(hashed_state, trie_output, anchor_hash, ancestors);
        let deferred_handle_task = deferred_trie_data.clone();
        let block_validation_metrics = self.metrics.block_validation.clone();

        // Capture block info and cache handle for changeset computation
        let block_hash = block.hash();
        let block_number = block.number();

        // Register a pending changeset entry so that concurrent readers will wait for
        // this computation to finish rather than falling back to the expensive DB path.
        // The guard ensures the pending entry is cancelled if the task panics.
        let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);

        // Spawn background task to compute trie data. Calling `wait_cloned` will compute from
        // the stored inputs and cache the result, so subsequent calls return immediately.
        let compute_trie_input_task = move || {
            let _span = debug_span!(
                target: "engine::tree::payload_validator",
                "compute_trie_input_task",
                block_number
            )
            .entered();

            // Catch panics so a failure here degrades to the caller-side fallback
            // computation path instead of only killing this worker closure.
            let result = panic::catch_unwind(AssertUnwindSafe(|| {
                let compute_start = Instant::now();
                let computed = deferred_handle_task.wait_cloned();
                block_validation_metrics
                    .deferred_trie_compute_duration
                    .record(compute_start.elapsed().as_secs_f64());

                // Record sizes of the computed trie data
                block_validation_metrics
                    .hashed_post_state_size
                    .record(computed.hashed_state.total_len() as f64);
                block_validation_metrics
                    .trie_updates_sorted_size
                    .record(computed.trie_updates.total_len() as f64);
                if let Some(anchored) = &computed.anchored_trie_input {
                    block_validation_metrics
                        .anchored_overlay_trie_updates_size
                        .record(anchored.trie_input.nodes.total_len() as f64);
                    block_validation_metrics
                        .anchored_overlay_hashed_state_size
                        .record(anchored.trie_input.state.total_len() as f64);
                }

                // Compute and cache changesets using the computed trie_updates.
                // Use the pre-created provider to avoid races with changeset cache
                // eviction that can happen between task spawn and execution.
                let changeset_start = Instant::now();

                match reth_trie::changesets::compute_trie_changesets(
                    &changeset_provider,
                    &computed.trie_updates,
                ) {
                    Ok(changesets) => {
                        debug!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            elapsed = ?changeset_start.elapsed(),
                            "Computed and caching changesets"
                        );

                        pending_changeset_guard.resolve(block_number, Arc::new(changesets));
                    }
                    Err(e) => {
                        // NOTE(review): the pending entry is not resolved on this path —
                        // presumably the guard's drop cancels it so readers fall back to
                        // the DB path; confirm against `register_pending`'s contract.
                        warn!(
                            target: "engine::tree::changeset",
                            ?block_number,
                            ?e,
                            "Failed to compute changesets in deferred trie task"
                        );
                    }
                }
            }));

            if result.is_err() {
                error!(
                    target: "engine::tree::payload_validator",
                    "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
                );
            }
        };

        // Spawn task that computes trie data asynchronously.
        self.payload_processor
            .executor()
            .spawn_blocking_named("trie-input", compute_trie_input_task);

        ExecutedBlock::with_deferred_trie_data(block, execution_outcome, deferred_trie_data)
    }
1864
1865    fn calculate_timing_stats(
1866        &self,
1867        block: &RecoveredBlock<N::Block>,
1868        provider_stats: Arc<StateProviderStats>,
1869        cache_stats: Option<Arc<CacheStats>>,
1870        output: &BlockExecutionOutput<N::Receipt>,
1871        execution_duration: Duration,
1872        state_hash_duration: Duration,
1873    ) -> Box<ExecutionTimingStats> {
1874        let accounts_read = provider_stats.total_account_fetches();
1875        let storage_read = provider_stats.total_storage_fetches();
1876        let code_read = provider_stats.total_code_fetches();
1877        let code_bytes_read = provider_stats.total_code_fetched_bytes();
1878
1879        // Write stats from BundleState (final state changes)
1880        let accounts_changed = output.state.state.len();
1881        let accounts_deleted =
1882            output.state.state.values().filter(|acc| acc.was_destroyed()).count();
1883        let storage_slots_changed =
1884            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
1885        let storage_slots_deleted = output
1886            .state
1887            .state
1888            .values()
1889            .flat_map(|account| account.storage.values())
1890            .filter(|slot| {
1891                slot.present_value.is_zero() && !slot.previous_or_original_value.is_zero()
1892            })
1893            .count();
1894
1895        // Helper: check if account represents a new contract deployment
1896        let is_new_deployment = |acc: &BundleAccount| -> bool {
1897            let has_code_now = acc.info.as_ref().is_some_and(|info| info.code_hash != KECCAK_EMPTY);
1898            let had_no_code_before = acc
1899                .original_info
1900                .as_ref()
1901                .map(|info| info.code_hash == KECCAK_EMPTY)
1902                .unwrap_or(true);
1903            has_code_now && had_no_code_before
1904        };
1905
1906        let bytecodes_changed =
1907            output.state.state.values().filter(|acc| is_new_deployment(acc)).count();
1908
1909        // Unique new code hashes to count actual bytes persisted (deduplicated)
1910        let unique_new_code_hashes: B256Set = output
1911            .state
1912            .state
1913            .values()
1914            .filter(|acc| is_new_deployment(acc))
1915            .filter_map(|acc| acc.info.as_ref().map(|info| info.code_hash))
1916            .collect();
1917        let code_bytes_written: usize = unique_new_code_hashes
1918            .iter()
1919            .filter_map(|hash| {
1920                output.state.contracts.get(hash).map(|bytecode| bytecode.original_bytes().len())
1921            })
1922            .sum();
1923
1924        // Total time spent fetching state during execution
1925        let state_read_duration = provider_stats.total_account_fetch_latency() +
1926            provider_stats.total_storage_fetch_latency() +
1927            provider_stats.total_code_fetch_latency();
1928
1929        // EIP-7702 delegation tracking from bytecode changes
1930        // Count new EIP-7702 bytecodes as delegations set
1931        let eip7702_delegations_set =
1932            output.state.contracts.values().filter(|bytecode| bytecode.is_eip7702()).count();
1933        // Delegations cleared: accounts where bytecode changed FROM EIP-7702 TO empty
1934        // This detects when an EIP-7702 delegation is removed by setting code to empty
1935        // Note: Clearing a delegation does NOT destroy the account - it just empties the
1936        // bytecode
1937        let eip7702_delegations_cleared = output
1938            .state
1939            .state
1940            .values()
1941            .filter(|acc| {
1942                // Check if original bytecode was EIP-7702
1943                let original_was_eip7702 = acc
1944                    .original_info
1945                    .as_ref()
1946                    .and_then(|info| info.code.as_ref())
1947                    .map(|bytecode| bytecode.is_eip7702())
1948                    .unwrap_or(false);
1949
1950                // Check if current code is empty (delegation cleared)
1951                let code_now_empty =
1952                    acc.info.as_ref().map(|info| info.code_hash == KECCAK_EMPTY).unwrap_or(false);
1953
1954                original_was_eip7702 && code_now_empty
1955            })
1956            .count();
1957
1958        // Get cache statistics for detailed block logging
1959        let (account_cache_hits, account_cache_misses) = cache_stats
1960            .as_ref()
1961            .map(|s| (s.account_hits(), s.account_misses()))
1962            .unwrap_or_default();
1963        let (storage_cache_hits, storage_cache_misses) = cache_stats
1964            .as_ref()
1965            .map(|s| (s.storage_hits(), s.storage_misses()))
1966            .unwrap_or_default();
1967        let (code_cache_hits, code_cache_misses) =
1968            cache_stats.as_ref().map(|s| (s.code_hits(), s.code_misses())).unwrap_or_default();
1969
1970        // Build execution timing stats for detailed block logging
1971        Box::new(ExecutionTimingStats {
1972            block_number: block.number(),
1973            block_hash: block.hash(),
1974            gas_used: output.result.gas_used,
1975            tx_count: block.transaction_count(),
1976            execution_duration,
1977            state_read_duration,
1978            state_hash_duration,
1979            accounts_read,
1980            storage_read,
1981            code_read,
1982            code_bytes_read,
1983            accounts_changed,
1984            accounts_deleted,
1985            storage_slots_changed,
1986            storage_slots_deleted,
1987            bytecodes_changed,
1988            code_bytes_written,
1989            eip7702_delegations_set,
1990            eip7702_delegations_cleared,
1991            account_cache_hits,
1992            account_cache_misses,
1993            storage_cache_hits,
1994            storage_cache_misses,
1995            code_cache_hits,
1996            code_cache_misses,
1997        })
1998    }
1999}
2000
/// Strategy describing how to compute the state root.
///
/// Chosen once per block by `plan_state_root_computation` based on node configuration.
#[derive(derive_more::Debug, Clone)]
enum StateRootStrategy<N: NodePrimitives> {
    /// Use the state root task (background sparse trie computation).
    StateRootTask,
    /// Run the parallel state root computation on the calling thread.
    Parallel,
    /// Fall back to synchronous computation via the state provider.
    Synchronous,
    /// Custom state root computation strategy.
    ///
    /// The inner value is skipped in `Debug` output.
    Custom(#[debug(skip)] CustomStateRoot<N>),
}
2013
/// Type that validates the payloads processed by the engine.
///
/// This provides the necessary functions for validating/executing payloads/blocks.
pub trait EngineValidator<
    Types: PayloadTypes,
    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
>: Send + Sync + 'static
{
    /// Validates the payload attributes with respect to the header.
    ///
    /// By default, this enforces that the payload attributes timestamp is greater than the
    /// timestamp according to:
    ///   > 7. Client software MUST ensure that payloadAttributes.timestamp is greater than
    ///   > timestamp
    ///   > of a block referenced by forkchoiceState.headBlockHash.
    ///
    /// See also: <https://github.com/ethereum/execution-apis/blob/main/src/engine/common.md#specification-1>
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError>;

    /// Ensures that the given payload does not violate any consensus rules that concern the block's
    /// layout.
    ///
    /// This function must convert the payload into the executable block and pre-validate its
    /// fields.
    ///
    /// Implementers should ensure that the checks are done in the order that conforms with the
    /// engine-API specification.
    fn convert_payload_to_block(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;

    /// Validates a payload received from engine API.
    ///
    /// Consumes the payload together with the tree context and returns the full
    /// [`ValidationOutcome`] for the engine to act on.
    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Validates a block downloaded from the network.
    fn validate_block(
        &mut self,
        block: SealedBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Hook called after an executed block is inserted directly into the tree.
    ///
    /// This is invoked when blocks are inserted via `InsertExecutedBlock` (e.g., locally built
    /// blocks by sequencers) to allow implementations to update internal state such as caches.
    fn on_inserted_executed_block(
        &self,
        block: BuiltPayloadExecutedBlock<N>,
        state: &EngineApiTreeState<N>,
    ) -> ProviderResult<ExecutedBlock<N>>;

    /// Returns [`SavedCache`] for the given block hash.
    ///
    /// `None` when no saved cache is available for that block.
    fn cache_for(&self, _block_hash: B256) -> Option<SavedCache>;

    /// Spawns a sparse trie pipeline and returns a handle for the payload builder.
    ///
    /// `None` when no handle could be produced for the given parent.
    fn sparse_trie_handle_for(
        &self,
        parent_hash: B256,
        parent_state_root: B256,
        state: &EngineApiTreeState<N>,
    ) -> Option<StateRootHandle>;
}
2085
2086impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
2087where
2088    P: DatabaseProviderFactory<
2089            Provider: BlockReader
2090                          + StageCheckpointReader
2091                          + PruneCheckpointReader
2092                          + ChangeSetReader
2093                          + StorageChangeSetReader
2094                          + BlockNumReader
2095                          + StorageSettingsCache,
2096        > + BlockReader<Header = N::BlockHeader>
2097        + StateProviderFactory
2098        + StateReader
2099        + ChangeSetReader
2100        + BlockNumReader
2101        + HashedPostStateProvider
2102        + Clone
2103        + 'static,
2104    N: NodePrimitives,
2105    V: PayloadValidator<Types, Block = N::Block> + Clone,
2106    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
2107    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
2108{
2109    fn validate_payload_attributes_against_header(
2110        &self,
2111        attr: &Types::PayloadAttributes,
2112        header: &N::BlockHeader,
2113    ) -> Result<(), InvalidPayloadAttributesError> {
2114        self.validator.validate_payload_attributes_against_header(attr, header)
2115    }
2116
2117    fn convert_payload_to_block(
2118        &self,
2119        payload: Types::ExecutionData,
2120    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
2121        let block = self.validator.convert_payload_to_block(payload)?;
2122        Ok(block)
2123    }
2124
2125    fn validate_payload(
2126        &mut self,
2127        payload: Types::ExecutionData,
2128        ctx: TreeCtx<'_, N>,
2129    ) -> ValidationOutcome<N> {
2130        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
2131    }
2132
2133    fn validate_block(
2134        &mut self,
2135        block: SealedBlock<N::Block>,
2136        ctx: TreeCtx<'_, N>,
2137    ) -> ValidationOutcome<N> {
2138        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
2139    }
2140
2141    fn on_inserted_executed_block(
2142        &self,
2143        block: BuiltPayloadExecutedBlock<N>,
2144        state: &EngineApiTreeState<N>,
2145    ) -> ProviderResult<ExecutedBlock<N>> {
2146        self.payload_processor.on_inserted_executed_block(
2147            block.recovered_block.block_with_parent(),
2148            &block.execution_output.state,
2149        );
2150
2151        let (lazy_overlay, anchor_hash) =
2152            Self::get_parent_lazy_overlay(block.recovered_block.parent_hash(), state);
2153        let overlay_factory = OverlayStateProviderFactory::new(
2154            self.provider.clone(),
2155            OverlayBuilder::<N>::new(anchor_hash, self.changeset_cache.clone())
2156                .with_lazy_overlay(lazy_overlay),
2157        );
2158        let changeset_provider = overlay_factory.database_provider_ro()?;
2159
2160        Ok(self.spawn_deferred_trie_task(
2161            block.recovered_block,
2162            block.execution_output,
2163            state,
2164            LazyHashedPostState::ready(block.hashed_state),
2165            block.trie_updates,
2166            changeset_provider,
2167        ))
2168    }
2169
2170    fn cache_for(&self, block_hash: B256) -> Option<SavedCache> {
2171        Some(self.payload_processor.cache_for(block_hash))
2172    }
2173
2174    fn sparse_trie_handle_for(
2175        &self,
2176        parent_hash: B256,
2177        parent_state_root: B256,
2178        state: &EngineApiTreeState<N>,
2179    ) -> Option<StateRootHandle> {
2180        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, state);
2181        let overlay_factory = OverlayStateProviderFactory::new(
2182            self.provider.clone(),
2183            OverlayBuilder::<N>::new(anchor_hash, self.changeset_cache.clone())
2184                .with_lazy_overlay(lazy_overlay),
2185        );
2186
2187        Some(self.payload_processor.spawn_state_root(
2188            overlay_factory,
2189            parent_state_root,
2190            // Full proof workers — tx count unknown at FCU time (block built incrementally)
2191            false,
2192            &self.config,
2193        ))
2194    }
2195}
2196
impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    // Pure delegation: the payload processor owns the caches and reports the wait durations.
    fn wait_for_caches(&self) -> CacheWaitDurations {
        self.payload_processor.wait_for_caches()
    }
}
2205
/// Enum representing either block or payload being validated.
///
/// Payloads arrive via the engine API and full blocks via network sync; both are validated
/// through the same entry points, so this enum unifies the two inputs.
#[derive(Debug, Clone)]
pub enum BlockOrPayload<T: PayloadTypes> {
    /// An execution payload that has not yet been converted into a block.
    Payload(T::ExecutionData),
    /// An already decoded and sealed block.
    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
}
2214
2215impl<T: PayloadTypes> BlockOrPayload<T> {
2216    /// Returns the hash of the block.
2217    pub fn hash(&self) -> B256 {
2218        match self {
2219            Self::Payload(payload) => payload.block_hash(),
2220            Self::Block(block) => block.hash(),
2221        }
2222    }
2223
2224    /// Returns the number and hash of the block.
2225    pub fn num_hash(&self) -> NumHash {
2226        match self {
2227            Self::Payload(payload) => payload.num_hash(),
2228            Self::Block(block) => block.num_hash(),
2229        }
2230    }
2231
2232    /// Returns the parent hash of the block.
2233    pub fn parent_hash(&self) -> B256 {
2234        match self {
2235            Self::Payload(payload) => payload.parent_hash(),
2236            Self::Block(block) => block.parent_hash(),
2237        }
2238    }
2239
2240    /// Returns [`BlockWithParent`] for the block.
2241    pub fn block_with_parent(&self) -> BlockWithParent {
2242        match self {
2243            Self::Payload(payload) => payload.block_with_parent(),
2244            Self::Block(block) => block.block_with_parent(),
2245        }
2246    }
2247
2248    /// Returns a string showing whether or not this is a block or payload.
2249    pub const fn type_name(&self) -> &'static str {
2250        match self {
2251            Self::Payload(_) => "payload",
2252            Self::Block(_) => "block",
2253        }
2254    }
2255
2256    /// Returns true if this is a payload.
2257    pub const fn is_payload(&self) -> bool {
2258        matches!(self, Self::Payload(_))
2259    }
2260
2261    /// Returns true if this is a block.
2262    pub const fn is_block(&self) -> bool {
2263        matches!(self, Self::Block(_))
2264    }
2265
2266    /// Returns the decoded block access list, if present and successfully decoded.
2267    pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
2268        match self {
2269            Self::Payload(payload) => payload
2270                .block_access_list()
2271                .map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
2272                .transpose(),
2273            Self::Block(_) => Ok(None),
2274        }
2275    }
2276
2277    /// Returns the number of transactions in the payload or block.
2278    pub fn transaction_count(&self) -> usize
2279    where
2280        T::ExecutionData: ExecutionPayload,
2281    {
2282        match self {
2283            Self::Payload(payload) => payload.transaction_count(),
2284            Self::Block(block) => block.transaction_count(),
2285        }
2286    }
2287
2288    /// Returns the withdrawals from the payload or block.
2289    pub fn withdrawals(&self) -> Option<&[Withdrawal]>
2290    where
2291        T::ExecutionData: ExecutionPayload,
2292    {
2293        match self {
2294            Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
2295            Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
2296        }
2297    }
2298
2299    /// Returns the total gas used by the block.
2300    pub fn gas_used(&self) -> u64
2301    where
2302        T::ExecutionData: ExecutionPayload,
2303    {
2304        match self {
2305            Self::Payload(payload) => payload.gas_used(),
2306            Self::Block(block) => block.gas_used(),
2307        }
2308    }
2309
2310    /// Returns the gas limit used by the block.
2311    pub fn gas_limit(&self) -> u64
2312    where
2313        T::ExecutionData: ExecutionPayload,
2314    {
2315        match self {
2316            Self::Payload(payload) => payload.gas_limit(),
2317            Self::Block(block) => block.gas_limit(),
2318        }
2319    }
2320}
2321
/// Input for [`CustomStateRoot`].
///
/// Bundles the borrowed data a custom state-root handler needs for a freshly executed block.
#[derive(Debug, Clone)]
pub struct CustomStateRootInput<'a, N: NodePrimitives> {
    /// The block being validated.
    pub block: &'a SealedBlock<N::Block>,
    /// The sealed header of the parent block.
    pub parent_block: &'a SealedHeader<N::BlockHeader>,
    /// The output of executing `block`.
    pub output: &'a BlockExecutionOutput<N::Receipt>,
    /// The hashed state for `block`.
    pub hashed_state: &'a LazyHashedPostState,
}
2334
/// A custom state root computation handler.
///
/// Given a [`CustomStateRootInput`], the handler returns the computed state root together with
/// the corresponding [`TrieUpdates`], or a provider error on failure.
pub type CustomStateRoot<N> = Arc<
    dyn Fn(CustomStateRootInput<'_, N>) -> ProviderResult<(B256, TrieUpdates)>
        + Send
        + Sync
        + 'static,
>;