reth_engine_tree/tree/payload_processor/mod.rs
1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
6    payload_processor::{
7        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
8        sparse_trie::StateRootComputeOutcome,
9    },
10    sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
11    StateProviderBuilder, TreeConfig,
12};
13use alloy_eip7928::BlockAccessList;
14use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
15use alloy_evm::block::StateChangeSource;
16use alloy_primitives::B256;
17use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
18use metrics::{Counter, Histogram};
19use multiproof::{SparseTrieUpdate, *};
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use rayon::prelude::*;
23use reth_evm::{
24    block::ExecutableTxParts,
25    execute::{ExecutableTxFor, WithTxEnv},
26    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
27    SpecFor, TxEnvFor,
28};
29use reth_metrics::Metrics;
30use reth_primitives_traits::NodePrimitives;
31use reth_provider::{
32    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProvider,
33    StateProviderFactory, StateReader,
34};
35use reth_revm::{db::BundleState, state::EvmState};
36use reth_tasks::Runtime;
37use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
38use reth_trie_parallel::{
39    proof_task::{ProofTaskCtx, ProofWorkerHandle},
40    root::ParallelStateRootError,
41};
42use reth_trie_sparse::{
43    ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
44};
45use std::{
46    collections::BTreeMap,
47    ops::Not,
48    sync::{
49        atomic::AtomicBool,
50        mpsc::{self, channel},
51        Arc,
52    },
53    time::Instant,
54};
55use tracing::{debug, debug_span, instrument, warn, Span};
56
57pub mod bal;
58pub mod multiproof;
59mod preserved_sparse_trie;
60pub mod prewarm;
61pub mod receipt_root_task;
62pub mod sparse_trie;
63
64use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
65
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
/// the effects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// would deteriorate to the point where PST might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of values
/// in allocated sparse tries.
///
/// There are storage and account values, the largest of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
///
/// NOTE(review): this module-level constant (5) is distinct from the associated constant
/// `PayloadProcessor::SMALL_BLOCK_TX_THRESHOLD` (30), which gates sequential signature recovery.
/// Unqualified references inside the `PayloadProcessor` impl resolve to this module-level
/// constant (associated consts require a `Self::` path) — consider renaming one of the two to
/// avoid confusion.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;

