Skip to main content

reth_engine_tree/tree/
payload_validator.rs

1//! Types and traits for validating blocks and payloads.
2//!
3//! # Validation pipeline
4//!
5//! When the engine processes a new payload (`engine_newPayload`), validation happens in phases:
6//!
7//! ## Phase 1 – Payload conversion
8//! [`PayloadValidator::convert_payload_to_block`] decodes the execution payload (RLP, hashing)
9//! into a [`SealedBlock`]. This runs on a background thread concurrently with state setup.
10//!
11//! ## Phase 2 – Pre-execution consensus
12//! - [`HeaderValidator::validate_header`] — standalone header checks (hash, gas, base fee,
13//!   fork-specific fields)
14//! - [`Consensus::validate_block_pre_execution`] — body vs header (tx root, ommer hash, withdrawals
15//!   root)
16//! - [`HeaderValidator::validate_header_against_parent`] — sequential checks (block number,
17//!   timestamp, gas limit, base fee vs parent)
18//!
19//! ## Phase 3 – Execution
20//! Block transactions are executed via the EVM. Receipt roots are computed incrementally.
21//!
22//! ## Phase 4 – Post-execution consensus
23//! - [`FullConsensus::validate_block_post_execution`] — gas used, receipt root, logs bloom,
24//!   requests hash
25//! - [`PayloadValidator::validate_block_post_execution_with_hashed_state`] — network-specific
26//!   (no-op on L1, used by OP Stack)
27//!
28//! ## Payload attributes validation (`engine_forkchoiceUpdated`)
29//! When the CL provides payload attributes to start building a block:
30//! - [`PayloadValidator::validate_payload_attributes_against_header`] — ensures timestamp ordering
31//!
32//! If validation passes, a payload build job is started. If it fails,
33//! `INVALID_PAYLOAD_ATTRIBUTES` is returned without rolling back the forkchoice update.
34//!
35//! [`HeaderValidator::validate_header`]: reth_consensus::HeaderValidator::validate_header
36//! [`Consensus::validate_block_pre_execution`]: reth_consensus::Consensus::validate_block_pre_execution
37//! [`HeaderValidator::validate_header_against_parent`]: reth_consensus::HeaderValidator::validate_header_against_parent
38//! [`FullConsensus::validate_block_post_execution`]: reth_consensus::FullConsensus::validate_block_post_execution
39//! [`SealedBlock`]: reth_primitives_traits::SealedBlock
40
41use crate::tree::{
42    error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
43    instrumented_state::{InstrumentedStateProvider, StateProviderStats},
44    payload_processor::PayloadProcessor,
45    precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
46    sparse_trie::StateRootComputeOutcome,
47    CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
48    PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
49};
50use alloy_consensus::transaction::{Either, TxHashRef};
51use alloy_eip7928::BlockAccessList;
52use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
53use alloy_evm::Evm;
54use alloy_primitives::{map::B256Set, B256};
55#[cfg(feature = "trie-debug")]
56use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
57
58use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
59use reth_chain_state::{
60    CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, ExecutionTimingStats, LazyOverlay,
61};
62use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
63use reth_engine_primitives::{
64    ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
65};
66use reth_errors::{BlockExecutionError, ProviderResult};
67use reth_evm::{
68    block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
69    OnStateHook, SpecFor,
70};
71use reth_execution_cache::CacheStats;
72use reth_payload_primitives::{
73    BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
74};
75use reth_primitives_traits::{
76    AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
77    RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
78};
79use reth_provider::{
80    providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
81    ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
82    ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
83    StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
84};
85use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
86use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState, StateRoot};
87use reth_trie_db::ChangesetCache;
88use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
89use revm_primitives::{Address, KECCAK_EMPTY};
90use std::{
91    collections::HashMap,
92    panic::{self, AssertUnwindSafe},
93    sync::{
94        atomic::{AtomicUsize, Ordering},
95        mpsc::RecvTimeoutError,
96        Arc,
97    },
98    time::Duration,
99};
100use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
101
102/// Output of block or payload validation.
103pub type ValidationOutcome<N, E = InsertPayloadError<BlockTy<N>>> =
104    Result<(ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>), E>;
105
106/// Handle to a [`HashedPostState`] computed on a background thread.
107type LazyHashedPostState = reth_tasks::LazyHandle<HashedPostState>;
108
109/// Result type for block validation with optional timing stats.
110type InsertPayloadResult<N> = Result<
111    (ExecutedBlock<N>, Option<Box<ExecutionTimingStats>>),
112    InsertPayloadError<<N as NodePrimitives>::Block>,
113>;
114
115/// Context providing access to tree state during validation.
116///
117/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
118/// internals
119pub struct TreeCtx<'a, N: NodePrimitives> {
120    /// The engine API tree state
121    state: &'a mut EngineApiTreeState<N>,
122    /// Reference to the canonical in-memory state
123    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
124}
125
126impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("TreeCtx")
129            .field("state", &"EngineApiTreeState")
130            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
131            .finish()
132    }
133}
134
135impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
136    /// Creates a new tree context
137    pub const fn new(
138        state: &'a mut EngineApiTreeState<N>,
139        canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
140    ) -> Self {
141        Self { state, canonical_in_memory_state }
142    }
143}
144
145impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
146    /// Returns a reference to the engine tree state
147    pub const fn state(&self) -> &EngineApiTreeState<N> {
148        &*self.state
149    }
150
151    /// Returns a mutable reference to the engine tree state
152    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
153        self.state
154    }
155
156    /// Returns a reference to the canonical in-memory state
157    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
158        self.canonical_in_memory_state
159    }
160}
161
162/// A helper type that provides reusable payload validation logic for network-specific validators.
163///
164/// This type satisfies [`EngineValidator`] and is responsible for executing blocks/payloads.
165///
166/// This type contains common validation, execution, and state root computation logic that can be
167/// used by network-specific payload validators (e.g., Ethereum, Optimism). It is not meant to be
168/// used as a standalone component, but rather as a building block for concrete implementations.
169#[derive(derive_more::Debug)]
170pub struct BasicEngineValidator<P, Evm, V>
171where
172    Evm: ConfigureEvm,
173{
174    /// Provider for database access.
175    provider: P,
176    /// Consensus implementation for validation.
177    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
178    /// EVM configuration.
179    evm_config: Evm,
180    /// Configuration for the tree.
181    config: TreeConfig,
182    /// Payload processor for state root computation.
183    payload_processor: PayloadProcessor<Evm>,
184    /// Precompile cache map.
185    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
186    /// Precompile cache metrics.
187    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
188    /// Hook to call when invalid blocks are encountered.
189    #[debug(skip)]
190    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
191    /// Metrics for the engine api.
192    metrics: EngineApiMetrics,
193    /// Validator for the payload.
194    validator: V,
195    /// Changeset cache for in-memory trie changesets
196    changeset_cache: ChangesetCache,
197    /// Task runtime for spawning parallel work.
198    runtime: reth_tasks::Runtime,
199}
200
201impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
202where
203    N: NodePrimitives,
204    P: DatabaseProviderFactory<
205            Provider: BlockReader
206                          + StageCheckpointReader
207                          + PruneCheckpointReader
208                          + ChangeSetReader
209                          + StorageChangeSetReader
210                          + BlockNumReader
211                          + StorageSettingsCache,
212        > + BlockReader<Header = N::BlockHeader>
213        + ChangeSetReader
214        + BlockNumReader
215        + StateProviderFactory
216        + StateReader
217        + HashedPostStateProvider
218        + Clone
219        + 'static,
220    Evm: ConfigureEvm<Primitives = N> + 'static,
221{
222    /// Creates a new `TreePayloadValidator`.
223    #[allow(clippy::too_many_arguments)]
224    pub fn new(
225        provider: P,
226        consensus: Arc<dyn FullConsensus<N>>,
227        evm_config: Evm,
228        validator: V,
229        config: TreeConfig,
230        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
231        changeset_cache: ChangesetCache,
232        runtime: reth_tasks::Runtime,
233    ) -> Self {
234        let precompile_cache_map = PrecompileCacheMap::default();
235        let payload_processor = PayloadProcessor::new(
236            runtime.clone(),
237            evm_config.clone(),
238            &config,
239            precompile_cache_map.clone(),
240        );
241        Self {
242            provider,
243            consensus,
244            evm_config,
245            payload_processor,
246            precompile_cache_map,
247            precompile_cache_metrics: HashMap::new(),
248            config,
249            invalid_block_hook,
250            metrics: EngineApiMetrics::default(),
251            validator,
252            changeset_cache,
253            runtime,
254        }
255    }
256
257    /// Converts a [`BlockOrPayload`] to a recovered block.
258    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
259    pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
260        &self,
261        input: BlockOrPayload<T>,
262    ) -> Result<SealedBlock<N::Block>, NewPayloadError>
263    where
264        V: PayloadValidator<T, Block = N::Block>,
265    {
266        match input {
267            BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
268            BlockOrPayload::Block(block) => Ok(block),
269        }
270    }
271
272    /// Returns EVM environment for the given payload or block.
273    pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
274        &self,
275        input: &BlockOrPayload<T>,
276    ) -> Result<EvmEnvFor<Evm>, Evm::Error>
277    where
278        V: PayloadValidator<T, Block = N::Block>,
279        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
280    {
281        match input {
282            BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
283            BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
284        }
285    }
286
287    /// Returns [`ExecutableTxIterator`] for the given payload or block.
288    pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
289        &'a self,
290        input: &'a BlockOrPayload<T>,
291    ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
292    where
293        V: PayloadValidator<T, Block = N::Block>,
294        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
295    {
296        Ok(match input {
297            BlockOrPayload::Payload(payload) => {
298                let iter = self
299                    .evm_config
300                    .tx_iterator_for_payload(payload)
301                    .map_err(NewPayloadError::other)?;
302                Either::Left(iter)
303            }
304            BlockOrPayload::Block(block) => {
305                let txs = block.body().clone_transactions();
306                let convert = |tx: N::SignedTx| tx.try_into_recovered();
307                Either::Right((txs, convert))
308            }
309        })
310    }
311
312    /// Returns a [`ExecutionCtxFor`] for the given payload or block.
313    pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
314        &self,
315        input: &'a BlockOrPayload<T>,
316    ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
317    where
318        V: PayloadValidator<T, Block = N::Block>,
319        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
320    {
321        match input {
322            BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
323            BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
324        }
325    }
326
327    /// Handles execution errors by checking if header validation errors should take precedence.
328    ///
329    /// When an execution error occurs, this function checks if there are any header validation
330    /// errors that should be reported instead, as header validation errors have higher priority.
331    fn handle_execution_error<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
332        &self,
333        input: BlockOrPayload<T>,
334        execution_err: InsertBlockErrorKind,
335        parent_block: &SealedHeader<N::BlockHeader>,
336    ) -> InsertPayloadResult<N>
337    where
338        V: PayloadValidator<T, Block = N::Block>,
339    {
340        debug!(
341            target: "engine::tree::payload_validator",
342            ?execution_err,
343            block = ?input.num_hash(),
344            "Block execution failed, checking for header validation errors"
345        );
346
347        // If execution failed, we should first check if there are any header validation
348        // errors that take precedence over the execution error
349        let block = self.convert_to_block(input)?;
350
351        // Validate block consensus rules which includes header validation
352        if let Err(consensus_err) = self.validate_block_inner(&block, None) {
353            // Header validation error takes precedence over execution error
354            return Err(InsertBlockError::new(block, consensus_err.into()).into())
355        }
356
357        // Also validate against the parent
358        if let Err(consensus_err) =
359            self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
360        {
361            // Parent validation error takes precedence over execution error
362            return Err(InsertBlockError::new(block, consensus_err.into()).into())
363        }
364
365        // No header validation errors, return the original execution error
366        Err(InsertBlockError::new(block, execution_err).into())
367    }
368
369    /// Validates a block that has already been converted from a payload.
370    ///
371    /// This method performs:
372    /// - Consensus validation
373    /// - Block execution
374    /// - State root computation
375    /// - Fork detection
376    #[instrument(
377        level = "debug",
378        target = "engine::tree::payload_validator",
379        skip_all,
380        fields(
381            parent = ?input.parent_hash(),
382            type_name = ?input.type_name(),
383        )
384    )]
385    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
386        &mut self,
387        input: BlockOrPayload<T>,
388        mut ctx: TreeCtx<'_, N>,
389    ) -> InsertPayloadResult<N>
390    where
391        V: PayloadValidator<T, Block = N::Block> + Clone,
392        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
393    {
394        // Spawn payload conversion on a background thread so it runs concurrently with the
395        // rest of the function (setup + execution). For payloads this overlaps the cost of
396        // RLP decoding + header hashing.
397        let is_payload = matches!(&input, BlockOrPayload::Payload(_));
398        let convert_to_block = match &input {
399            BlockOrPayload::Payload(_) => {
400                let payload_clone = input.clone();
401                let validator = self.validator.clone();
402                let handle = self.payload_processor.executor().spawn_blocking_named(
403                    "payload-convert",
404                    move || {
405                        let BlockOrPayload::Payload(payload) = payload_clone else {
406                            unreachable!()
407                        };
408                        validator.convert_payload_to_block(payload)
409                    },
410                );
411                Either::Left(handle)
412            }
413            BlockOrPayload::Block(_) => Either::Right(()),
414        };
415
416        // Returns the sealed block, either by awaiting the background conversion task (for
417        // payloads) or by extracting the already-converted block directly.
418        let convert_to_block =
419            move |input: BlockOrPayload<T>| -> Result<SealedBlock<N::Block>, NewPayloadError> {
420                match convert_to_block {
421                    Either::Left(handle) => handle.try_into_inner().expect("sole handle"),
422                    Either::Right(()) => {
423                        let BlockOrPayload::Block(block) = input else { unreachable!() };
424                        Ok(block)
425                    }
426                }
427            };
428
429        /// A helper macro that returns the block in case there was an error
430        /// This macro is used for early returns before block conversion
431        macro_rules! ensure_ok {
432            ($expr:expr) => {
433                match $expr {
434                    Ok(val) => val,
435                    Err(e) => {
436                        let block = convert_to_block(input)?;
437                        return Err(InsertBlockError::new(block, e.into()).into())
438                    }
439                }
440            };
441        }
442
443        /// A helper macro for handling errors after the input has been converted to a block
444        macro_rules! ensure_ok_post_block {
445            ($expr:expr, $block:expr) => {
446                match $expr {
447                    Ok(val) => val,
448                    Err(e) => {
449                        return Err(
450                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
451                        )
452                    }
453                }
454            };
455        }
456
457        let parent_hash = input.parent_hash();
458
459        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
460        let _enter =
461            debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
462        let Some(provider_builder) =
463            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
464        else {
465            // this is pre-validated in the tree
466            return Err(InsertBlockError::new(
467                convert_to_block(input)?,
468                ProviderError::HeaderNotFound(parent_hash.into()).into(),
469            )
470            .into())
471        };
472        let mut state_provider = ensure_ok!(provider_builder.build());
473        drop(_enter);
474
475        // Fetch parent block. This goes to memory most of the time unless the parent block is
476        // beyond the in-memory buffer.
477        let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
478        else {
479            return Err(InsertBlockError::new(
480                convert_to_block(input)?,
481                ProviderError::HeaderNotFound(parent_hash.into()).into(),
482            )
483            .into())
484        };
485
486        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
487            .in_scope(|| self.evm_env_for(&input))
488            .map_err(NewPayloadError::other)?;
489
490        let env = ExecutionEnv {
491            evm_env,
492            hash: input.hash(),
493            parent_hash: input.parent_hash(),
494            parent_state_root: parent_block.state_root(),
495            transaction_count: input.transaction_count(),
496            gas_used: input.gas_used(),
497            withdrawals: input.withdrawals().map(|w| w.to_vec()),
498        };
499
500        // Plan the strategy used for state root computation.
501        let strategy = self.plan_state_root_computation();
502
503        debug!(
504            target: "engine::tree::payload_validator",
505            ?strategy,
506            "Decided which state root algorithm to run"
507        );
508
509        // Get an iterator over the transactions in the payload
510        let txs = self.tx_iterator_for(&input)?;
511
512        // Extract the BAL, if valid and available
513        let block_access_list = ensure_ok!(input
514            .block_access_list()
515            .transpose()
516            // Eventually gets converted to a `InsertBlockErrorKind::Other`
517            .map_err(Box::<dyn std::error::Error + Send + Sync>::from))
518        .map(Arc::new);
519
520        // Create lazy overlay from ancestors - this doesn't block, allowing execution to start
521        // before the trie data is ready. The overlay will be computed on first access.
522        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
523
524        // Create overlay factory for payload processor (StateRootTask path needs it for
525        // multiproofs)
526        let overlay_factory =
527            OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
528                .with_block_hash(Some(anchor_hash))
529                .with_lazy_overlay(lazy_overlay);
530
531        // Spawn the appropriate processor based on strategy
532        let mut handle = ensure_ok!(self.spawn_payload_processor(
533            env.clone(),
534            txs,
535            provider_builder,
536            overlay_factory.clone(),
537            strategy,
538            block_access_list,
539        ));
540
541        // Create optional cache stats for detailed block logging
542        let slow_block_enabled = self.config.slow_block_threshold().is_some();
543        let cache_stats = slow_block_enabled.then(|| Arc::new(CacheStats::default()));
544
545        // Use cached state provider before executing, used in execution after prewarming threads
546        // complete
547        if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
548            state_provider = Box::new(
549                CachedStateProvider::new(state_provider, caches, cache_metrics)
550                    .with_cache_stats(cache_stats.clone()),
551            );
552        };
553
554        let state_provider_stats = if slow_block_enabled || self.config.state_provider_metrics() {
555            let instrumented_state_provider =
556                InstrumentedStateProvider::new(state_provider, "engine");
557            let stats = slow_block_enabled.then(|| instrumented_state_provider.stats());
558            state_provider = Box::new(instrumented_state_provider);
559            stats
560        } else {
561            None
562        };
563
564        // Execute the block and handle any execution errors.
565        // The receipt root task is spawned before execution and receives receipts incrementally
566        // as transactions complete, allowing parallel computation during execution.
567        let execute_block_start = Instant::now();
568        let (output, senders, receipt_root_rx) =
569            match self.execute_block(state_provider, env, &input, &mut handle) {
570                Ok(output) => output,
571                Err(err) => return self.handle_execution_error(input, err, &parent_block),
572            };
573        let execution_duration = execute_block_start.elapsed();
574
575        // After executing the block we can stop prewarming transactions
576        handle.stop_prewarming_execution();
577
578        // Create ExecutionOutcome early so we can terminate caching before validation and state
579        // root computation. Using Arc allows sharing with both the caching task and the deferred
580        // trie task without cloning the expensive BundleState.
581        let output = Arc::new(output);
582
583        // Terminate caching task early since execution is complete and caching is no longer
584        // needed. This frees up resources while state root computation continues.
585        let valid_block_tx = handle.terminate_caching(Some(output.clone()));
586
587        // Spawn hashed post state computation in background so it runs concurrently with
588        // block conversion and receipt root computation. This is a pure CPU-bound task
589        // (keccak256 hashing of all changed addresses and storage slots).
590        let hashed_state_output = output.clone();
591        let hashed_state_provider = self.provider.clone();
592        let hashed_state: LazyHashedPostState =
593            self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
594                let _span = debug_span!(
595                    target: "engine::tree::payload_validator",
596                    "hashed_post_state",
597                )
598                .entered();
599                hashed_state_provider.hashed_post_state(&hashed_state_output.state)
600            });
601
602        let block = convert_to_block(input)?;
603        let transaction_root = is_payload.then(|| {
604            let body = block.body().clone();
605            let parent_span = Span::current();
606            let num_hash = block.num_hash();
607            self.payload_processor.executor().spawn_blocking_named("payload-tx-root", move || {
608                let _span =
609                    debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
610                        .entered();
611                body.calculate_tx_root()
612            })
613        });
614        let block = block.with_senders(senders);
615
616        // Wait for the receipt root computation to complete.
617        let receipt_root_bloom = {
618            let _enter = debug_span!(
619                target: "engine::tree::payload_validator",
620                "wait_receipt_root",
621            )
622            .entered();
623
624            receipt_root_rx
625                .blocking_recv()
626                .inspect_err(|_| {
627                    tracing::error!(
628                        target: "engine::tree::payload_validator",
629                        "Receipt root task dropped sender without result, receipt root calculation likely aborted"
630                    );
631                })
632                .ok()
633        };
634        let transaction_root = transaction_root.map(|handle| {
635            let _span =
636                debug_span!(target: "engine::tree::payload_validator", "wait_payload_tx_root")
637                    .entered();
638            handle.try_into_inner().expect("sole handle")
639        });
640
641        let hashed_state = ensure_ok_post_block!(
642            self.validate_post_execution(
643                &block,
644                &parent_block,
645                &output,
646                &mut ctx,
647                transaction_root,
648                receipt_root_bloom,
649                hashed_state,
650            ),
651            block
652        );
653
654        let root_time = Instant::now();
655        let mut maybe_state_root = None;
656        let mut state_root_task_failed = false;
657        #[cfg(feature = "trie-debug")]
658        let mut trie_debug_recorders = Vec::new();
659
660        match strategy {
661            StateRootStrategy::StateRootTask => {
662                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
663
664                let task_result = ensure_ok_post_block!(
665                    self.await_state_root_with_timeout(
666                        &mut handle,
667                        overlay_factory.clone(),
668                        &hashed_state,
669                    ),
670                    block
671                );
672
673                match task_result {
674                    Ok(StateRootComputeOutcome {
675                        state_root,
676                        trie_updates,
677                        #[cfg(feature = "trie-debug")]
678                        debug_recorders,
679                    }) => {
680                        let elapsed = root_time.elapsed();
681                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
682
683                        #[cfg(feature = "trie-debug")]
684                        {
685                            trie_debug_recorders = debug_recorders;
686                        }
687
688                        // Compare trie updates with serial computation if configured
689                        if self.config.always_compare_trie_updates() {
690                            let _has_diff = self.compare_trie_updates_with_serial(
691                                overlay_factory.clone(),
692                                &hashed_state,
693                                trie_updates.as_ref().clone(),
694                            );
695                            #[cfg(feature = "trie-debug")]
696                            if _has_diff {
697                                Self::write_trie_debug_recorders(
698                                    block.header().number(),
699                                    &trie_debug_recorders,
700                                );
701                            }
702                        }
703
704                        // we double check the state root here for good measure
705                        if state_root == block.header().state_root() {
706                            maybe_state_root = Some((state_root, trie_updates, elapsed))
707                        } else {
708                            warn!(
709                                target: "engine::tree::payload_validator",
710                                ?state_root,
711                                block_state_root = ?block.header().state_root(),
712                                "State root task returned incorrect state root"
713                            );
714                            #[cfg(feature = "trie-debug")]
715                            Self::write_trie_debug_recorders(
716                                block.header().number(),
717                                &trie_debug_recorders,
718                            );
719                            state_root_task_failed = true;
720                        }
721                    }
722                    Err(error) => {
723                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
724                        state_root_task_failed = true;
725                    }
726                }
727            }
728            StateRootStrategy::Parallel => {
729                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
730                match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
731                    Ok(result) => {
732                        let elapsed = root_time.elapsed();
733                        info!(
734                            target: "engine::tree::payload_validator",
735                            regular_state_root = ?result.0,
736                            ?elapsed,
737                            "Regular root task finished"
738                        );
739                        maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
740                    }
741                    Err(error) => {
742                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
743                    }
744                }
745            }
746            StateRootStrategy::Synchronous => {}
747        }
748
749        // Determine the state root.
750        // If the state root was computed in parallel, we use it.
751        // Otherwise, we fall back to computing it synchronously.
752        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
753            maybe_state_root
754        {
755            maybe_state_root
756        } else {
757            // fallback is to compute the state root regularly in sync
758            if self.config.state_root_fallback() {
759                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
760            } else {
761                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
762                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
763            }
764
765            let (root, updates) = ensure_ok_post_block!(
766                Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
767                block
768            );
769
770            if state_root_task_failed {
771                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
772            }
773
774            (root, Arc::new(updates), root_time.elapsed())
775        };
776
777        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
778        self.metrics
779            .record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
780        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
781
782        // ensure state root matches
783        if state_root != block.header().state_root() {
784            #[cfg(feature = "trie-debug")]
785            Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);
786
787            // call post-block hook
788            self.on_invalid_block(
789                &parent_block,
790                &block,
791                &output,
792                Some((&trie_output, state_root)),
793                ctx.state_mut(),
794            );
795            let block_state_root = block.header().state_root();
796            return Err(InsertBlockError::new(
797                block.into_sealed_block(),
798                ConsensusError::BodyStateRootDiff(
799                    GotExpected { got: state_root, expected: block_state_root }.into(),
800                )
801                .into(),
802            )
803            .into())
804        }
805
806        let timing_stats = state_provider_stats.map(|stats| {
807            self.calculate_timing_stats(
808                &block,
809                stats,
810                cache_stats,
811                &output,
812                execution_duration,
813                root_elapsed,
814            )
815        });
816
817        if let Some(valid_block_tx) = valid_block_tx {
818            let _ = valid_block_tx.send(());
819        }
820
821        // Create the overlay provider NOW, while we're on the engine loop thread and trie changeset
822        // eviction cannot race with us. If we deferred this to the background task, persistence
823        // could advance and evict changeset cache entries between factory creation and the task
824        // actually running, causing expensive DB fallback computations when building the overlay.
825        let changeset_provider =
826            ensure_ok_post_block!(overlay_factory.database_provider_ro(), block);
827
828        let executed_block = self.spawn_deferred_trie_task(
829            block,
830            output,
831            &ctx,
832            hashed_state,
833            trie_output,
834            changeset_provider,
835        );
836        Ok((executed_block, timing_stats))
837    }
838
839    /// Return sealed block header from database or in-memory state by hash.
840    fn sealed_header_by_hash(
841        &self,
842        hash: B256,
843        state: &EngineApiTreeState<N>,
844    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
845        // check memory first
846        let header = state.tree_state.sealed_header_by_hash(&hash);
847
848        if header.is_some() {
849            Ok(header)
850        } else {
851            self.provider.sealed_header_by_hash(hash)
852        }
853    }
854
855    /// Validate if block is correct and satisfies all the consensus rules that concern the header
856    /// and block body itself.
857    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
858    fn validate_block_inner(
859        &self,
860        block: &SealedBlock<N::Block>,
861        transaction_root: Option<B256>,
862    ) -> Result<(), ConsensusError> {
863        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
864            error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
865            return Err(e)
866        }
867
868        if let Err(e) =
869            self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
870        {
871            error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
872            return Err(e)
873        }
874
875        Ok(())
876    }
877
878    /// Executes a block with the given state provider.
879    ///
880    /// This method orchestrates block execution:
881    /// 1. Sets up the EVM with state database and precompile caching
882    /// 2. Spawns a background task for incremental receipt root computation
883    /// 3. Executes transactions with metrics collection via state hooks
884    /// 4. Merges state transitions and records execution metrics
885    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
886    #[expect(clippy::type_complexity)]
887    fn execute_block<S, Err, T>(
888        &mut self,
889        state_provider: S,
890        env: ExecutionEnv<Evm>,
891        input: &BlockOrPayload<T>,
892        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
893    ) -> Result<
894        (
895            BlockExecutionOutput<N::Receipt>,
896            Vec<Address>,
897            tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
898        ),
899        InsertBlockErrorKind,
900    >
901    where
902        S: StateProvider + Send,
903        Err: core::error::Error + Send + Sync + 'static,
904        V: PayloadValidator<T, Block = N::Block>,
905        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
906        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
907    {
908        debug!(target: "engine::tree::payload_validator", "Executing block");
909
910        let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
911            State::builder()
912                .with_database(StateProviderDatabase::new(state_provider))
913                .with_bundle_update()
914                .build()
915        });
916
917        let (spec_id, mut executor) = {
918            let _span = debug_span!(target: "engine::tree", "create_evm").entered();
919            let spec_id = *env.evm_env.spec_id();
920            let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
921            let ctx = self
922                .execution_ctx_for(input)
923                .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
924            let executor = self.evm_config.create_executor(evm, ctx);
925            (spec_id, executor)
926        };
927
928        if !self.config.precompile_cache_disabled() {
929            let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
930            executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
931                |address, precompile| {
932                    let metrics = self
933                        .precompile_cache_metrics
934                        .entry(*address)
935                        .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
936                        .clone();
937                    CachedPrecompile::wrap(
938                        precompile,
939                        self.precompile_cache_map.cache_for_address(*address),
940                        spec_id,
941                        Some(metrics),
942                    )
943                },
944            );
945        }
946
947        // Spawn background task to compute receipt root and logs bloom incrementally.
948        // Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
949        let receipts_len = input.transaction_count();
950        let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
951        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
952        let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
953        self.payload_processor
954            .executor()
955            .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
956
957        let transaction_count = input.transaction_count();
958        let executed_tx_index = Arc::clone(handle.executed_tx_index());
959        let executor = executor.with_state_hook(
960            handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
961        );
962
963        let execution_start = Instant::now();
964
965        // Execute all transactions and finalize
966        let (executor, senders) = self.execute_transactions(
967            executor,
968            transaction_count,
969            handle.iter_transactions(),
970            &receipt_tx,
971            &executed_tx_index,
972        )?;
973        drop(receipt_tx);
974
975        // Finish execution and get the result
976        let post_exec_start = Instant::now();
977        let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
978            .in_scope(|| executor.finish())
979            .map(|(evm, result)| (evm.into_db(), result))?;
980        self.metrics.record_post_execution(post_exec_start.elapsed());
981
982        // Merge transitions into bundle state
983        debug_span!(target: "engine::tree", "merge_transitions")
984            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));
985
986        let output = BlockExecutionOutput { result, state: db.take_bundle() };
987
988        let execution_duration = execution_start.elapsed();
989        self.metrics.record_block_execution(&output, execution_duration);
990        self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
991        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
992
993        Ok((output, senders, result_rx))
994    }
995
996    /// Executes transactions and collects senders, streaming receipts to a background task.
997    ///
998    /// This method handles:
999    /// - Applying pre-execution changes (e.g., beacon root updates)
1000    /// - Executing each transaction with timing metrics
1001    /// - Streaming receipts to the receipt root computation task
1002    /// - Collecting transaction senders for later use
1003    ///
1004    /// Returns the executor (for finalization) and the collected senders.
1005    fn execute_transactions<E, Tx, InnerTx, Err>(
1006        &self,
1007        mut executor: E,
1008        transaction_count: usize,
1009        transactions: impl Iterator<Item = Result<Tx, Err>>,
1010        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
1011        executed_tx_index: &AtomicUsize,
1012    ) -> Result<(E, Vec<Address>), BlockExecutionError>
1013    where
1014        E: BlockExecutor<Receipt = N::Receipt>,
1015        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
1016        InnerTx: TxHashRef,
1017        Err: core::error::Error + Send + Sync + 'static,
1018    {
1019        let mut senders = Vec::with_capacity(transaction_count);
1020
1021        // Apply pre-execution changes (e.g., beacon root update)
1022        let pre_exec_start = Instant::now();
1023        debug_span!(target: "engine::tree", "pre_execution")
1024            .in_scope(|| executor.apply_pre_execution_changes())?;
1025        self.metrics.record_pre_execution(pre_exec_start.elapsed());
1026
1027        // Execute transactions
1028        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
1029        let mut transactions = transactions.into_iter();
1030        // Some executors may execute transactions that do not append receipts during the
1031        // main loop (e.g., system transactions whose receipts are added during finalization).
1032        // In that case, invoking the callback on every transaction would resend the previous
1033        // receipt with the same index and can panic the ordered root builder.
1034        let mut last_sent_len = 0usize;
1035        loop {
1036            // Measure time spent waiting for next transaction from iterator
1037            // (e.g., parallel signature recovery)
1038            let wait_start = Instant::now();
1039            let Some(tx_result) = transactions.next() else { break };
1040            self.metrics.record_transaction_wait(wait_start.elapsed());
1041
1042            let tx = tx_result.map_err(BlockExecutionError::other)?;
1043            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
1044
1045            senders.push(tx_signer);
1046
1047            let _enter = debug_span!(
1048                target: "engine::tree",
1049                "execute tx",
1050                tx_index = senders.len() - 1,
1051            )
1052            .entered();
1053            trace!(target: "engine::tree", "Executing transaction");
1054
1055            let tx_start = Instant::now();
1056            executor.execute_transaction(tx)?;
1057            self.metrics.record_transaction_execution(tx_start.elapsed());
1058
1059            // advance the shared counter so prewarm workers skip already-executed txs
1060            executed_tx_index.store(senders.len(), Ordering::Relaxed);
1061
1062            let current_len = executor.receipts().len();
1063            if current_len > last_sent_len {
1064                last_sent_len = current_len;
1065                // Send the latest receipt to the background task for incremental root computation.
1066                if let Some(receipt) = executor.receipts().last() {
1067                    let tx_index = current_len - 1;
1068                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
1069                }
1070            }
1071        }
1072        drop(exec_span);
1073
1074        Ok((executor, senders))
1075    }
1076
1077    /// Compute state root for the given hashed post state in parallel.
1078    ///
1079    /// Uses an overlay factory which provides the state of the parent block, along with the
1080    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
1081    /// trie updates for this block.
1082    ///
1083    /// # Returns
1084    ///
1085    /// Returns `Ok(_)` if computed successfully.
1086    /// Returns `Err(_)` if error was encountered during computation.
1087    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1088    fn compute_state_root_parallel(
1089        &self,
1090        overlay_factory: OverlayStateProviderFactory<P>,
1091        hashed_state: &LazyHashedPostState,
1092    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
1093        let hashed_state = hashed_state.get();
1094        // The `hashed_state` argument will be taken into account as part of the overlay, but we
1095        // need to use the prefix sets which were generated from it to indicate to the
1096        // ParallelStateRoot which parts of the trie need to be recomputed.
1097        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1098        let overlay_factory =
1099            overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1100        ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
1101            .incremental_root_with_updates()
1102    }
1103
1104    /// Compute state root for the given hashed post state in serial.
1105    ///
1106    /// Uses an overlay factory which provides the state of the parent block, along with the
1107    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
1108    /// trie updates for this block.
1109    fn compute_state_root_serial(
1110        overlay_factory: OverlayStateProviderFactory<P>,
1111        hashed_state: &LazyHashedPostState,
1112    ) -> ProviderResult<(B256, TrieUpdates)> {
1113        let hashed_state = hashed_state.get();
1114        // The `hashed_state` argument will be taken into account as part of the overlay, but we
1115        // need to use the prefix sets which were generated from it to indicate to the
1116        // StateRoot which parts of the trie need to be recomputed.
1117        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
1118        let overlay_factory =
1119            overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
1120
1121        let provider = overlay_factory.database_provider_ro()?;
1122
1123        Ok(StateRoot::new(&provider, &provider)
1124            .with_prefix_sets(prefix_sets)
1125            .root_with_updates()?)
1126    }
1127
1128    /// Awaits the state root from the background task, with an optional timeout fallback.
1129    ///
1130    /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
1131    /// state root task up to the timeout duration. If the task doesn't complete in time, a
1132    /// sequential state root computation is spawned via `spawn_blocking`. Both computations
1133    /// then race: the main thread polls the task receiver and the sequential result channel
1134    /// in a loop, returning whichever finishes first.
1135    ///
1136    /// If no timeout is configured, this simply awaits the state root task without any fallback.
1137    ///
1138    /// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
1139    /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
1140    /// `Result` captures parallel state root task errors that can still fall back to serial.
1141    #[instrument(
1142        level = "debug",
1143        target = "engine::tree::payload_validator",
1144        name = "await_state_root",
1145        skip_all
1146    )]
1147    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
1148        &self,
1149        handle: &mut PayloadHandle<Tx, Err, R>,
1150        overlay_factory: OverlayStateProviderFactory<P>,
1151        hashed_state: &LazyHashedPostState,
1152    ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
1153        let Some(timeout) = self.config.state_root_task_timeout() else {
1154            return Ok(handle.state_root());
1155        };
1156
1157        let task_rx = handle.take_state_root_rx();
1158
1159        match task_rx.recv_timeout(timeout) {
1160            Ok(result) => Ok(result),
1161            Err(RecvTimeoutError::Disconnected) => {
1162                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
1163            }
1164            Err(RecvTimeoutError::Timeout) => {
1165                warn!(
1166                    target: "engine::tree::payload_validator",
1167                    ?timeout,
1168                    "State root task timed out, spawning sequential fallback"
1169                );
1170                self.metrics.block_validation.state_root_task_timeout_total.increment(1);
1171
1172                let (seq_tx, seq_rx) =
1173                    std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
1174
1175                let seq_overlay = overlay_factory;
1176                let seq_hashed_state = hashed_state.clone();
1177                self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
1178                    let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
1179                    let _ = seq_tx.send(result);
1180                });
1181
1182                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
1183
1184                loop {
1185                    match task_rx.recv_timeout(POLL_INTERVAL) {
1186                        Ok(result) => {
1187                            debug!(
1188                                target: "engine::tree::payload_validator",
1189                                source = "task",
1190                                "State root timeout race won"
1191                            );
1192                            return Ok(result);
1193                        }
1194                        Err(RecvTimeoutError::Disconnected) => {
1195                            debug!(
1196                                target: "engine::tree::payload_validator",
1197                                "State root task dropped, waiting for sequential fallback"
1198                            );
1199                            let result = seq_rx.recv().map_err(|_| {
1200                                ProviderError::other(std::io::Error::other(
1201                                    "both state root computations failed",
1202                                ))
1203                            })?;
1204                            let (state_root, trie_updates) = result?;
1205                            return Ok(Ok(StateRootComputeOutcome {
1206                                state_root,
1207                                trie_updates: Arc::new(trie_updates),
1208                                #[cfg(feature = "trie-debug")]
1209                                debug_recorders: Vec::new(),
1210                            }));
1211                        }
1212                        Err(RecvTimeoutError::Timeout) => {}
1213                    }
1214
1215                    if let Ok(result) = seq_rx.try_recv() {
1216                        debug!(
1217                            target: "engine::tree::payload_validator",
1218                            source = "sequential",
1219                            "State root timeout race won"
1220                        );
1221                        let (state_root, trie_updates) = result?;
1222                        return Ok(Ok(StateRootComputeOutcome {
1223                            state_root,
1224                            trie_updates: Arc::new(trie_updates),
1225                            #[cfg(feature = "trie-debug")]
1226                            debug_recorders: Vec::new(),
1227                        }));
1228                    }
1229                }
1230            }
1231        }
1232    }
1233
1234    /// Compares trie updates from the state root task with serial state root computation.
1235    ///
1236    /// This is used for debugging and validating the correctness of the parallel state root
1237    /// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
1238    /// method runs a separate serial state root computation and compares the resulting trie
1239    /// updates.
1240    fn compare_trie_updates_with_serial(
1241        &self,
1242        overlay_factory: OverlayStateProviderFactory<P>,
1243        hashed_state: &LazyHashedPostState,
1244        task_trie_updates: TrieUpdates,
1245    ) -> bool {
1246        debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1247
1248        match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
1249            Ok((serial_root, serial_trie_updates)) => {
1250                debug!(
1251                    target: "engine::tree::payload_validator",
1252                    ?serial_root,
1253                    "Serial state root computation finished for comparison"
1254                );
1255
1256                // Get a database provider to use as trie cursor factory
1257                match overlay_factory.database_provider_ro() {
1258                    Ok(provider) => {
1259                        match super::trie_updates::compare_trie_updates(
1260                            &provider,
1261                            task_trie_updates,
1262                            serial_trie_updates,
1263                        ) {
1264                            Ok(has_diff) => return has_diff,
1265                            Err(err) => {
1266                                warn!(
1267                                    target: "engine::tree::payload_validator",
1268                                    %err,
1269                                    "Error comparing trie updates"
1270                                );
1271                                return true;
1272                            }
1273                        }
1274                    }
1275                    Err(err) => {
1276                        warn!(
1277                            target: "engine::tree::payload_validator",
1278                            %err,
1279                            "Failed to get database provider for trie update comparison"
1280                        );
1281                    }
1282                }
1283            }
1284            Err(err) => {
1285                warn!(
1286                    target: "engine::tree::payload_validator",
1287                    %err,
1288                    "Failed to compute serial state root for comparison"
1289                );
1290            }
1291        }
1292        false
1293    }
1294
1295    /// Writes trie debug recorders to a JSON file for the given block number.
1296    ///
1297    /// The file is written to the current working directory as
1298    /// `trie_debug_block_{block_number}.json`.
1299    #[cfg(feature = "trie-debug")]
1300    fn write_trie_debug_recorders(
1301        block_number: u64,
1302        recorders: &[(Option<B256>, TrieDebugRecorder)],
1303    ) {
1304        let path = format!("trie_debug_block_{block_number}.json");
1305        match serde_json::to_string_pretty(recorders) {
1306            Ok(json) => match std::fs::write(&path, json) {
1307                Ok(()) => {
1308                    warn!(
1309                        target: "engine::tree::payload_validator",
1310                        %path,
1311                        "Wrote trie debug recorders to file"
1312                    );
1313                }
1314                Err(err) => {
1315                    warn!(
1316                        target: "engine::tree::payload_validator",
1317                        %err,
1318                        %path,
1319                        "Failed to write trie debug recorders"
1320                    );
1321                }
1322            },
1323            Err(err) => {
1324                warn!(
1325                    target: "engine::tree::payload_validator",
1326                    %err,
1327                    "Failed to serialize trie debug recorders"
1328                );
1329            }
1330        }
1331    }
1332
1333    /// Validates the block after execution.
1334    ///
1335    /// This performs:
1336    /// - parent header validation
1337    /// - post-execution consensus validation
1338    /// - state-root based post-execution validation
1339    ///
1340    /// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
1341    /// and logs bloom from the receipts.
1342    ///
1343    /// The `hashed_state` handle wraps the background hashed post state computation.
1344    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1345    #[expect(clippy::too_many_arguments)]
1346    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
1347        &self,
1348        block: &RecoveredBlock<N::Block>,
1349        parent_block: &SealedHeader<N::BlockHeader>,
1350        output: &BlockExecutionOutput<N::Receipt>,
1351        ctx: &mut TreeCtx<'_, N>,
1352        transaction_root: Option<B256>,
1353        receipt_root_bloom: Option<ReceiptRootBloom>,
1354        hashed_state: LazyHashedPostState,
1355    ) -> Result<LazyHashedPostState, InsertBlockErrorKind>
1356    where
1357        V: PayloadValidator<T, Block = N::Block>,
1358    {
1359        let start = Instant::now();
1360
1361        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
1362        // validate block consensus rules
1363        if let Err(e) = self.validate_block_inner(block, transaction_root) {
1364            return Err(e.into())
1365        }
1366
1367        // now validate against the parent
1368        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
1369        if let Err(e) =
1370            self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
1371        {
1372            warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
1373            return Err(e.into())
1374        }
1375        drop(_enter);
1376
1377        // Validate block post-execution rules
1378        let _enter =
1379            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
1380                .entered();
1381        if let Err(err) =
1382            self.consensus.validate_block_post_execution(block, output, receipt_root_bloom)
1383        {
1384            // call post-block hook
1385            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1386            return Err(err.into())
1387        }
1388        drop(_enter);
1389
1390        // Wait for the background keccak256 hashing task to complete. This blocks until
1391        // all changed addresses and storage slots have been hashed.
1392        let hashed_state_ref =
1393            debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
1394                .in_scope(|| hashed_state.get());
1395
1396        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
1397        if let Err(err) =
1398            self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, block)
1399        {
1400            // call post-block hook
1401            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1402            return Err(err.into())
1403        }
1404
1405        // record post-execution validation duration
1406        self.metrics
1407            .block_validation
1408            .post_execution_validation_duration
1409            .record(start.elapsed().as_secs_f64());
1410
1411        Ok(hashed_state)
1412    }
1413
1414    /// Spawns a payload processor task based on the state root strategy.
1415    ///
1416    /// This method determines how to execute the block and compute its state root based on
1417    /// the selected strategy:
1418    /// - `StateRootTask`: Uses a dedicated task for state root computation with proof generation
1419    /// - `Parallel`: Computes state root in parallel with block execution
1420    /// - `Synchronous`: Falls back to sequential execution and state root computation
1421    ///
1422    /// The method handles strategy fallbacks if the preferred approach fails, ensuring
1423    /// block execution always completes with a valid state root.
1424    ///
1425    /// # Arguments
1426    ///
1427    /// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
1428    ///   (`StateRootTask`)
1429    #[allow(clippy::too_many_arguments)]
1430    #[instrument(
1431        level = "debug",
1432        target = "engine::tree::payload_validator",
1433        skip_all,
1434        fields(?strategy)
1435    )]
1436    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
1437        &mut self,
1438        env: ExecutionEnv<Evm>,
1439        txs: T,
1440        provider_builder: StateProviderBuilder<N, P>,
1441        overlay_factory: OverlayStateProviderFactory<P>,
1442        strategy: StateRootStrategy,
1443        block_access_list: Option<Arc<BlockAccessList>>,
1444    ) -> Result<
1445        PayloadHandle<
1446            impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
1447            impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
1448            N::Receipt,
1449        >,
1450        InsertBlockErrorKind,
1451    > {
1452        match strategy {
1453            StateRootStrategy::StateRootTask => {
1454                let spawn_start = Instant::now();
1455
1456                // Use the pre-computed overlay factory for multiproofs
1457                let handle = self.payload_processor.spawn(
1458                    env,
1459                    txs,
1460                    provider_builder,
1461                    overlay_factory,
1462                    &self.config,
1463                    block_access_list,
1464                );
1465
1466                // record prewarming initialization duration
1467                self.metrics
1468                    .block_validation
1469                    .spawn_payload_processor
1470                    .record(spawn_start.elapsed().as_secs_f64());
1471
1472                Ok(handle)
1473            }
1474            StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
1475                let start = Instant::now();
1476                let handle = self.payload_processor.spawn_cache_exclusive(
1477                    env,
1478                    txs,
1479                    provider_builder,
1480                    block_access_list,
1481                );
1482
1483                // Record prewarming initialization duration
1484                self.metrics
1485                    .block_validation
1486                    .spawn_payload_processor
1487                    .record(start.elapsed().as_secs_f64());
1488
1489                Ok(handle)
1490            }
1491        }
1492    }
1493
1494    /// Creates a `StateProviderBuilder` for the given parent hash.
1495    ///
1496    /// This method checks if the parent is in the tree state (in-memory) or persisted to disk,
1497    /// and creates the appropriate provider builder.
1498    fn state_provider_builder(
1499        &self,
1500        hash: B256,
1501        state: &EngineApiTreeState<N>,
1502    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1503        if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1504            debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1505            // the block leads back to the canonical chain
1506            return Ok(Some(StateProviderBuilder::new(
1507                self.provider.clone(),
1508                historical,
1509                Some(blocks),
1510            )))
1511        }
1512
1513        // Check if the block is persisted
1514        if let Some(header) = self.provider.header(hash)? {
1515            debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1516            // For persisted blocks, we create a builder that will fetch state directly from the
1517            // database
1518            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1519        }
1520
1521        debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1522        Ok(None)
1523    }
1524
1525    /// Determines the state root computation strategy based on configuration.
1526    ///
1527    /// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
1528    /// too expensive because it requires walking all paths in every proof.
1529    const fn plan_state_root_computation(&self) -> StateRootStrategy {
1530        if self.config.state_root_fallback() {
1531            StateRootStrategy::Synchronous
1532        } else if self.config.use_state_root_task() {
1533            StateRootStrategy::StateRootTask
1534        } else {
1535            StateRootStrategy::Parallel
1536        }
1537    }
1538
1539    /// Called when an invalid block is encountered during validation.
1540    fn on_invalid_block(
1541        &self,
1542        parent_header: &SealedHeader<N::BlockHeader>,
1543        block: &RecoveredBlock<N::Block>,
1544        output: &BlockExecutionOutput<N::Receipt>,
1545        trie_updates: Option<(&TrieUpdates, B256)>,
1546        state: &mut EngineApiTreeState<N>,
1547    ) {
1548        if state.invalid_headers.get(&block.hash()).is_some() {
1549            // we already marked this block as invalid
1550            return
1551        }
1552        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1553    }
1554
1555    /// Creates a [`LazyOverlay`] for the parent block without blocking.
1556    ///
1557    /// Returns a lazy overlay that will compute the trie input on first access, and the anchor
1558    /// block hash (the highest persisted ancestor). This allows execution to start immediately
1559    /// while the trie input computation is deferred until the overlay is actually needed.
1560    ///
1561    /// If parent is on disk (no in-memory blocks), returns `None` for the lazy overlay.
1562    ///
1563    /// Uses a cached overlay if available for the canonical head (the common case).
1564    fn get_parent_lazy_overlay(
1565        parent_hash: B256,
1566        state: &EngineApiTreeState<N>,
1567    ) -> (Option<LazyOverlay>, B256) {
1568        // Get blocks leading to the parent to determine the anchor
1569        let (anchor_hash, blocks) =
1570            state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
1571
1572        if blocks.is_empty() {
1573            debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
1574            return (None, anchor_hash);
1575        }
1576
1577        // Try to use the cached overlay if it matches both parent hash and anchor
1578        if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
1579            debug!(
1580                target: "engine::tree::payload_validator",
1581                %parent_hash,
1582                %anchor_hash,
1583                "Using cached canonical overlay"
1584            );
1585            return (Some(cached.overlay.clone()), cached.anchor_hash);
1586        }
1587
1588        debug!(
1589            target: "engine::tree::payload_validator",
1590            %anchor_hash,
1591            num_blocks = blocks.len(),
1592            "Creating lazy overlay for in-memory blocks"
1593        );
1594
1595        // Extract deferred trie data handles (non-blocking)
1596        let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
1597
1598        (Some(LazyOverlay::new(anchor_hash, handles)), anchor_hash)
1599    }
1600
1601    /// Spawns a background task to compute and sort trie data for the executed block.
1602    ///
1603    /// This function creates a [`DeferredTrieData`] handle with fallback inputs and spawns a
1604    /// blocking task that calls `wait_cloned()` to:
1605    /// 1. Sort the block's hashed state and trie updates
1606    /// 2. Merge ancestor overlays and extend with the sorted data
1607    /// 3. Create an [`AnchoredTrieInput`](reth_chain_state::AnchoredTrieInput) for efficient future
1608    ///    trie computations
1609    /// 4. Cache the result so subsequent calls return immediately
1610    ///
1611    /// If the background task hasn't completed when `trie_data()` is called, `wait_cloned()`
1612    /// computes from the stored inputs, eliminating deadlock risk and duplicate computation.
1613    ///
1614    /// The validation hot path can return immediately after state root verification,
1615    /// while consumers (DB writes, overlay providers, proofs) get trie data either
1616    /// from the completed task or via fallback computation.
1617    fn spawn_deferred_trie_task(
1618        &self,
1619        block: RecoveredBlock<N::Block>,
1620        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
1621        ctx: &TreeCtx<'_, N>,
1622        hashed_state: LazyHashedPostState,
1623        trie_output: Arc<TrieUpdates>,
1624        changeset_provider: impl TrieCursorFactory + Send + 'static,
1625    ) -> ExecutedBlock<N> {
1626        // Capture parent hash and ancestor overlays for deferred trie input construction.
1627        let (anchor_hash, overlay_blocks) = ctx
1628            .state()
1629            .tree_state
1630            .blocks_by_hash(block.parent_hash())
1631            .unwrap_or_else(|| (block.parent_hash(), Vec::new()));
1632
1633        // Collect lightweight ancestor trie data handles. We don't call trie_data() here;
1634        // the merge and any fallback sorting happens in the compute_trie_input_task.
1635        let ancestors: Vec<DeferredTrieData> =
1636            overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();
1637
1638        // Create deferred handle with fallback inputs in case the background task hasn't completed.
1639        // Resolve the lazy handle into Arc<HashedPostState>. By this point the hashed state has
1640        // already been computed and used for state root verification, so .get() returns instantly.
1641        let hashed_state = match hashed_state.try_into_inner() {
1642            Ok(state) => Arc::new(state),
1643            Err(handle) => Arc::new(handle.get().clone()),
1644        };
1645        let deferred_trie_data =
1646            DeferredTrieData::pending(hashed_state, trie_output, anchor_hash, ancestors);
1647        let deferred_handle_task = deferred_trie_data.clone();
1648        let block_validation_metrics = self.metrics.block_validation.clone();
1649
1650        // Capture block info and cache handle for changeset computation
1651        let block_hash = block.hash();
1652        let block_number = block.number();
1653
1654        // Register a pending changeset entry so that concurrent readers will wait for
1655        // this computation to finish rather than falling back to the expensive DB path.
1656        // The guard ensures the pending entry is cancelled if the task panics.
1657        let pending_changeset_guard = self.changeset_cache.register_pending(block_hash);
1658
1659        // Spawn background task to compute trie data. Calling `wait_cloned` will compute from
1660        // the stored inputs and cache the result, so subsequent calls return immediately.
1661        let compute_trie_input_task = move || {
1662            let _span = debug_span!(
1663                target: "engine::tree::payload_validator",
1664                "compute_trie_input_task",
1665                block_number
1666            )
1667            .entered();
1668
1669            let result = panic::catch_unwind(AssertUnwindSafe(|| {
1670                let compute_start = Instant::now();
1671                let computed = deferred_handle_task.wait_cloned();
1672                block_validation_metrics
1673                    .deferred_trie_compute_duration
1674                    .record(compute_start.elapsed().as_secs_f64());
1675
1676                // Record sizes of the computed trie data
1677                block_validation_metrics
1678                    .hashed_post_state_size
1679                    .record(computed.hashed_state.total_len() as f64);
1680                block_validation_metrics
1681                    .trie_updates_sorted_size
1682                    .record(computed.trie_updates.total_len() as f64);
1683                if let Some(anchored) = &computed.anchored_trie_input {
1684                    block_validation_metrics
1685                        .anchored_overlay_trie_updates_size
1686                        .record(anchored.trie_input.nodes.total_len() as f64);
1687                    block_validation_metrics
1688                        .anchored_overlay_hashed_state_size
1689                        .record(anchored.trie_input.state.total_len() as f64);
1690                }
1691
1692                // Compute and cache changesets using the computed trie_updates.
1693                // Use the pre-created provider to avoid races with changeset cache
1694                // eviction that can happen between task spawn and execution.
1695                let changeset_start = Instant::now();
1696
1697                match reth_trie::changesets::compute_trie_changesets(
1698                    &changeset_provider,
1699                    &computed.trie_updates,
1700                ) {
1701                    Ok(changesets) => {
1702                        debug!(
1703                            target: "engine::tree::changeset",
1704                            ?block_number,
1705                            elapsed = ?changeset_start.elapsed(),
1706                            "Computed and caching changesets"
1707                        );
1708
1709                        pending_changeset_guard.resolve(block_number, Arc::new(changesets));
1710                    }
1711                    Err(e) => {
1712                        warn!(
1713                            target: "engine::tree::changeset",
1714                            ?block_number,
1715                            ?e,
1716                            "Failed to compute changesets in deferred trie task"
1717                        );
1718                    }
1719                }
1720            }));
1721
1722            if result.is_err() {
1723                error!(
1724                    target: "engine::tree::payload_validator",
1725                    "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
1726                );
1727            }
1728        };
1729
1730        // Spawn task that computes trie data asynchronously.
1731        self.payload_processor
1732            .executor()
1733            .spawn_blocking_named("trie-input", compute_trie_input_task);
1734
1735        ExecutedBlock::with_deferred_trie_data(
1736            Arc::new(block),
1737            execution_outcome,
1738            deferred_trie_data,
1739        )
1740    }
1741
1742    fn calculate_timing_stats(
1743        &self,
1744        block: &RecoveredBlock<N::Block>,
1745        provider_stats: Arc<StateProviderStats>,
1746        cache_stats: Option<Arc<CacheStats>>,
1747        output: &BlockExecutionOutput<N::Receipt>,
1748        execution_duration: Duration,
1749        state_hash_duration: Duration,
1750    ) -> Box<ExecutionTimingStats> {
1751        let accounts_read = provider_stats.total_account_fetches();
1752        let storage_read = provider_stats.total_storage_fetches();
1753        let code_read = provider_stats.total_code_fetches();
1754        let code_bytes_read = provider_stats.total_code_fetched_bytes();
1755
1756        // Write stats from BundleState (final state changes)
1757        let accounts_changed = output.state.state.len();
1758        let accounts_deleted =
1759            output.state.state.values().filter(|acc| acc.was_destroyed()).count();
1760        let storage_slots_changed =
1761            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
1762        let storage_slots_deleted = output
1763            .state
1764            .state
1765            .values()
1766            .flat_map(|account| account.storage.values())
1767            .filter(|slot| {
1768                slot.present_value.is_zero() && !slot.previous_or_original_value.is_zero()
1769            })
1770            .count();
1771
1772        // Helper: check if account represents a new contract deployment
1773        let is_new_deployment = |acc: &BundleAccount| -> bool {
1774            let has_code_now = acc.info.as_ref().is_some_and(|info| info.code_hash != KECCAK_EMPTY);
1775            let had_no_code_before = acc
1776                .original_info
1777                .as_ref()
1778                .map(|info| info.code_hash == KECCAK_EMPTY)
1779                .unwrap_or(true);
1780            has_code_now && had_no_code_before
1781        };
1782
1783        let bytecodes_changed =
1784            output.state.state.values().filter(|acc| is_new_deployment(acc)).count();
1785
1786        // Unique new code hashes to count actual bytes persisted (deduplicated)
1787        let unique_new_code_hashes: B256Set = output
1788            .state
1789            .state
1790            .values()
1791            .filter(|acc| is_new_deployment(acc))
1792            .filter_map(|acc| acc.info.as_ref().map(|info| info.code_hash))
1793            .collect();
1794        let code_bytes_written: usize = unique_new_code_hashes
1795            .iter()
1796            .filter_map(|hash| {
1797                output.state.contracts.get(hash).map(|bytecode| bytecode.original_bytes().len())
1798            })
1799            .sum();
1800
1801        // Total time spent fetching state during execution
1802        let state_read_duration = provider_stats.total_account_fetch_latency() +
1803            provider_stats.total_storage_fetch_latency() +
1804            provider_stats.total_code_fetch_latency();
1805
1806        // EIP-7702 delegation tracking from bytecode changes
1807        // Count new EIP-7702 bytecodes as delegations set
1808        let eip7702_delegations_set =
1809            output.state.contracts.values().filter(|bytecode| bytecode.is_eip7702()).count();
1810        // Delegations cleared: accounts where bytecode changed FROM EIP-7702 TO empty
1811        // This detects when an EIP-7702 delegation is removed by setting code to empty
1812        // Note: Clearing a delegation does NOT destroy the account - it just empties the
1813        // bytecode
1814        let eip7702_delegations_cleared = output
1815            .state
1816            .state
1817            .values()
1818            .filter(|acc| {
1819                // Check if original bytecode was EIP-7702
1820                let original_was_eip7702 = acc
1821                    .original_info
1822                    .as_ref()
1823                    .and_then(|info| info.code.as_ref())
1824                    .map(|bytecode| bytecode.is_eip7702())
1825                    .unwrap_or(false);
1826
1827                // Check if current code is empty (delegation cleared)
1828                let code_now_empty =
1829                    acc.info.as_ref().map(|info| info.code_hash == KECCAK_EMPTY).unwrap_or(false);
1830
1831                original_was_eip7702 && code_now_empty
1832            })
1833            .count();
1834
1835        // Get cache statistics for detailed block logging
1836        let (account_cache_hits, account_cache_misses) = cache_stats
1837            .as_ref()
1838            .map(|s| (s.account_hits(), s.account_misses()))
1839            .unwrap_or_default();
1840        let (storage_cache_hits, storage_cache_misses) = cache_stats
1841            .as_ref()
1842            .map(|s| (s.storage_hits(), s.storage_misses()))
1843            .unwrap_or_default();
1844        let (code_cache_hits, code_cache_misses) =
1845            cache_stats.as_ref().map(|s| (s.code_hits(), s.code_misses())).unwrap_or_default();
1846
1847        // Build execution timing stats for detailed block logging
1848        Box::new(ExecutionTimingStats {
1849            block_number: block.number(),
1850            block_hash: block.hash(),
1851            gas_used: output.result.gas_used,
1852            tx_count: block.transaction_count(),
1853            execution_duration,
1854            state_read_duration,
1855            state_hash_duration,
1856            accounts_read,
1857            storage_read,
1858            code_read,
1859            code_bytes_read,
1860            accounts_changed,
1861            accounts_deleted,
1862            storage_slots_changed,
1863            storage_slots_deleted,
1864            bytecodes_changed,
1865            code_bytes_written,
1866            eip7702_delegations_set,
1867            eip7702_delegations_cleared,
1868            account_cache_hits,
1869            account_cache_misses,
1870            storage_cache_hits,
1871            storage_cache_misses,
1872            code_cache_hits,
1873            code_cache_misses,
1874        })
1875    }
1876}
1877
1878/// Strategy describing how to compute the state root.
1879#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1880enum StateRootStrategy {
1881    /// Use the state root task (background sparse trie computation).
1882    StateRootTask,
1883    /// Run the parallel state root computation on the calling thread.
1884    Parallel,
1885    /// Fall back to synchronous computation via the state provider.
1886    Synchronous,
1887}
1888
1889/// Type that validates the payloads processed by the engine.
1890///
1891/// This provides the necessary functions for validating/executing payloads/blocks.
1892pub trait EngineValidator<
1893    Types: PayloadTypes,
1894    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
1895>: Send + Sync + 'static
1896{
1897    /// Validates the payload attributes with respect to the header.
1898    ///
1899    /// By default, this enforces that the payload attributes timestamp is greater than the
1900    /// timestamp according to:
1901    ///   > 7. Client software MUST ensure that payloadAttributes.timestamp is greater than
1902    ///   > timestamp
1903    ///   > of a block referenced by forkchoiceState.headBlockHash.
1904    ///
1905    /// See also: <https://github.com/ethereum/execution-apis/blob/main/src/engine/common.md#specification-1>
1906    fn validate_payload_attributes_against_header(
1907        &self,
1908        attr: &Types::PayloadAttributes,
1909        header: &N::BlockHeader,
1910    ) -> Result<(), InvalidPayloadAttributesError>;
1911
1912    /// Ensures that the given payload does not violate any consensus rules that concern the block's
1913    /// layout.
1914    ///
1915    /// This function must convert the payload into the executable block and pre-validate its
1916    /// fields.
1917    ///
1918    /// Implementers should ensure that the checks are done in the order that conforms with the
1919    /// engine-API specification.
1920    fn convert_payload_to_block(
1921        &self,
1922        payload: Types::ExecutionData,
1923    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;
1924
1925    /// Validates a payload received from engine API.
1926    fn validate_payload(
1927        &mut self,
1928        payload: Types::ExecutionData,
1929        ctx: TreeCtx<'_, N>,
1930    ) -> ValidationOutcome<N>;
1931
1932    /// Validates a block downloaded from the network.
1933    fn validate_block(
1934        &mut self,
1935        block: SealedBlock<N::Block>,
1936        ctx: TreeCtx<'_, N>,
1937    ) -> ValidationOutcome<N>;
1938
1939    /// Hook called after an executed block is inserted directly into the tree.
1940    ///
1941    /// This is invoked when blocks are inserted via `InsertExecutedBlock` (e.g., locally built
1942    /// blocks by sequencers) to allow implementations to update internal state such as caches.
1943    fn on_inserted_executed_block(&self, block: ExecutedBlock<N>);
1944}
1945
1946impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
1947where
1948    P: DatabaseProviderFactory<
1949            Provider: BlockReader
1950                          + StageCheckpointReader
1951                          + PruneCheckpointReader
1952                          + ChangeSetReader
1953                          + StorageChangeSetReader
1954                          + BlockNumReader
1955                          + StorageSettingsCache,
1956        > + BlockReader<Header = N::BlockHeader>
1957        + StateProviderFactory
1958        + StateReader
1959        + ChangeSetReader
1960        + BlockNumReader
1961        + HashedPostStateProvider
1962        + Clone
1963        + 'static,
1964    N: NodePrimitives,
1965    V: PayloadValidator<Types, Block = N::Block> + Clone,
1966    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
1967    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1968{
1969    fn validate_payload_attributes_against_header(
1970        &self,
1971        attr: &Types::PayloadAttributes,
1972        header: &N::BlockHeader,
1973    ) -> Result<(), InvalidPayloadAttributesError> {
1974        self.validator.validate_payload_attributes_against_header(attr, header)
1975    }
1976
1977    fn convert_payload_to_block(
1978        &self,
1979        payload: Types::ExecutionData,
1980    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
1981        let block = self.validator.convert_payload_to_block(payload)?;
1982        Ok(block)
1983    }
1984
1985    fn validate_payload(
1986        &mut self,
1987        payload: Types::ExecutionData,
1988        ctx: TreeCtx<'_, N>,
1989    ) -> ValidationOutcome<N> {
1990        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
1991    }
1992
1993    fn validate_block(
1994        &mut self,
1995        block: SealedBlock<N::Block>,
1996        ctx: TreeCtx<'_, N>,
1997    ) -> ValidationOutcome<N> {
1998        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
1999    }
2000
2001    fn on_inserted_executed_block(&self, block: ExecutedBlock<N>) {
2002        self.payload_processor.on_inserted_executed_block(
2003            block.recovered_block.block_with_parent(),
2004            &block.execution_output.state,
2005        );
2006    }
2007}
2008
2009impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
2010where
2011    Evm: ConfigureEvm,
2012{
2013    fn wait_for_caches(&self) -> CacheWaitDurations {
2014        self.payload_processor.wait_for_caches()
2015    }
2016}
2017
2018/// Enum representing either block or payload being validated.
2019#[derive(Debug, Clone)]
2020pub enum BlockOrPayload<T: PayloadTypes> {
2021    /// Payload.
2022    Payload(T::ExecutionData),
2023    /// Block.
2024    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
2025}
2026
2027impl<T: PayloadTypes> BlockOrPayload<T> {
2028    /// Returns the hash of the block.
2029    pub fn hash(&self) -> B256 {
2030        match self {
2031            Self::Payload(payload) => payload.block_hash(),
2032            Self::Block(block) => block.hash(),
2033        }
2034    }
2035
2036    /// Returns the number and hash of the block.
2037    pub fn num_hash(&self) -> NumHash {
2038        match self {
2039            Self::Payload(payload) => payload.num_hash(),
2040            Self::Block(block) => block.num_hash(),
2041        }
2042    }
2043
2044    /// Returns the parent hash of the block.
2045    pub fn parent_hash(&self) -> B256 {
2046        match self {
2047            Self::Payload(payload) => payload.parent_hash(),
2048            Self::Block(block) => block.parent_hash(),
2049        }
2050    }
2051
2052    /// Returns [`BlockWithParent`] for the block.
2053    pub fn block_with_parent(&self) -> BlockWithParent {
2054        match self {
2055            Self::Payload(payload) => payload.block_with_parent(),
2056            Self::Block(block) => block.block_with_parent(),
2057        }
2058    }
2059
2060    /// Returns a string showing whether or not this is a block or payload.
2061    pub const fn type_name(&self) -> &'static str {
2062        match self {
2063            Self::Payload(_) => "payload",
2064            Self::Block(_) => "block",
2065        }
2066    }
2067
2068    /// Returns the block access list if available.
2069    pub const fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
2070        // TODO decode and return `BlockAccessList`
2071        None
2072    }
2073
2074    /// Returns the number of transactions in the payload or block.
2075    pub fn transaction_count(&self) -> usize
2076    where
2077        T::ExecutionData: ExecutionPayload,
2078    {
2079        match self {
2080            Self::Payload(payload) => payload.transaction_count(),
2081            Self::Block(block) => block.transaction_count(),
2082        }
2083    }
2084
2085    /// Returns the withdrawals from the payload or block.
2086    pub fn withdrawals(&self) -> Option<&[Withdrawal]>
2087    where
2088        T::ExecutionData: ExecutionPayload,
2089    {
2090        match self {
2091            Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
2092            Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
2093        }
2094    }
2095
2096    /// Returns the total gas used by the block.
2097    pub fn gas_used(&self) -> u64
2098    where
2099        T::ExecutionData: ExecutionPayload,
2100    {
2101        match self {
2102            Self::Payload(payload) => payload.gas_used(),
2103            Self::Block(block) => block.gas_used(),
2104        }
2105    }
2106}