Skip to main content

reth_engine_tree/tree/
payload_validator.rs

1//! Types and traits for validating blocks and payloads.
2
3use crate::tree::{
4    cached_state::CachedStateProvider,
5    error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
6    instrumented_state::InstrumentedStateProvider,
7    payload_processor::PayloadProcessor,
8    precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
9    sparse_trie::StateRootComputeOutcome,
10    EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
11    StateProviderDatabase, TreeConfig,
12};
13use alloy_consensus::transaction::{Either, TxHashRef};
14use alloy_eip7928::BlockAccessList;
15use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
16use alloy_evm::Evm;
17use alloy_primitives::B256;
18
19use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
20use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
21use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
22use reth_engine_primitives::{
23    ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
24};
25use reth_errors::{BlockExecutionError, ProviderResult};
26use reth_evm::{
27    block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
28    SpecFor,
29};
30use reth_payload_primitives::{
31    BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
32};
33use reth_primitives_traits::{
34    AlloyBlockHeader, BlockBody, BlockTy, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock,
35    SealedHeader, SignerRecoverable,
36};
37use reth_provider::{
38    providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
39    ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
40    ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
41    StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
42};
43use reth_revm::db::{states::bundle_state::BundleRetention, State};
44use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot};
45use reth_trie_db::ChangesetCache;
46use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
47use revm_primitives::Address;
48use std::{
49    collections::HashMap,
50    panic::{self, AssertUnwindSafe},
51    sync::{mpsc::RecvTimeoutError, Arc},
52    time::Instant,
53};
54use tracing::{debug, debug_span, error, info, instrument, trace, warn};
55
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
/// internals: an exclusive borrow of the engine API tree state and a shared borrow of the
/// canonical in-memory state, both valid for the lifetime `'a`.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// The engine API tree state
    state: &'a mut EngineApiTreeState<N>,
    /// Reference to the canonical in-memory state
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}
66
67impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("TreeCtx")
70            .field("state", &"EngineApiTreeState")
71            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
72            .finish()
73    }
74}
75
76impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
77    /// Creates a new tree context
78    pub const fn new(
79        state: &'a mut EngineApiTreeState<N>,
80        canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
81    ) -> Self {
82        Self { state, canonical_in_memory_state }
83    }
84
85    /// Returns a reference to the engine tree state
86    pub const fn state(&self) -> &EngineApiTreeState<N> {
87        &*self.state
88    }
89
90    /// Returns a mutable reference to the engine tree state
91    pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
92        self.state
93    }
94
95    /// Returns a reference to the canonical in-memory state
96    pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
97        self.canonical_in_memory_state
98    }
99}
100
/// A helper type that provides reusable payload validation logic for network-specific validators.
///
/// This type satisfies [`EngineValidator`] and is responsible for executing blocks/payloads.
///
/// This type contains common validation, execution, and state root computation logic that can be
/// used by network-specific payload validators (e.g., Ethereum, Optimism). It is not meant to be
/// used as a standalone component, but rather as a building block for concrete implementations.
///
/// Type parameters:
/// - `P`: the provider used for database access.
/// - `Evm`: the EVM configuration; its `Primitives` select the node primitive types.
/// - `V`: the wrapped payload validator.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Provider for database access.
    provider: P,
    /// Consensus implementation for validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
    /// EVM configuration.
    evm_config: Evm,
    /// Configuration for the tree.
    config: TreeConfig,
    /// Payload processor for state root computation.
    payload_processor: PayloadProcessor<Evm>,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Precompile cache metrics, keyed by precompile address.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook to call when invalid blocks are encountered.
    ///
    /// Excluded from the derived `Debug` output.
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// Validator for the payload.
    validator: V,
    /// Changeset cache for in-memory trie changesets
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel work.
    runtime: reth_tasks::Runtime,
}
139
140impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
141where
142    N: NodePrimitives,
143    P: DatabaseProviderFactory<
144            Provider: BlockReader
145                          + StageCheckpointReader
146                          + PruneCheckpointReader
147                          + ChangeSetReader
148                          + StorageChangeSetReader
149                          + BlockNumReader
150                          + StorageSettingsCache,
151        > + BlockReader<Header = N::BlockHeader>
152        + ChangeSetReader
153        + BlockNumReader
154        + StateProviderFactory
155        + StateReader
156        + HashedPostStateProvider
157        + Clone
158        + 'static,
159    Evm: ConfigureEvm<Primitives = N> + 'static,
160{
161    /// Creates a new `TreePayloadValidator`.
162    #[allow(clippy::too_many_arguments)]
163    pub fn new(
164        provider: P,
165        consensus: Arc<dyn FullConsensus<N>>,
166        evm_config: Evm,
167        validator: V,
168        config: TreeConfig,
169        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
170        changeset_cache: ChangesetCache,
171        runtime: reth_tasks::Runtime,
172    ) -> Self {
173        let precompile_cache_map = PrecompileCacheMap::default();
174        let payload_processor = PayloadProcessor::new(
175            runtime.clone(),
176            evm_config.clone(),
177            &config,
178            precompile_cache_map.clone(),
179        );
180        Self {
181            provider,
182            consensus,
183            evm_config,
184            payload_processor,
185            precompile_cache_map,
186            precompile_cache_metrics: HashMap::new(),
187            config,
188            invalid_block_hook,
189            metrics: EngineApiMetrics::default(),
190            validator,
191            changeset_cache,
192            runtime,
193        }
194    }
195
196    /// Converts a [`BlockOrPayload`] to a recovered block.
197    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
198    pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
199        &self,
200        input: BlockOrPayload<T>,
201    ) -> Result<SealedBlock<N::Block>, NewPayloadError>
202    where
203        V: PayloadValidator<T, Block = N::Block>,
204    {
205        match input {
206            BlockOrPayload::Payload(payload) => self.validator.convert_payload_to_block(payload),
207            BlockOrPayload::Block(block) => Ok(block),
208        }
209    }
210
211    /// Returns EVM environment for the given payload or block.
212    pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
213        &self,
214        input: &BlockOrPayload<T>,
215    ) -> Result<EvmEnvFor<Evm>, Evm::Error>
216    where
217        V: PayloadValidator<T, Block = N::Block>,
218        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
219    {
220        match input {
221            BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
222            BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
223        }
224    }
225
226    /// Returns [`ExecutableTxIterator`] for the given payload or block.
227    pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
228        &'a self,
229        input: &'a BlockOrPayload<T>,
230    ) -> Result<impl ExecutableTxIterator<Evm>, NewPayloadError>
231    where
232        V: PayloadValidator<T, Block = N::Block>,
233        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
234    {
235        Ok(match input {
236            BlockOrPayload::Payload(payload) => {
237                let iter = self
238                    .evm_config
239                    .tx_iterator_for_payload(payload)
240                    .map_err(NewPayloadError::other)?;
241                Either::Left(iter)
242            }
243            BlockOrPayload::Block(block) => {
244                let txs = block.body().clone_transactions();
245                let convert = |tx: N::SignedTx| tx.try_into_recovered();
246                Either::Right((txs, convert))
247            }
248        })
249    }
250
251    /// Returns a [`ExecutionCtxFor`] for the given payload or block.
252    pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
253        &self,
254        input: &'a BlockOrPayload<T>,
255    ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
256    where
257        V: PayloadValidator<T, Block = N::Block>,
258        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
259    {
260        match input {
261            BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
262            BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
263        }
264    }
265
266    /// Handles execution errors by checking if header validation errors should take precedence.
267    ///
268    /// When an execution error occurs, this function checks if there are any header validation
269    /// errors that should be reported instead, as header validation errors have higher priority.
270    fn handle_execution_error<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
271        &self,
272        input: BlockOrPayload<T>,
273        execution_err: InsertBlockErrorKind,
274        parent_block: &SealedHeader<N::BlockHeader>,
275    ) -> Result<ExecutedBlock<N>, InsertPayloadError<N::Block>>
276    where
277        V: PayloadValidator<T, Block = N::Block>,
278    {
279        debug!(
280            target: "engine::tree::payload_validator",
281            ?execution_err,
282            block = ?input.num_hash(),
283            "Block execution failed, checking for header validation errors"
284        );
285
286        // If execution failed, we should first check if there are any header validation
287        // errors that take precedence over the execution error
288        let block = self.convert_to_block(input)?;
289
290        // Validate block consensus rules which includes header validation
291        if let Err(consensus_err) = self.validate_block_inner(&block) {
292            // Header validation error takes precedence over execution error
293            return Err(InsertBlockError::new(block, consensus_err.into()).into())
294        }
295
296        // Also validate against the parent
297        if let Err(consensus_err) =
298            self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
299        {
300            // Parent validation error takes precedence over execution error
301            return Err(InsertBlockError::new(block, consensus_err.into()).into())
302        }
303
304        // No header validation errors, return the original execution error
305        Err(InsertBlockError::new(block, execution_err).into())
306    }
307
    /// Validates a block or payload on top of its parent's state.
    ///
    /// The input may still be an unconverted payload; it is converted to a block inside this
    /// method (after execution on the success path, or on demand when an error must be reported).
    ///
    /// This method performs:
    /// - Lookup of the parent's state provider and sealed header
    /// - Block execution (with the receipt root computed by a background task)
    /// - Post-execution validation
    /// - State root computation using the planned strategy, with a serial fallback
    /// - Comparison of the computed state root against the block header
    ///
    /// NOTE(review): earlier documentation also listed "fork detection"; no such step is visible
    /// in this function body — confirm whether it happens in a callee.
    ///
    /// On success the result of `spawn_deferred_trie_task` is returned. On failure the error
    /// carries the offending block, except for errors that occur while converting the payload or
    /// building the EVM environment / transaction iterator, which are returned directly.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_validator",
        skip_all,
        fields(
            parent = ?input.parent_hash(),
            type_name = ?input.type_name(),
        )
    )]
    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &mut self,
        input: BlockOrPayload<T>,
        mut ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N, InsertPayloadError<N::Block>>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        // Note: on its error path `ensure_ok!` moves `input` into `convert_to_block`, so it can
        // only be used while `input` has not been converted yet.
        /// A helper macro that returns the block in case there was an error
        /// This macro is used for early returns before block conversion
        macro_rules! ensure_ok {
            ($expr:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        let block = self.convert_to_block(input)?;
                        return Err(InsertBlockError::new(block, e.into()).into())
                    }
                }
            };
        }

        // Note: on its error path `ensure_ok_post_block!` consumes the given block via
        // `into_sealed_block()` to attach it to the returned error.
        /// A helper macro for handling errors after the input has been converted to a block
        macro_rules! ensure_ok_post_block {
            ($expr:expr, $block:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        return Err(
                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
                        )
                    }
                }
            };
        }

        let parent_hash = input.parent_hash();

        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
        // The span guard is dropped explicitly below so the span only covers provider creation.
        let _enter =
            debug_span!(target: "engine::tree::payload_validator", "state provider").entered();
        let Some(provider_builder) =
            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
        else {
            // this is pre-validated in the tree
            return Err(InsertBlockError::new(
                self.convert_to_block(input)?,
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };
        let mut state_provider = ensure_ok!(provider_builder.build());
        drop(_enter);

        // Fetch parent block. This goes to memory most of the time unless the parent block is
        // beyond the in-memory buffer.
        let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
        else {
            return Err(InsertBlockError::new(
                self.convert_to_block(input)?,
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };

        // A failure here is returned as a `NewPayloadError` without attaching the block.
        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm env")
            .in_scope(|| self.evm_env_for(&input))
            .map_err(NewPayloadError::other)?;

        let env = ExecutionEnv {
            evm_env,
            hash: input.hash(),
            parent_hash: input.parent_hash(),
            parent_state_root: parent_block.state_root(),
            transaction_count: input.transaction_count(),
            withdrawals: input.withdrawals().map(|w| w.to_vec()),
        };

        // Plan the strategy used for state root computation.
        let strategy = self.plan_state_root_computation();

        debug!(
            target: "engine::tree::payload_validator",
            ?strategy,
            "Decided which state root algorithm to run"
        );

        // Get an iterator over the transactions in the payload
        let txs = self.tx_iterator_for(&input)?;

        // Extract the BAL, if valid and available
        let block_access_list = ensure_ok!(input
            .block_access_list()
            .transpose()
            // Eventually gets converted to a `InsertBlockErrorKind::Other`
            .map_err(Box::<dyn std::error::Error + Send + Sync>::from))
        .map(Arc::new);

        // Create lazy overlay from ancestors - this doesn't block, allowing execution to start
        // before the trie data is ready. The overlay will be computed on first access.
        let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());

        // Create overlay factory for payload processor (StateRootTask path needs it for
        // multiproofs)
        let overlay_factory =
            OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
                .with_block_hash(Some(anchor_hash))
                .with_lazy_overlay(lazy_overlay);

        // Spawn the appropriate processor based on strategy
        let mut handle = ensure_ok!(self.spawn_payload_processor(
            env.clone(),
            txs,
            provider_builder,
            overlay_factory.clone(),
            strategy,
            block_access_list,
        ));

        // Use cached state provider before executing, used in execution after prewarming threads
        // complete
        if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
            state_provider =
                Box::new(CachedStateProvider::new(state_provider, caches, cache_metrics));
        };

        // Optionally wrap the provider once more to collect state provider metrics.
        if self.config.state_provider_metrics() {
            state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine"));
        }

        // Execute the block and handle any execution errors.
        // The receipt root task is spawned before execution and receives receipts incrementally
        // as transactions complete, allowing parallel computation during execution.
        let (output, senders, receipt_root_rx) =
            match self.execute_block(state_provider, env, &input, &mut handle) {
                Ok(output) => output,
                Err(err) => return self.handle_execution_error(input, err, &parent_block),
            };

        // After executing the block we can stop prewarming transactions
        handle.stop_prewarming_execution();

        // Create ExecutionOutcome early so we can terminate caching before validation and state
        // root computation. Using Arc allows sharing with both the caching task and the deferred
        // trie task without cloning the expensive BundleState.
        let output = Arc::new(output);

        // Terminate caching task early since execution is complete and caching is no longer
        // needed. This frees up resources while state root computation continues.
        let valid_block_tx = handle.terminate_caching(Some(output.clone()));

        // From here on `input` is consumed; later errors must use `ensure_ok_post_block!`.
        let block = self.convert_to_block(input)?.with_senders(senders);

        // Wait for the receipt root computation to complete.
        // If the task dropped its sender, the error is logged and `None` is passed on.
        let receipt_root_bloom = receipt_root_rx
            .blocking_recv()
            .inspect_err(|_| {
                tracing::error!(
                    target: "engine::tree::payload_validator",
                    "Receipt root task dropped sender without result, receipt root calculation likely aborted"
                );
            })
            .ok();

        let hashed_state = ensure_ok_post_block!(
            self.validate_post_execution(
                &block,
                &parent_block,
                &output,
                &mut ctx,
                receipt_root_bloom
            ),
            block
        );

        // `maybe_state_root` stays `None` when the chosen strategy produced no usable root; in
        // that case the serial computation further below is used instead.
        let root_time = Instant::now();
        let mut maybe_state_root = None;
        let mut state_root_task_failed = false;

        match strategy {
            StateRootStrategy::StateRootTask => {
                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");

                let task_result = ensure_ok_post_block!(
                    self.await_state_root_with_timeout(
                        &mut handle,
                        overlay_factory.clone(),
                        &hashed_state,
                    ),
                    block
                );

                match task_result {
                    Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
                        let elapsed = root_time.elapsed();
                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");

                        // Compare trie updates with serial computation if configured
                        if self.config.always_compare_trie_updates() {
                            self.compare_trie_updates_with_serial(
                                overlay_factory.clone(),
                                &hashed_state,
                                trie_updates.clone(),
                            );
                        }

                        // we double check the state root here for good measure
                        if state_root == block.header().state_root() {
                            maybe_state_root = Some((state_root, trie_updates, elapsed))
                        } else {
                            warn!(
                                target: "engine::tree::payload_validator",
                                ?state_root,
                                block_state_root = ?block.header().state_root(),
                                "State root task returned incorrect state root"
                            );
                            state_root_task_failed = true;
                        }
                    }
                    Err(error) => {
                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
                        state_root_task_failed = true;
                    }
                }
            }
            StateRootStrategy::Parallel => {
                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
                match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
                    Ok(result) => {
                        let elapsed = root_time.elapsed();
                        info!(
                            target: "engine::tree::payload_validator",
                            regular_state_root = ?result.0,
                            ?elapsed,
                            "Regular root task finished"
                        );
                        maybe_state_root = Some((result.0, result.1, elapsed));
                    }
                    Err(error) => {
                        // Only logged: `maybe_state_root` stays `None`, triggering the fallback.
                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
                    }
                }
            }
            // Nothing is computed here; the serial computation below handles this strategy.
            StateRootStrategy::Synchronous => {}
        }

        // Determine the state root.
        // If the state root was computed in parallel, we use it.
        // Otherwise, we fall back to computing it synchronously.
        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
            maybe_state_root
        {
            maybe_state_root
        } else {
            // fallback is to compute the state root regularly in sync
            if self.config.state_root_fallback() {
                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
            } else {
                warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel");
                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
            }

            let (root, updates) = ensure_ok_post_block!(
                Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
                block
            );

            // Counted only when the state root task ran and failed or returned a wrong root.
            if state_root_task_failed {
                self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
            }

            (root, updates, root_time.elapsed())
        };

        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");

        // ensure state root matches
        if state_root != block.header().state_root() {
            // call post-block hook
            self.on_invalid_block(
                &parent_block,
                &block,
                &output,
                Some((&trie_output, state_root)),
                ctx.state_mut(),
            );
            // Read the expected root before `block` is consumed by `into_sealed_block()`.
            let block_state_root = block.header().state_root();
            return Err(InsertBlockError::new(
                block.into_sealed_block(),
                ConsensusError::BodyStateRootDiff(
                    GotExpected { got: state_root, expected: block_state_root }.into(),
                )
                .into(),
            )
            .into())
        }

        // Signal through the channel returned by `terminate_caching` that the block is valid;
        // the send result is deliberately ignored.
        if let Some(valid_block_tx) = valid_block_tx {
            let _ = valid_block_tx.send(());
        }

        Ok(self.spawn_deferred_trie_task(
            block,
            output,
            &ctx,
            hashed_state,
            trie_output,
            overlay_factory,
        ))
    }