/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
106
107/// Entrypoint for executing the payload.
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: Runtime,
    /// The most recent cache used for execution.
    execution_cache: PayloadExecutionCache,
    /// Metrics for trie operations.
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: usize,
    /// Whether transactions should not be executed on prewarming task.
    disable_transaction_prewarming: bool,
    /// Whether the state cache should be disabled.
    disable_state_cache: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
    /// preservation across sequential payload validations.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Maximum concurrency for prewarm task.
    prewarm_max_concurrency: usize,
    /// Sparse trie prune depth.
    sparse_trie_prune_depth: usize,
    /// Maximum storage tries to retain after pruning.
    sparse_trie_max_storage_tries: usize,
    /// Whether sparse trie cache pruning is fully disabled.
    disable_sparse_trie_cache_pruning: bool,
    /// Whether to disable cache metrics recording.
    disable_cache_metrics: bool,
}
146
147impl<N, Evm> PayloadProcessor<Evm>
148where
149    N: NodePrimitives,
150    Evm: ConfigureEvm<Primitives = N>,
151{
152    /// Returns a reference to the workload executor driving payload tasks.
153    pub const fn executor(&self) -> &Runtime {
154        &self.executor
155    }
156
157    /// Creates a new payload processor.
158    pub fn new(
159        executor: Runtime,
160        evm_config: Evm,
161        config: &TreeConfig,
162        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
163    ) -> Self {
164        Self {
165            executor,
166            execution_cache: Default::default(),
167            trie_metrics: Default::default(),
168            cross_block_cache_size: config.cross_block_cache_size(),
169            disable_transaction_prewarming: config.disable_prewarming(),
170            evm_config,
171            disable_state_cache: config.disable_state_cache(),
172            precompile_cache_disabled: config.precompile_cache_disabled(),
173            precompile_cache_map,
174            sparse_state_trie: SharedPreservedSparseTrie::default(),
175            prewarm_max_concurrency: config.prewarm_max_concurrency(),
176            sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
177            sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
178            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
179            disable_cache_metrics: config.disable_cache_metrics(),
180        }
181    }
182}
183
184impl<N, Evm> PayloadProcessor<Evm>
185where
186    N: NodePrimitives,
187    Evm: ConfigureEvm<Primitives = N> + 'static,
188{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the multi proof task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Multi proof task
    ///
    /// Responsible for preparing sparse trie messages for the sparse trie task.
    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
    /// output back to this task.
    ///
    /// Receives updates from sequential execution.
    /// This task runs until it receives a shutdown signal, which should be after the block
    /// was fully executed.
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
    ///
    /// This task runs until there are no further updates to process.
    ///
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// canceling)
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
        bal: Option<Arc<BlockAccessList>>,
    ) -> IteratorPayloadHandle<Evm, I, N>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        // start preparing transactions immediately
        let (prewarm_rx, execution_rx) =
            self.spawn_tx_iterator(transactions, env.transaction_count);

        let span = Span::current();
        // std channel carrying sparse trie updates; crossbeam channel for multiproof messages.
        let (to_sparse_trie, sparse_trie_rx) = channel();
        let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();

        // Extract V2 proofs flag early so we can pass it to prewarm
        let v2_proofs_enabled = !config.disable_proof_v2();

        // Capture parent_state_root before env is moved into spawn_caching_with
        let parent_state_root = env.parent_state_root;

        // Handle BAL-based optimization if available
        let prewarm_handle = if let Some(bal) = bal {
            // When BAL is present, use BAL prewarming and send BAL to multiproof
            debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");

            // The prewarm task converts the BAL to HashedPostState and sends it on
            // to_multi_proof after slot prefetching completes.
            self.spawn_caching_with(
                env,
                prewarm_rx,
                provider_builder.clone(),
                Some(to_multi_proof.clone()),
                Some(bal),
                v2_proofs_enabled,
            )
        } else {
            // Normal path: spawn with transaction prewarming
            self.spawn_caching_with(
                env,
                prewarm_rx,
                provider_builder.clone(),
                Some(to_multi_proof.clone()),
                None,
                v2_proofs_enabled,
            )
        };

        // Create and spawn the storage proof task
        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);

        // NOTE(review): the standalone multiproof task is only spawned when the trie cache is
        // disabled; on the cached path, `from_multi_proof` is consumed directly by the
        // `SparseTrieCacheTask` created in `spawn_sparse_trie_task` below.
        if config.disable_trie_cache() {
            let multi_proof_task = MultiProofTask::new(
                proof_handle.clone(),
                to_sparse_trie,
                config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
                to_multi_proof.clone(),
                from_multi_proof.clone(),
            )
            .with_v2_proofs_enabled(v2_proofs_enabled);

            // spawn multi-proof task
            let parent_span = span.clone();
            let saved_cache = prewarm_handle.saved_cache.clone();
            self.executor.spawn_blocking(move || {
                let _enter = parent_span.entered();
                // Build a state provider for the multiproof task, layering the shared
                // execution cache on top when one is available.
                let provider = provider_builder.build().expect("failed to build provider");
                let provider = if let Some(saved_cache) = saved_cache {
                    let (cache, metrics, _disable_metrics) = saved_cache.split();
                    Box::new(CachedStateProvider::new(provider, cache, metrics))
                        as Box<dyn StateProvider>
                } else {
                    Box::new(provider)
                };
                multi_proof_task.run(provider);
            });
        }

        // wire the sparse trie to the state root response receiver
        let (state_root_tx, state_root_rx) = channel();

        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
        self.spawn_sparse_trie_task(
            sparse_trie_rx,
            proof_handle,
            state_root_tx,
            from_multi_proof,
            config,
            parent_state_root,
        );

        PayloadHandle {
            to_multi_proof: Some(to_multi_proof),
            prewarm_handle,
            state_root: Some(state_root_rx),
            transactions: execution_rx,
            _span: span,
        }
    }