636
637    /// Return sealed block header from database or in-memory state by hash.
638    fn sealed_header_by_hash(
639        &self,
640        hash: B256,
641        state: &EngineApiTreeState<N>,
642    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
643        // check memory first
644        let header = state.tree_state.sealed_header_by_hash(&hash);
645
646        if header.is_some() {
647            Ok(header)
648        } else {
649            self.provider.sealed_header_by_hash(hash)
650        }
651    }
652
653    /// Validate if block is correct and satisfies all the consensus rules that concern the header
654    /// and block body itself.
655    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
656    fn validate_block_inner(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
657        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
658            error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
659            return Err(e)
660        }
661
662        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
663            error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
664            return Err(e)
665        }
666
667        Ok(())
668    }
669
    /// Executes a block with the given state provider.
    ///
    /// This method orchestrates block execution:
    /// 1. Sets up the EVM with state database and precompile caching
    /// 2. Spawns a background task for incremental receipt root computation
    /// 3. Executes transactions with metrics collection via state hooks
    /// 4. Merges state transitions and records execution metrics
    ///
    /// # Returns
    ///
    /// On success, a tuple of the block execution output, the transaction senders collected
    /// during execution, and the receiving end of the oneshot channel on which the background
    /// receipt root task delivers its `(B256, Bloom)` result.
    ///
    /// # Errors
    ///
    /// A failure to build the execution context is wrapped in [`InsertBlockErrorKind::Other`].
    /// Errors from executing the transactions or from finishing the executor are propagated.
    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
    #[expect(clippy::type_complexity)]
    fn execute_block<S, Err, T>(
        &mut self,
        state_provider: S,
        env: ExecutionEnv<Evm>,
        input: &BlockOrPayload<T>,
        handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
    ) -> Result<
        (
            BlockExecutionOutput<N::Receipt>,
            Vec<Address>,
            tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
        ),
        InsertBlockErrorKind,
    >
    where
        S: StateProvider + Send,
        Err: core::error::Error + Send + Sync + 'static,
        V: PayloadValidator<T, Block = N::Block>,
        T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        debug!(target: "engine::tree::payload_validator", "Executing block");

        // State database over the given provider; bundle updates are enabled so the bundle can
        // be taken after execution.
        let mut db = State::builder()
            .with_database(StateProviderDatabase::new(state_provider))
            .with_bundle_update()
            .without_state_clear()
            .build();

        // Copy the spec id out first: `env.evm_env` is moved into the EVM on the next line.
        let spec_id = *env.evm_env.spec_id();
        let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
        let ctx =
            self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
        let mut executor = self.evm_config.create_executor(evm, ctx);

        if !self.config.precompile_cache_disabled() {
            // Only cache pure precompiles to avoid issues with stateful precompiles
            executor.evm_mut().precompiles_mut().map_pure_precompiles(|address, precompile| {
                // Reuse the metrics entry for this address, creating it on first use.
                let metrics = self
                    .precompile_cache_metrics
                    .entry(*address)
                    .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
                    .clone();
                CachedPrecompile::wrap(
                    precompile,
                    self.precompile_cache_map.cache_for_address(*address),
                    spec_id,
                    Some(metrics),
                )
            });
        }

        // Spawn background task to compute receipt root and logs bloom incrementally.
        // Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
        let receipts_len = input.transaction_count();
        let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
        let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
        self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));

        let transaction_count = input.transaction_count();
        let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));

        let execution_start = Instant::now();

        // Execute all transactions and finalize
        let (executor, senders) = self.execute_transactions(
            executor,
            transaction_count,
            handle.iter_transactions(),
            &receipt_tx,
        )?;
        // Drop this function's sender: no further receipts will be sent from here.
        drop(receipt_tx);

        // Finish execution and get the result
        let post_exec_start = Instant::now();
        // The first tuple element is whatever `evm.into_db()` returns; it is not used further.
        let (_evm, result) = debug_span!(target: "engine::tree", "finish")
            .in_scope(|| executor.finish())
            .map(|(evm, result)| (evm.into_db(), result))?;
        self.metrics.record_post_execution(post_exec_start.elapsed());

        // Merge transitions into bundle state
        debug_span!(target: "engine::tree", "merge transitions")
            .in_scope(|| db.merge_transitions(BundleRetention::Reverts));

        let output = BlockExecutionOutput { result, state: db.take_bundle() };

        // Measured from just before transaction execution through the transition merge.
        let execution_duration = execution_start.elapsed();
        self.metrics.record_block_execution(&output, execution_duration);

        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
        Ok((output, senders, result_rx))
    }