337
338    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
339    ///
340    /// Returns a [`PayloadHandle`] to communicate with the task.
341    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
342    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
343        &self,
344        env: ExecutionEnv<Evm>,
345        transactions: I,
346        provider_builder: StateProviderBuilder<N, P>,
347        bal: Option<Arc<BlockAccessList>>,
348    ) -> IteratorPayloadHandle<Evm, I, N>
349    where
350        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
351    {
352        let (prewarm_rx, execution_rx) =
353            self.spawn_tx_iterator(transactions, env.transaction_count);
354        // This path doesn't use multiproof, so V2 proofs flag doesn't matter
355        let prewarm_handle =
356            self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
357        PayloadHandle {
358            to_multi_proof: None,
359            prewarm_handle,
360            state_root: None,
361            transactions: execution_rx,
362            _span: Span::current(),
363        }
364    }
365
    /// Transaction count threshold below which sequential signature recovery is used.
    ///
    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
    /// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
    /// for small blocks.
    ///
    /// NOTE(review): shares a name with the module-level `SMALL_BLOCK_TX_THRESHOLD` (5) but has a
    /// different value and meaning. Associated consts are only reachable via `Self::`, so
    /// unqualified uses inside this impl still resolve to the module-level constant — consider
    /// renaming one of the two to avoid confusion.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
373
    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
    ///
    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
    /// sequential iteration to avoid rayon overhead.
    ///
    /// Returns two receivers:
    /// - the first yields successfully converted transactions as soon as they are recovered
    ///   (possibly out of block order on the parallel path), feeding the prewarming task,
    /// - the second yields all conversion results strictly in block order for execution.
    #[expect(clippy::type_complexity)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
        transaction_count: usize,
    ) -> (
        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
    ) {
        // Out-of-order conversion results, tagged with the transaction's block index.
        let (ooo_tx, ooo_rx) = mpsc::channel();
        let (prewarm_tx, prewarm_rx) = mpsc::channel();
        let (execute_tx, execute_rx) = mpsc::channel();

        if transaction_count == 0 {
            // Empty block — nothing to do. `ooo_tx` is dropped here, which terminates the
            // reorder loop below immediately.
        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
            // Sequential path for small blocks — avoids rayon work-stealing setup and
            // channel-based reorder overhead when it costs more than the ECDSA recovery itself.
            debug!(
                target: "engine::tree::payload_processor",
                transaction_count,
                "using sequential sig recovery for small block"
            );
            self.executor.spawn_blocking(move || {
                let (transactions, convert) = transactions.into_parts();
                for (idx, tx) in transactions.into_iter().enumerate() {
                    let tx = convert.convert(tx);
                    let tx = tx.map(|tx| {
                        let (tx_env, tx) = tx.into_parts();
                        WithTxEnv { tx_env, tx: Arc::new(tx) }
                    });
                    // Only send Ok(_) variants to the prewarming task.
                    if let Ok(tx) = &tx {
                        let _ = prewarm_tx.send(tx.clone());
                    }
                    let _ = ooo_tx.send((idx, tx));
                }
            });
        } else {
            // Parallel path — spawn on rayon for parallel signature recovery.
            rayon::spawn(move || {
                let (transactions, convert) = transactions.into_parts();
                transactions.into_par_iter().enumerate().for_each_with(
                    ooo_tx,
                    |ooo_tx, (idx, tx)| {
                        let tx = convert.convert(tx);
                        let tx = tx.map(|tx| {
                            let (tx_env, tx) = tx.into_parts();
                            WithTxEnv { tx_env, tx: Arc::new(tx) }
                        });
                        // Only send Ok(_) variants to prewarming task.
                        if let Ok(tx) = &tx {
                            let _ = prewarm_tx.send(tx.clone());
                        }
                        let _ = ooo_tx.send((idx, tx));
                    },
                );
            });
        }

        // Spawn a task that processes out-of-order transactions from the task above and sends them
        // to the execution task in order. The loop ends once all `ooo_tx` senders are dropped.
        self.executor.spawn_blocking(move || {
            let mut next_for_execution = 0;
            // Buffer for results that arrived ahead of the next expected index.
            let mut queue = BTreeMap::new();
            while let Ok((idx, tx)) = ooo_rx.recv() {
                if next_for_execution == idx {
                    let _ = execute_tx.send(tx);
                    next_for_execution += 1;

                    // Drain any buffered results that are now consecutive (let-chain).
                    while let Some(entry) = queue.first_entry() &&
                        *entry.key() == next_for_execution
                    {
                        let _ = execute_tx.send(entry.remove());
                        next_for_execution += 1;
                    }
                } else {
                    queue.insert(idx, tx);
                }
            }
        });

        (prewarm_rx, execute_rx)
    }
461
    /// Spawn prewarming optionally wired to the multiproof task for target updates.
    ///
    /// Prewarming is skipped entirely (`PrewarmMode::Skipped`) when transaction prewarming is
    /// disabled in the config or the block has fewer than [`SMALL_BLOCK_TX_THRESHOLD`]
    /// transactions. A provided [`BlockAccessList`] takes precedence over streaming transactions.
    ///
    /// Returns a [`CacheTaskHandle`] carrying the (optional) saved execution cache and the
    /// channel to the prewarm task.
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        bal: Option<Arc<BlockAccessList>>,
        v2_proofs_enabled: bool,
    ) -> CacheTaskHandle<N::Receipt>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        // Note: this compares against the module-level threshold (5), not the associated
        // const used by `spawn_tx_iterator` (30).
        let skip_prewarm =
            self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;

        // Only fetch/create an execution cache when state caching is enabled.
        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));

        // configure prewarming
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache: saved_cache.clone(),
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
            v2_proofs_enabled,
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
            self.prewarm_max_concurrency,
        );

        // spawn pre-warm task
        {
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking(move || {
                // Select the prewarm mode: BAL-driven prefetching beats transaction replay
                // when a block access list is available.
                let mode = if skip_prewarm {
                    PrewarmMode::Skipped
                } else if let Some(bal) = bal {
                    PrewarmMode::BlockAccessList(bal)
                } else {
                    PrewarmMode::Transactions(transactions)
                };
                prewarm_task.run(mode, to_prewarm_task);
            });
        }

        CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task) }
    }