772
773    /// Executes transactions and collects senders, streaming receipts to a background task.
774    ///
775    /// This method handles:
776    /// - Applying pre-execution changes (e.g., beacon root updates)
777    /// - Executing each transaction with timing metrics
778    /// - Streaming receipts to the receipt root computation task
779    /// - Collecting transaction senders for later use
780    ///
781    /// Returns the executor (for finalization) and the collected senders.
782    fn execute_transactions<E, Tx, InnerTx, Err>(
783        &self,
784        mut executor: E,
785        transaction_count: usize,
786        transactions: impl Iterator<Item = Result<Tx, Err>>,
787        receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
788    ) -> Result<(E, Vec<Address>), BlockExecutionError>
789    where
790        E: BlockExecutor<Receipt = N::Receipt>,
791        Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
792        InnerTx: TxHashRef,
793        Err: core::error::Error + Send + Sync + 'static,
794    {
795        let mut senders = Vec::with_capacity(transaction_count);
796
797        // Apply pre-execution changes (e.g., beacon root update)
798        let pre_exec_start = Instant::now();
799        debug_span!(target: "engine::tree", "pre execution")
800            .in_scope(|| executor.apply_pre_execution_changes())?;
801        self.metrics.record_pre_execution(pre_exec_start.elapsed());
802
803        // Execute transactions
804        let exec_span = debug_span!(target: "engine::tree", "execution").entered();
805        let mut transactions = transactions.into_iter();
806        // Some executors may execute transactions that do not append receipts during the
807        // main loop (e.g., system transactions whose receipts are added during finalization).
808        // In that case, invoking the callback on every transaction would resend the previous
809        // receipt with the same index and can panic the ordered root builder.
810        let mut last_sent_len = 0usize;
811        loop {
812            // Measure time spent waiting for next transaction from iterator
813            // (e.g., parallel signature recovery)
814            let wait_start = Instant::now();
815            let Some(tx_result) = transactions.next() else { break };
816            self.metrics.record_transaction_wait(wait_start.elapsed());
817
818            let tx = tx_result.map_err(BlockExecutionError::other)?;
819            let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
820
821            senders.push(tx_signer);
822
823            let _enter = debug_span!(
824                target: "engine::tree",
825                "execute tx",
826            )
827            .entered();
828            trace!(target: "engine::tree", "Executing transaction");
829
830            let tx_start = Instant::now();
831            executor.execute_transaction(tx)?;
832            self.metrics.record_transaction_execution(tx_start.elapsed());
833
834            let current_len = executor.receipts().len();
835            if current_len > last_sent_len {
836                last_sent_len = current_len;
837                // Send the latest receipt to the background task for incremental root computation.
838                if let Some(receipt) = executor.receipts().last() {
839                    let tx_index = current_len - 1;
840                    let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
841                }
842            }
843        }
844        drop(exec_span);
845
846        Ok((executor, senders))
847    }
848
849    /// Compute state root for the given hashed post state in parallel.
850    ///
851    /// Uses an overlay factory which provides the state of the parent block, along with the
852    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
853    /// trie updates for this block.
854    ///
855    /// # Returns
856    ///
857    /// Returns `Ok(_)` if computed successfully.
858    /// Returns `Err(_)` if error was encountered during computation.
859    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
860    fn compute_state_root_parallel(
861        &self,
862        overlay_factory: OverlayStateProviderFactory<P>,
863        hashed_state: &HashedPostState,
864    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
865        // The `hashed_state` argument will be taken into account as part of the overlay, but we
866        // need to use the prefix sets which were generated from it to indicate to the
867        // ParallelStateRoot which parts of the trie need to be recomputed.
868        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
869        let overlay_factory =
870            overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
871        ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
872            .incremental_root_with_updates()
873    }
874
875    /// Compute state root for the given hashed post state in serial.
876    ///
877    /// Uses an overlay factory which provides the state of the parent block, along with the
878    /// [`HashedPostState`] containing the changes of this block, to compute the state root and
879    /// trie updates for this block.
880    fn compute_state_root_serial(
881        overlay_factory: OverlayStateProviderFactory<P>,
882        hashed_state: &HashedPostState,
883    ) -> ProviderResult<(B256, TrieUpdates)> {
884        // The `hashed_state` argument will be taken into account as part of the overlay, but we
885        // need to use the prefix sets which were generated from it to indicate to the
886        // StateRoot which parts of the trie need to be recomputed.
887        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
888        let overlay_factory =
889            overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
890
891        let provider = overlay_factory.database_provider_ro()?;
892
893        Ok(StateRoot::new(&provider, &provider)
894            .with_prefix_sets(prefix_sets)
895            .root_with_updates()?)
896    }
897
898    /// Awaits the state root from the background task, with an optional timeout fallback.
899    ///
900    /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
901    /// state root task up to the timeout duration. If the task doesn't complete in time, a
902    /// sequential state root computation is spawned via `spawn_blocking`. Both computations
903    /// then race: the main thread polls the task receiver and the sequential result channel
904    /// in a loop, returning whichever finishes first.
905    ///
906    /// If no timeout is configured, this simply awaits the state root task without any fallback.
907    ///
908    /// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
909    /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
910    /// `Result` captures parallel state root task errors that can still fall back to serial.
911    #[instrument(
912        level = "debug",
913        target = "engine::tree::payload_validator",
914        name = "await_state_root",
915        skip_all
916    )]
917    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
918        &self,
919        handle: &mut PayloadHandle<Tx, Err, R>,
920        overlay_factory: OverlayStateProviderFactory<P>,
921        hashed_state: &HashedPostState,
922    ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
923        let Some(timeout) = self.config.state_root_task_timeout() else {
924            return Ok(handle.state_root());
925        };
926
927        let task_rx = handle.take_state_root_rx();
928
929        match task_rx.recv_timeout(timeout) {
930            Ok(result) => Ok(result),
931            Err(RecvTimeoutError::Disconnected) => {
932                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
933            }
934            Err(RecvTimeoutError::Timeout) => {
935                warn!(
936                    target: "engine::tree::payload_validator",
937                    ?timeout,
938                    "State root task timed out, spawning sequential fallback"
939                );
940                self.metrics.block_validation.state_root_task_timeout_total.increment(1);
941
942                let (seq_tx, seq_rx) =
943                    std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
944
945                let seq_overlay = overlay_factory;
946                let seq_hashed_state = hashed_state.clone();
947                self.payload_processor.executor().spawn_blocking(move || {
948                    let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
949                    let _ = seq_tx.send(result);
950                });
951
952                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
953
954                loop {
955                    match task_rx.recv_timeout(POLL_INTERVAL) {
956                        Ok(result) => {
957                            debug!(
958                                target: "engine::tree::payload_validator",
959                                source = "task",
960                                "State root timeout race won"
961                            );
962                            return Ok(result);
963                        }
964                        Err(RecvTimeoutError::Disconnected) => {
965                            debug!(
966                                target: "engine::tree::payload_validator",
967                                "State root task dropped, waiting for sequential fallback"
968                            );
969                            let result = seq_rx.recv().map_err(|_| {
970                                ProviderError::other(std::io::Error::other(
971                                    "both state root computations failed",
972                                ))
973                            })?;
974                            let (state_root, trie_updates) = result?;
975                            return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
976                        }
977                        Err(RecvTimeoutError::Timeout) => {}
978                    }
979
980                    if let Ok(result) = seq_rx.try_recv() {
981                        debug!(
982                            target: "engine::tree::payload_validator",
983                            source = "sequential",
984                            "State root timeout race won"
985                        );
986                        let (state_root, trie_updates) = result?;
987                        return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
988                    }
989                }
990            }
991        }
992    }
993
994    /// Compares trie updates from the state root task with serial state root computation.
995    ///
996    /// This is used for debugging and validating the correctness of the parallel state root
997    /// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
998    /// method runs a separate serial state root computation and compares the resulting trie
999    /// updates.
1000    fn compare_trie_updates_with_serial(
1001        &self,
1002        overlay_factory: OverlayStateProviderFactory<P>,
1003        hashed_state: &HashedPostState,
1004        task_trie_updates: TrieUpdates,
1005    ) {
1006        debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
1007
1008        match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
1009            Ok((serial_root, serial_trie_updates)) => {
1010                debug!(
1011                    target: "engine::tree::payload_validator",
1012                    ?serial_root,
1013                    "Serial state root computation finished for comparison"
1014                );
1015
1016                // Get a database provider to use as trie cursor factory
1017                match overlay_factory.database_provider_ro() {
1018                    Ok(provider) => {
1019                        if let Err(err) = super::trie_updates::compare_trie_updates(
1020                            &provider,
1021                            task_trie_updates,
1022                            serial_trie_updates,
1023                        ) {
1024                            warn!(
1025                                target: "engine::tree::payload_validator",
1026                                %err,
1027                                "Error comparing trie updates"
1028                            );
1029                        }
1030                    }
1031                    Err(err) => {
1032                        warn!(
1033                            target: "engine::tree::payload_validator",
1034                            %err,
1035                            "Failed to get database provider for trie update comparison"
1036                        );
1037                    }
1038                }
1039            }
1040            Err(err) => {
1041                warn!(
1042                    target: "engine::tree::payload_validator",
1043                    %err,
1044                    "Failed to compute serial state root for comparison"
1045                );
1046            }
1047        }
1048    }
1049
1050    /// Validates the block after execution.
1051    ///
1052    /// This performs:
1053    /// - parent header validation
1054    /// - post-execution consensus validation
1055    /// - state-root based post-execution validation
1056    ///
1057    /// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
1058    /// and logs bloom from the receipts.
1059    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
1060    fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
1061        &self,
1062        block: &RecoveredBlock<N::Block>,
1063        parent_block: &SealedHeader<N::BlockHeader>,
1064        output: &BlockExecutionOutput<N::Receipt>,
1065        ctx: &mut TreeCtx<'_, N>,
1066        receipt_root_bloom: Option<ReceiptRootBloom>,
1067    ) -> Result<HashedPostState, InsertBlockErrorKind>
1068    where
1069        V: PayloadValidator<T, Block = N::Block>,
1070    {
1071        let start = Instant::now();
1072
1073        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
1074        // validate block consensus rules
1075        if let Err(e) = self.validate_block_inner(block) {
1076            return Err(e.into())
1077        }
1078
1079        // now validate against the parent
1080        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_header_against_parent").entered();
1081        if let Err(e) =
1082            self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
1083        {
1084            warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
1085            return Err(e.into())
1086        }
1087        drop(_enter);
1088
1089        // Validate block post-execution rules
1090        let _enter =
1091            debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
1092                .entered();
1093        if let Err(err) =
1094            self.consensus.validate_block_post_execution(block, output, receipt_root_bloom)
1095        {
1096            // call post-block hook
1097            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1098            return Err(err.into())
1099        }
1100        drop(_enter);
1101
1102        let _enter =
1103            debug_span!(target: "engine::tree::payload_validator", "hashed_post_state").entered();
1104        let hashed_state = self.provider.hashed_post_state(&output.state);
1105        drop(_enter);
1106
1107        let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
1108        if let Err(err) =
1109            self.validator.validate_block_post_execution_with_hashed_state(&hashed_state, block)
1110        {
1111            // call post-block hook
1112            self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
1113            return Err(err.into())
1114        }
1115
1116        // record post-execution validation duration
1117        self.metrics
1118            .block_validation
1119            .post_execution_validation_duration
1120            .record(start.elapsed().as_secs_f64());
1121
1122        Ok(hashed_state)
1123    }
1124
1125    /// Spawns a payload processor task based on the state root strategy.
1126    ///
1127    /// This method determines how to execute the block and compute its state root based on
1128    /// the selected strategy:
1129    /// - `StateRootTask`: Uses a dedicated task for state root computation with proof generation
1130    /// - `Parallel`: Computes state root in parallel with block execution
1131    /// - `Synchronous`: Falls back to sequential execution and state root computation
1132    ///
1133    /// The method handles strategy fallbacks if the preferred approach fails, ensuring
1134    /// block execution always completes with a valid state root.
1135    ///
1136    /// # Arguments
1137    ///
1138    /// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
1139    ///   (`StateRootTask`)
1140    #[allow(clippy::too_many_arguments)]
1141    #[instrument(
1142        level = "debug",
1143        target = "engine::tree::payload_validator",
1144        skip_all,
1145        fields(strategy)
1146    )]
1147    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
1148        &mut self,
1149        env: ExecutionEnv<Evm>,
1150        txs: T,
1151        provider_builder: StateProviderBuilder<N, P>,
1152        overlay_factory: OverlayStateProviderFactory<P>,
1153        strategy: StateRootStrategy,
1154        block_access_list: Option<Arc<BlockAccessList>>,
1155    ) -> Result<
1156        PayloadHandle<
1157            impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
1158            impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
1159            N::Receipt,
1160        >,
1161        InsertBlockErrorKind,
1162    > {
1163        match strategy {
1164            StateRootStrategy::StateRootTask => {
1165                let spawn_start = Instant::now();
1166
1167                // Use the pre-computed overlay factory for multiproofs
1168                let handle = self.payload_processor.spawn(
1169                    env,
1170                    txs,
1171                    provider_builder,
1172                    overlay_factory,
1173                    &self.config,
1174                    block_access_list,
1175                );
1176
1177                // record prewarming initialization duration
1178                self.metrics
1179                    .block_validation
1180                    .spawn_payload_processor
1181                    .record(spawn_start.elapsed().as_secs_f64());
1182
1183                Ok(handle)
1184            }
1185            StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
1186                let start = Instant::now();
1187                let handle = self.payload_processor.spawn_cache_exclusive(
1188                    env,
1189                    txs,
1190                    provider_builder,
1191                    block_access_list,
1192                );
1193
1194                // Record prewarming initialization duration
1195                self.metrics
1196                    .block_validation
1197                    .spawn_payload_processor
1198                    .record(start.elapsed().as_secs_f64());
1199
1200                Ok(handle)
1201            }
1202        }
1203    }
1204
1205    /// Creates a `StateProviderBuilder` for the given parent hash.
1206    ///
1207    /// This method checks if the parent is in the tree state (in-memory) or persisted to disk,
1208    /// and creates the appropriate provider builder.
1209    fn state_provider_builder(
1210        &self,
1211        hash: B256,
1212        state: &EngineApiTreeState<N>,
1213    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
1214        if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
1215            debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
1216            // the block leads back to the canonical chain
1217            return Ok(Some(StateProviderBuilder::new(
1218                self.provider.clone(),
1219                historical,
1220                Some(blocks),
1221            )))
1222        }
1223
1224        // Check if the block is persisted
1225        if let Some(header) = self.provider.header(hash)? {
1226            debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
1227            // For persisted blocks, we create a builder that will fetch state directly from the
1228            // database
1229            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
1230        }
1231
1232        debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
1233        Ok(None)
1234    }
1235
1236    /// Determines the state root computation strategy based on configuration.
1237    ///
1238    /// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
1239    /// too expensive because it requires walking all paths in every proof.
1240    const fn plan_state_root_computation(&self) -> StateRootStrategy {
1241        if self.config.state_root_fallback() {
1242            StateRootStrategy::Synchronous
1243        } else if self.config.use_state_root_task() {
1244            StateRootStrategy::StateRootTask
1245        } else {
1246            StateRootStrategy::Parallel
1247        }
1248    }
1249
1250    /// Called when an invalid block is encountered during validation.
1251    fn on_invalid_block(
1252        &self,
1253        parent_header: &SealedHeader<N::BlockHeader>,
1254        block: &RecoveredBlock<N::Block>,
1255        output: &BlockExecutionOutput<N::Receipt>,
1256        trie_updates: Option<(&TrieUpdates, B256)>,
1257        state: &mut EngineApiTreeState<N>,
1258    ) {
1259        if state.invalid_headers.get(&block.hash()).is_some() {
1260            // we already marked this block as invalid
1261            return
1262        }
1263        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1264    }
1265
1266    /// Creates a [`LazyOverlay`] for the parent block without blocking.
1267    ///
1268    /// Returns a lazy overlay that will compute the trie input on first access, and the anchor
1269    /// block hash (the highest persisted ancestor). This allows execution to start immediately
1270    /// while the trie input computation is deferred until the overlay is actually needed.
1271    ///
1272    /// If parent is on disk (no in-memory blocks), returns `None` for the lazy overlay.
1273    ///
1274    /// Uses a cached overlay if available for the canonical head (the common case).
1275    fn get_parent_lazy_overlay(
1276        parent_hash: B256,
1277        state: &EngineApiTreeState<N>,
1278    ) -> (Option<LazyOverlay>, B256) {
1279        // Get blocks leading to the parent to determine the anchor
1280        let (anchor_hash, blocks) =
1281            state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
1282
1283        if blocks.is_empty() {
1284            debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
1285            return (None, anchor_hash);
1286        }
1287
1288        // Try to use the cached overlay if it matches both parent hash and anchor
1289        if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
1290            debug!(
1291                target: "engine::tree::payload_validator",
1292                %parent_hash,
1293                %anchor_hash,
1294                "Using cached canonical overlay"
1295            );
1296            return (Some(cached.overlay.clone()), cached.anchor_hash);
1297        }
1298
1299        debug!(
1300            target: "engine::tree::payload_validator",
1301            %anchor_hash,
1302            num_blocks = blocks.len(),
1303            "Creating lazy overlay for in-memory blocks"
1304        );
1305
1306        // Extract deferred trie data handles (non-blocking)
1307        let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
1308
1309        (Some(LazyOverlay::new(anchor_hash, handles)), anchor_hash)
1310    }
1311
1312    /// Spawns a background task to compute and sort trie data for the executed block.
1313    ///
1314    /// This function creates a [`DeferredTrieData`] handle with fallback inputs and spawns a
1315    /// blocking task that calls `wait_cloned()` to:
1316    /// 1. Sort the block's hashed state and trie updates
1317    /// 2. Merge ancestor overlays and extend with the sorted data
1318    /// 3. Create an [`AnchoredTrieInput`](reth_chain_state::AnchoredTrieInput) for efficient future
1319    ///    trie computations
1320    /// 4. Cache the result so subsequent calls return immediately
1321    ///
1322    /// If the background task hasn't completed when `trie_data()` is called, `wait_cloned()`
1323    /// computes from the stored inputs, eliminating deadlock risk and duplicate computation.
1324    ///
1325    /// The validation hot path can return immediately after state root verification,
1326    /// while consumers (DB writes, overlay providers, proofs) get trie data either
1327    /// from the completed task or via fallback computation.
1328    fn spawn_deferred_trie_task(
1329        &self,
1330        block: RecoveredBlock<N::Block>,
1331        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
1332        ctx: &TreeCtx<'_, N>,
1333        hashed_state: HashedPostState,
1334        trie_output: TrieUpdates,
1335        overlay_factory: OverlayStateProviderFactory<P>,
1336    ) -> ExecutedBlock<N> {
1337        // Capture parent hash and ancestor overlays for deferred trie input construction.
1338        let (anchor_hash, overlay_blocks) = ctx
1339            .state()
1340            .tree_state
1341            .blocks_by_hash(block.parent_hash())
1342            .unwrap_or_else(|| (block.parent_hash(), Vec::new()));
1343
1344        // Collect lightweight ancestor trie data handles. We don't call trie_data() here;
1345        // the merge and any fallback sorting happens in the compute_trie_input_task.
1346        let ancestors: Vec<DeferredTrieData> =
1347            overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();
1348
1349        // Create deferred handle with fallback inputs in case the background task hasn't completed.
1350        let deferred_trie_data = DeferredTrieData::pending(
1351            Arc::new(hashed_state),
1352            Arc::new(trie_output),
1353            anchor_hash,
1354            ancestors,
1355        );
1356        let deferred_handle_task = deferred_trie_data.clone();
1357        let block_validation_metrics = self.metrics.block_validation.clone();
1358
1359        // Capture block info and cache handle for changeset computation
1360        let block_hash = block.hash();
1361        let block_number = block.number();
1362        let changeset_cache = self.changeset_cache.clone();
1363
1364        // Spawn background task to compute trie data. Calling `wait_cloned` will compute from
1365        // the stored inputs and cache the result, so subsequent calls return immediately.
1366        let compute_trie_input_task = move || {
1367            let _span = debug_span!(
1368                target: "engine::tree::payload_validator",
1369                "compute_trie_input_task",
1370                block_number
1371            )
1372            .entered();
1373
1374            let result = panic::catch_unwind(AssertUnwindSafe(|| {
1375                let compute_start = Instant::now();
1376                let computed = deferred_handle_task.wait_cloned();
1377                block_validation_metrics
1378                    .deferred_trie_compute_duration
1379                    .record(compute_start.elapsed().as_secs_f64());
1380
1381                // Record sizes of the computed trie data
1382                block_validation_metrics
1383                    .hashed_post_state_size
1384                    .record(computed.hashed_state.total_len() as f64);
1385                block_validation_metrics
1386                    .trie_updates_sorted_size
1387                    .record(computed.trie_updates.total_len() as f64);
1388                if let Some(anchored) = &computed.anchored_trie_input {
1389                    block_validation_metrics
1390                        .anchored_overlay_trie_updates_size
1391                        .record(anchored.trie_input.nodes.total_len() as f64);
1392                    block_validation_metrics
1393                        .anchored_overlay_hashed_state_size
1394                        .record(anchored.trie_input.state.total_len() as f64);
1395                }
1396
1397                // Compute and cache changesets using the computed trie_updates
1398                let changeset_start = Instant::now();
1399
1400                // Get a provider from the overlay factory for trie cursor access
1401                let changeset_result =
1402                    overlay_factory.database_provider_ro().and_then(|provider| {
1403                        reth_trie::changesets::compute_trie_changesets(
1404                            &provider,
1405                            &computed.trie_updates,
1406                        )
1407                        .map_err(ProviderError::Database)
1408                    });
1409
1410                match changeset_result {
1411                    Ok(changesets) => {
1412                        debug!(
1413                            target: "engine::tree::changeset",
1414                            ?block_number,
1415                            elapsed = ?changeset_start.elapsed(),
1416                            "Computed and caching changesets"
1417                        );
1418
1419                        changeset_cache.insert(block_hash, block_number, Arc::new(changesets));
1420                    }
1421                    Err(e) => {
1422                        warn!(
1423                            target: "engine::tree::changeset",
1424                            ?block_number,
1425                            ?e,
1426                            "Failed to compute changesets in deferred trie task"
1427                        );
1428                    }
1429                }
1430            }));
1431
1432            if result.is_err() {
1433                error!(
1434                    target: "engine::tree::payload_validator",
1435                    "Deferred trie task panicked; fallback computation will be used when trie data is accessed"
1436                );
1437            }
1438        };
1439
1440        // Spawn task that computes trie data asynchronously.
1441        self.payload_processor.executor().spawn_blocking(compute_trie_input_task);
1442
1443        ExecutedBlock::with_deferred_trie_data(
1444            Arc::new(block),
1445            execution_outcome,
1446            deferred_trie_data,
1447        )
1448    }
1449}
1450
1451/// Output of block or payload validation.
1452pub type ValidationOutcome<N, E = InsertPayloadError<BlockTy<N>>> = Result<ExecutedBlock<N>, E>;
1453
/// Selects the mechanism used to compute the state root.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StateRootStrategy {
    /// Background sparse trie computation performed by the state root task.
    StateRootTask,
    /// Parallel state root computation executed on the calling thread.
    Parallel,
    /// Synchronous fallback that computes the root via the state provider.
    Synchronous,
}
1464
1465/// Type that validates the payloads processed by the engine.
1466///
1467/// This provides the necessary functions for validating/executing payloads/blocks.
1468pub trait EngineValidator<
1469    Types: PayloadTypes,
1470    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
1471>: Send + Sync + 'static
1472{
1473    /// Validates the payload attributes with respect to the header.
1474    ///
1475    /// By default, this enforces that the payload attributes timestamp is greater than the
1476    /// timestamp according to:
1477    ///   > 7. Client software MUST ensure that payloadAttributes.timestamp is greater than
1478    ///   > timestamp
1479    ///   > of a block referenced by forkchoiceState.headBlockHash.
1480    ///
1481    /// See also: <https://github.com/ethereum/execution-apis/blob/main/src/engine/common.md#specification-1>
1482    fn validate_payload_attributes_against_header(
1483        &self,
1484        attr: &Types::PayloadAttributes,
1485        header: &N::BlockHeader,
1486    ) -> Result<(), InvalidPayloadAttributesError>;
1487
1488    /// Ensures that the given payload does not violate any consensus rules that concern the block's
1489    /// layout.
1490    ///
1491    /// This function must convert the payload into the executable block and pre-validate its
1492    /// fields.
1493    ///
1494    /// Implementers should ensure that the checks are done in the order that conforms with the
1495    /// engine-API specification.
1496    fn convert_payload_to_block(
1497        &self,
1498        payload: Types::ExecutionData,
1499    ) -> Result<SealedBlock<N::Block>, NewPayloadError>;
1500
1501    /// Validates a payload received from engine API.
1502    fn validate_payload(
1503        &mut self,
1504        payload: Types::ExecutionData,
1505        ctx: TreeCtx<'_, N>,
1506    ) -> ValidationOutcome<N>;
1507
1508    /// Validates a block downloaded from the network.
1509    fn validate_block(
1510        &mut self,
1511        block: SealedBlock<N::Block>,
1512        ctx: TreeCtx<'_, N>,
1513    ) -> ValidationOutcome<N>;
1514
1515    /// Hook called after an executed block is inserted directly into the tree.
1516    ///
1517    /// This is invoked when blocks are inserted via `InsertExecutedBlock` (e.g., locally built
1518    /// blocks by sequencers) to allow implementations to update internal state such as caches.
1519    fn on_inserted_executed_block(&self, block: ExecutedBlock<N>);
1520}
1521
1522impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
1523where
1524    P: DatabaseProviderFactory<
1525            Provider: BlockReader
1526                          + StageCheckpointReader
1527                          + PruneCheckpointReader
1528                          + ChangeSetReader
1529                          + StorageChangeSetReader
1530                          + BlockNumReader
1531                          + StorageSettingsCache,
1532        > + BlockReader<Header = N::BlockHeader>
1533        + StateProviderFactory
1534        + StateReader
1535        + ChangeSetReader
1536        + BlockNumReader
1537        + HashedPostStateProvider
1538        + Clone
1539        + 'static,
1540    N: NodePrimitives,
1541    V: PayloadValidator<Types, Block = N::Block>,
1542    Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
1543    Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1544{
1545    fn validate_payload_attributes_against_header(
1546        &self,
1547        attr: &Types::PayloadAttributes,
1548        header: &N::BlockHeader,
1549    ) -> Result<(), InvalidPayloadAttributesError> {
1550        self.validator.validate_payload_attributes_against_header(attr, header)
1551    }
1552
1553    fn convert_payload_to_block(
1554        &self,
1555        payload: Types::ExecutionData,
1556    ) -> Result<SealedBlock<N::Block>, NewPayloadError> {
1557        let block = self.validator.convert_payload_to_block(payload)?;
1558        Ok(block)
1559    }
1560
1561    fn validate_payload(
1562        &mut self,
1563        payload: Types::ExecutionData,
1564        ctx: TreeCtx<'_, N>,
1565    ) -> ValidationOutcome<N> {
1566        self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
1567    }
1568
1569    fn validate_block(
1570        &mut self,
1571        block: SealedBlock<N::Block>,
1572        ctx: TreeCtx<'_, N>,
1573    ) -> ValidationOutcome<N> {
1574        self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
1575    }
1576
1577    fn on_inserted_executed_block(&self, block: ExecutedBlock<N>) {
1578        self.payload_processor.on_inserted_executed_block(
1579            block.recovered_block.block_with_parent(),
1580            &block.execution_output.state,
1581        );
1582    }
1583}
1584
1585/// Enum representing either block or payload being validated.
1586#[derive(Debug)]
1587pub enum BlockOrPayload<T: PayloadTypes> {
1588    /// Payload.
1589    Payload(T::ExecutionData),
1590    /// Block.
1591    Block(SealedBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
1592}
1593
1594impl<T: PayloadTypes> BlockOrPayload<T> {
1595    /// Returns the hash of the block.
1596    pub fn hash(&self) -> B256 {
1597        match self {
1598            Self::Payload(payload) => payload.block_hash(),
1599            Self::Block(block) => block.hash(),
1600        }
1601    }
1602
1603    /// Returns the number and hash of the block.
1604    pub fn num_hash(&self) -> NumHash {
1605        match self {
1606            Self::Payload(payload) => payload.num_hash(),
1607            Self::Block(block) => block.num_hash(),
1608        }
1609    }
1610
1611    /// Returns the parent hash of the block.
1612    pub fn parent_hash(&self) -> B256 {
1613        match self {
1614            Self::Payload(payload) => payload.parent_hash(),
1615            Self::Block(block) => block.parent_hash(),
1616        }
1617    }
1618
1619    /// Returns [`BlockWithParent`] for the block.
1620    pub fn block_with_parent(&self) -> BlockWithParent {
1621        match self {
1622            Self::Payload(payload) => payload.block_with_parent(),
1623            Self::Block(block) => block.block_with_parent(),
1624        }
1625    }
1626
1627    /// Returns a string showing whether or not this is a block or payload.
1628    pub const fn type_name(&self) -> &'static str {
1629        match self {
1630            Self::Payload(_) => "payload",
1631            Self::Block(_) => "block",
1632        }
1633    }
1634
1635    /// Returns the block access list if available.
1636    pub const fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
1637        // TODO decode and return `BlockAccessList`
1638        None
1639    }
1640
1641    /// Returns the number of transactions in the payload or block.
1642    pub fn transaction_count(&self) -> usize
1643    where
1644        T::ExecutionData: ExecutionPayload,
1645    {
1646        match self {
1647            Self::Payload(payload) => payload.transaction_count(),
1648            Self::Block(block) => block.transaction_count(),
1649        }
1650    }
1651
1652    /// Returns the withdrawals from the payload or block.
1653    pub fn withdrawals(&self) -> Option<&[Withdrawal]>
1654    where
1655        T::ExecutionData: ExecutionPayload,
1656    {
1657        match self {
1658            Self::Payload(payload) => payload.withdrawals().map(|w| w.as_slice()),
1659            Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
1660        }
1661    }
1662}