518
519    /// Returns the cache for the given parent hash.
520    ///
521    /// If the given hash is different then what is recently cached, then this will create a new
522    /// instance.
523    #[instrument(level = "debug", target = "engine::caching", skip(self))]
524    fn cache_for(&self, parent_hash: B256) -> SavedCache {
525        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
526            debug!("reusing execution cache");
527            cache
528        } else {
529            debug!("creating new execution cache on cache miss");
530            let start = Instant::now();
531            let cache = ExecutionCache::new(self.cross_block_cache_size);
532            let metrics = CachedStateMetrics::zeroed();
533            metrics.record_cache_creation(start.elapsed());
534            SavedCache::new(parent_hash, cache, metrics)
535                .with_disable_cache_metrics(self.disable_cache_metrics)
536        }
537    }
538
    /// Spawns the [`SparseTrieTask`] for this payload processor.
    ///
    /// The trie is preserved when the new payload is a child of the previous one.
    ///
    /// - `sparse_trie_rx`: pre-built [`SparseTrieUpdate`]s, consumed only when the trie cache is
    ///   disabled (`SparseTrieTask` path),
    /// - `from_multi_proof`: raw multiproof messages, consumed only by the cached path
    ///   (`SparseTrieCacheTask`),
    /// - `parent_state_root`: anchor used to decide whether the preserved trie can be reused.
    fn spawn_sparse_trie_task(
        &self,
        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
        config: &TreeConfig,
        parent_state_root: B256,
    ) {
        // Clone/copy everything the spawned closure needs so it can be 'static.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let disable_trie_cache = config.disable_trie_cache();
        let prune_depth = self.sparse_trie_prune_depth;
        let max_storage_tries = self.sparse_trie_max_storage_tries;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let chunk_size =
            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking(move || {
            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Reuse a stored SparseStateTrie if available, applying continuation logic.
            // If this payload's parent state root matches the preserved trie's anchor,
            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
            // keep allocations.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            // Track how long we waited on the previous block to release the preserved trie.
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ParallelSparseTrie::default().with_parallelism_thresholds(
                            PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
                        ),
                    );
                    SparseStateTrie::new()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });

            // Pick the task flavor: `Cleared` consumes pre-built `SparseTrieUpdate`s from the
            // multiproof task, `Cached` consumes raw multiproof messages itself.
            let mut task = if disable_trie_cache {
                SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
                    sparse_trie_rx,
                    proof_worker_handle,
                    trie_metrics.clone(),
                    sparse_state_trie,
                ))
            } else {
                SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
                    &executor,
                    from_multi_proof,
                    proof_worker_handle,
                    trie_metrics.clone(),
                    sparse_state_trie.with_skip_proof_node_filtering(true),
                    chunk_size,
                ))
            };

            let result = task.run();
            // Capture the computed state_root before sending the result
            let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);

            // Acquire the guard before sending the result to prevent a race condition:
            // Without this, the next block could start after send() but before store(),
            // causing take() to return None and forcing it to create a new empty trie
            // instead of reusing the preserved one. Holding the guard ensures the next
            // block's take() blocks until we've stored the trie for reuse.
            let mut guard = preserved_sparse_trie.lock();

            // Send state root computation result - next block may start but will block on take()
            if state_root_tx.send(result).is_err() {
                // Receiver dropped - payload was likely invalid or cancelled.
                // Clear the trie instead of preserving potentially invalid state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                // Drop guard before deferred to release lock before expensive deallocations
                drop(guard);
                drop(deferred);
                return;
            }

            // Only preserve the trie as anchored if computation succeeded.
            // A failed computation may have left the trie in a partially updated state.
            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(state_root) = computed_state_root {
                let start = std::time::Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    prune_depth,
                    max_storage_tries,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                guard.store(PreservedSparseTrie::anchored(trie, state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            // Drop guard before deferred to release lock before expensive deallocations
            drop(guard);
            drop(deferred);
        });
    }
677
    /// Updates the execution cache with the post-execution state from an inserted block.
    ///
    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
    /// to ensure the cache remains warm for subsequent block execution.
    ///
    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
    /// hitting the database, maintaining performance consistency.
    pub fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        let disable_cache_metrics = self.disable_cache_metrics;
        self.execution_cache.update_with_guard(|cached| {
            // Only extend the cache if it was built for this block's parent; a cache for a
            // different tip must not be updated with this block's state.
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
                );
                return
            }

            // Take existing cache (if any) or create fresh caches
            let (caches, cache_metrics, _) = match cached.take() {
                Some(existing) => existing.split(),
                None => (
                    ExecutionCache::new(self.cross_block_cache_size),
                    CachedStateMetrics::zeroed(),
                    false,
                ),
            };

            // Insert the block's bundle state into cache, re-keyed to the new block hash
            let new_cache =
                SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
                    .with_disable_cache_metrics(disable_cache_metrics);
            if new_cache.cache().insert_state(bundle_state).is_err() {
                // On insertion failure, drop the cache entirely rather than keep a
                // partially-updated (and therefore inconsistent) one.
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return
            }
            new_cache.update_metrics();

            // Replace with the updated cache
            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
727}
728
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Channel for evm state updates; `None` when no multiproof task was spawned.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Handle to the spawned prewarm/caching task.
    // must include the receiver of the state root wired to the sparse trie
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions (each item is the recovery/conversion result for one tx).
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Receiver for the state root; taken (set to `None`) once awaited.
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Span for tracing
    _span: Span,
}
746
747impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
748    /// Awaits the state root
749    ///
750    /// # Panics
751    ///
752    /// If payload processing was started without background tasks.
753    #[instrument(
754        level = "debug",
755        target = "engine::tree::payload_processor",
756        name = "await_state_root",
757        skip_all
758    )]
759    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
760        self.state_root
761            .take()
762            .expect("state_root is None")
763            .recv()
764            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
765    }
766
767    /// Takes the state root receiver out of the handle for use with custom waiting logic
768    /// (e.g., timeout-based waiting).
769    ///
770    /// # Panics
771    ///
772    /// If payload processing was started without background tasks.
773    pub const fn take_state_root_rx(
774        &mut self,
775    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
776        self.state_root.take().expect("state_root is None")
777    }
778
779    /// Returns a state hook to be used to send state updates to this task.
780    ///
781    /// If a multiproof task is spawned the hook will notify it about new states.
782    pub fn state_hook(&self) -> impl OnStateHook {
783        // convert the channel into a `StateHookSender` that emits an event on drop
784        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
785
786        move |source: StateChangeSource, state: &EvmState| {
787            if let Some(sender) = &to_multi_proof {
788                let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
789            }
790        }
791    }
792
793    /// Returns a clone of the caches used by prewarming
794    pub fn caches(&self) -> Option<ExecutionCache> {
795        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
796    }
797
798    /// Returns a clone of the cache metrics used by prewarming
799    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
800        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
801    }
802
803    /// Terminates the pre-warming transaction processing.
804    ///
805    /// Note: This does not terminate the task yet.
806    pub fn stop_prewarming_execution(&self) {
807        self.prewarm_handle.stop_prewarming_execution()
808    }
809
810    /// Terminates the entire caching task.
811    ///
812    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
813    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
814    /// path without cloning the expensive `BundleState`.
815    ///
816    /// Returns a sender for the channel that should be notified on block validation success.
817    pub fn terminate_caching(
818        &mut self,
819        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
820    ) -> Option<mpsc::Sender<()>> {
821        self.prewarm_handle.terminate_caching(execution_outcome)
822    }
823
824    /// Returns iterator yielding transactions from the stream.
825    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
826        core::iter::repeat_with(|| self.transactions.recv())
827            .take_while(|res| res.is_ok())
828            .map(|res| res.unwrap())
829    }
830}
831
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The shared cache the task operates with; `None` when no cache was checked out.
    saved_cache: Option<SavedCache>,
    /// Channel to the spawned prewarm task if any.
    ///
    /// Taken (set to `None`) once the task has been told to terminate, so termination is
    /// only ever signalled once (see `terminate_caching` and the `Drop` impl).
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
}
843
844impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
845    /// Terminates the pre-warming transaction processing.
846    ///
847    /// Note: This does not terminate the task yet.
848    pub fn stop_prewarming_execution(&self) {
849        self.to_prewarm_task
850            .as_ref()
851            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
852    }
853
854    /// Terminates the entire pre-warming task.
855    ///
856    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
857    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
858    #[must_use = "sender must be used and notified on block validation success"]
859    pub fn terminate_caching(
860        &mut self,
861        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
862    ) -> Option<mpsc::Sender<()>> {
863        if let Some(tx) = self.to_prewarm_task.take() {
864            let (valid_block_tx, valid_block_rx) = mpsc::channel();
865            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
866            let _ = tx.send(event);
867
868            Some(valid_block_tx)
869        } else {
870            None
871        }
872    }
873}
874
875impl<R> Drop for CacheTaskHandle<R> {
876    fn drop(&mut self) {
877        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
878        if let Some(tx) = self.to_prewarm_task.take() {
879            let _ = tx.send(PrewarmTaskEvent::Terminate {
880                execution_outcome: None,
881                valid_block_rx: mpsc::channel().1,
882            });
883        }
884    }
885}
886
/// Shared access to most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
///  - Get the cache if the payload's parent block matches the cached block
///  - Update cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
///
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling
/// [`PayloadExecutionCache::update_with_guard`], otherwise the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **[`PayloadExecutionCache`]**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **[`PrewarmCacheTask`]**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
pub struct PayloadExecutionCache {
    /// Guarded cloneable cache identified by a block hash.
    inner: Arc<RwLock<Option<SavedCache>>>,
    /// Metrics for cache operations.
    metrics: ExecutionCacheMetrics,
}
919
920impl PayloadExecutionCache {
921    /// Returns the cache for `parent_hash` if it's available for use.
922    ///
923    /// A cache is considered available when:
924    /// - It exists and matches the requested parent hash
925    /// - No other tasks are currently using it (checked via Arc reference count)
926    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
927    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
928        let start = Instant::now();
929        let cache = self.inner.read();
930
931        let elapsed = start.elapsed();
932        self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
933        if elapsed.as_millis() > 5 {
934            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
935        }
936
937        if let Some(c) = cache.as_ref() {
938            let cached_hash = c.executed_block_hash();
939            // Check that the cache hash matches the parent hash of the current block. It won't
940            // match in case it's a fork block.
941            let hash_matches = cached_hash == parent_hash;
942            // Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold
943            // a reference to this cache. We can only reuse it when we have exclusive access.
944            let available = c.is_available();
945            let usage_count = c.usage_count();
946
947            debug!(
948                target: "engine::caching",
949                %cached_hash,
950                %parent_hash,
951                hash_matches,
952                available,
953                usage_count,
954                "Existing cache found"
955            );
956
957            if available {
958                // If the has is available (no other threads are using it), but has a mismatching
959                // parent hash, we can just clear it and keep using without re-creating from
960                // scratch.
961                if !hash_matches {
962                    c.clear();
963                }
964                return Some(c.clone())
965            } else if hash_matches {
966                self.metrics.execution_cache_in_use.increment(1);
967            }
968        } else {
969            debug!(target: "engine::caching", %parent_hash, "No cache found");
970        }
971
972        None
973    }
974
975    /// Clears the tracked cache
976    #[expect(unused)]
977    pub(crate) fn clear(&self) {
978        self.inner.write().take();
979    }
980
981    /// Updates the cache with a closure that has exclusive access to the guard.
982    /// This ensures that all cache operations happen atomically.
983    ///
984    /// ## CRITICAL SAFETY REQUIREMENT
985    ///
986    /// **Before calling this method, you MUST ensure there are no other active cache users.**
987    /// This includes:
988    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
989    /// - No concurrent transactions that might access the cached state
990    /// - All prewarming operations must be completed or cancelled
991    ///
992    /// Violating this requirement can result in cache corruption, incorrect state data,
993    /// and potential consensus failures.
994    pub fn update_with_guard<F>(&self, update_fn: F)
995    where
996        F: FnOnce(&mut Option<SavedCache>),
997    {
998        let mut guard = self.inner.write();
999        update_fn(&mut guard);
1000    }
1001}
1002
/// Metrics for execution cache operations.
// NOTE(review): the `Metrics` derive presumably uses the field doc comments below as the
// metric help text — keep them stable when rewording; confirm against `reth_metrics`.
#[derive(Metrics, Clone)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ExecutionCacheMetrics {
    /// Counter for when the execution cache was unavailable because other threads
    /// (e.g., prewarming) are still using it.
    pub(crate) execution_cache_in_use: Counter,
    /// Time spent waiting for execution cache mutex to become available.
    pub(crate) execution_cache_wait_duration: Histogram,
}
1013
/// EVM context required to execute a block.
///
/// Bundles the per-block inputs (EVM environment, block/parent hashes, parent state root,
/// transaction count and withdrawals) consumed by payload processing.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    /// Used to determine parallel worker count for prewarming.
    /// A value of 0 indicates the count is unknown.
    pub transaction_count: usize,
    /// Withdrawals included in the block.
    /// Used to generate prefetch targets for withdrawal addresses.
    pub withdrawals: Option<Vec<Withdrawal>>,
}
1035
1036impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
1037where
1038    EvmEnvFor<Evm>: Default,
1039{
1040    fn default() -> Self {
1041        Self {
1042            evm_env: Default::default(),
1043            hash: Default::default(),
1044            parent_hash: Default::default(),
1045            parent_state_root: Default::default(),
1046            transaction_count: 0,
1047            withdrawals: None,
1048        }
1049    }
1050}
1051
#[cfg(test)]
mod tests {
    //! Tests for [`PayloadExecutionCache`] checkout semantics, inserted-block cache updates,
    //! and an end-to-end state-root computation through the spawned payload tasks.
    use super::PayloadExecutionCache;
    use crate::tree::{
        cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
        payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::TransactionSigned;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, Recovered, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_revm::db::BundleState;
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState};
    use reth_trie_db::ChangesetCache;
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    /// Builds a [`SavedCache`] keyed by `hash` with a small cache and zeroed metrics.
    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCache::new(1_000);
        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
    }

    /// Only one checkout of the cache may be live at a time; a second `get_cache_for`
    /// returns `None` until the first clone is dropped.
    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    /// Dropping a checked-out cache (here via scope exit) makes it available again.
    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
            // Guard dropped at end of scope
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    /// An available cache with a mismatching parent hash is cleared in place and handed out
    /// for reuse rather than discarded.
    #[test]
    fn execution_cache_mismatch_parent_clears_and_returns() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        // When the parent hash doesn't match, the cache is cleared and returned for reuse
        let different_hash = B256::from([4u8; 32]);
        let cache = execution_cache.get_cache_for(different_hash);
        assert!(cache.is_some(), "cache should be returned for reuse after clearing")
    }

    /// After releasing a checkout, `update_with_guard` can re-key the cache and the new key
    /// can be checked out.
    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = PayloadExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }

    /// Inserting an executed block into an empty cache creates a cache keyed by that block.
    #[test]
    fn on_inserted_executed_block_populates_cache() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        let bundle_state = BundleState::default();

        // Cache should be empty initially
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        // Update cache with inserted block
        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should now exist for the block hash
        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some(), );
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }

    /// If the existing cache does not represent the inserted block's parent, the update is
    /// skipped and the previous cache is preserved.
    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        // Setup: populate cache with block 1
        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should still be for block 1 (unchanged)
        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        // Cache for block 3 should not exist
        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }

    /// Generates `updates_per_account` random [`EvmState`] updates over a fixed pool of
    /// `num_accounts` addresses; each update touches a random prefix of the pool.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                // ~70% of accounts also get 1-9 random storage writes.
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    /// End-to-end check: the state root computed by the spawned background tasks matches the
    /// reference root computed from the accumulated account/storage state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Persist all generated account/storage updates to the hashing tables so the
        // provider-backed computation sees the same state as the reference below.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Build the reference state (last-write-wins per account, storage merged across
        // updates) used to compute the expected root.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        let mut handle = payload_processor.spawn(
            Default::default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
            &TreeConfig::default(),
            None, // No BAL for test
        );

        let mut state_hook = handle.state_hook();

        // Feed the updates through the state hook so the multiproof/sparse-trie tasks see them.
        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}