// reth_engine_tree/tree/payload_processor/mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    payload_processor::{
6        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
7        sparse_trie::StateRootComputeOutcome,
8    },
9    sparse_trie::SparseTrieCacheTask,
10    CacheWaitDurations, CachedStateMetrics, ExecutionCache, PayloadExecutionCache, SavedCache,
11    StateProviderBuilder, TreeConfig, WaitForCaches,
12};
13use alloy_eip7928::BlockAccessList;
14use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
15use alloy_evm::block::StateChangeSource;
16use alloy_primitives::B256;
17use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
18use multiproof::*;
19use prewarm::PrewarmMetrics;
20use rayon::prelude::*;
21use reth_evm::{
22    block::ExecutableTxParts,
23    execute::{ExecutableTxFor, WithTxEnv},
24    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
25    SpecFor, TxEnvFor,
26};
27use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
28use reth_provider::{
29    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
30};
31use reth_revm::{db::BundleState, state::EvmState};
32use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
33use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
34use reth_trie_parallel::{
35    proof_task::{ProofTaskCtx, ProofWorkerHandle},
36    root::ParallelStateRootError,
37};
38use reth_trie_sparse::{
39    ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
40};
41use std::{
42    ops::Not,
43    sync::{
44        atomic::{AtomicBool, AtomicUsize},
45        mpsc::{self, channel},
46        Arc,
47    },
48};
49use tracing::{debug, debug_span, instrument, trace, warn, Span};
50
51pub mod bal;
52pub mod multiproof;
53mod preserved_sparse_trie;
54pub mod prewarm;
55pub mod receipt_root_task;
56pub mod sparse_trie;
57
58use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
59
/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of values
/// in allocated sparse tries.
///
/// There are storage and account values, the largest of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
///
/// NOTE(review): this module-level constant (5) shares its name with the associated const
/// `PayloadProcessor::SMALL_BLOCK_TX_THRESHOLD` (30), which gates sequential vs. parallel
/// signature recovery instead. They are intentionally different values for different decisions,
/// but the shared name is a readability hazard — consider renaming one of them.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
86
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
///
/// Bundles the converted transaction form (the iterator's recovered transaction paired with its
/// prepared tx env via `WithTxEnv`), the iterator's conversion error type, and the node's receipt
/// type.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
93
/// Entrypoint for executing the payload.
///
/// Owns the shared resources (executor, caches, metrics, preserved sparse trie) that are wired
/// into the background tasks spawned per payload: transaction conversion, cache prewarming,
/// multiproof computation, and sparse-trie state root computation.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: Runtime,
    /// The most recent cache used for execution.
    execution_cache: PayloadExecutionCache,
    /// Metrics for trie operations
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: usize,
    /// Whether transactions should not be executed on prewarming task.
    disable_transaction_prewarming: bool,
    /// Whether state cache should be disabled.
    disable_state_cache: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
    /// preservation across sequential payload validations.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// LFU hot-slot capacity: max storage slots retained across prune cycles.
    sparse_trie_max_hot_slots: usize,
    /// LFU hot-account capacity: max account addresses retained across prune cycles.
    sparse_trie_max_hot_accounts: usize,
    /// Whether sparse trie cache pruning is fully disabled.
    disable_sparse_trie_cache_pruning: bool,
    /// Whether to disable cache metrics recording.
    disable_cache_metrics: bool,
}
131
132impl<N, Evm> PayloadProcessor<Evm>
133where
134    N: NodePrimitives,
135    Evm: ConfigureEvm<Primitives = N>,
136{
137    /// Returns a reference to the workload executor driving payload tasks.
138    pub const fn executor(&self) -> &Runtime {
139        &self.executor
140    }
141
142    /// Creates a new payload processor.
143    pub fn new(
144        executor: Runtime,
145        evm_config: Evm,
146        config: &TreeConfig,
147        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
148    ) -> Self {
149        Self {
150            executor,
151            execution_cache: Default::default(),
152            trie_metrics: Default::default(),
153            cross_block_cache_size: config.cross_block_cache_size(),
154            disable_transaction_prewarming: config.disable_prewarming(),
155            evm_config,
156            disable_state_cache: config.disable_state_cache(),
157            precompile_cache_disabled: config.precompile_cache_disabled(),
158            precompile_cache_map,
159            sparse_state_trie: SharedPreservedSparseTrie::default(),
160            sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
161            sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
162            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
163            disable_cache_metrics: config.disable_cache_metrics(),
164        }
165    }
166}
167
impl<Evm> WaitForCaches for PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Blocks until both the execution cache and the preserved sparse trie become available,
    /// waiting on each concurrently, and returns how long each wait took.
    fn wait_for_caches(&self) -> CacheWaitDurations {
        debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");

        // Wait for both caches in parallel using std threads
        let execution_cache = self.execution_cache.clone();
        let sparse_trie = self.sparse_state_trie.clone();

        // Use channels and spawn_blocking instead of std::thread::spawn
        let (execution_tx, execution_rx) = std::sync::mpsc::channel();
        let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();

        self.executor.spawn_blocking_named("wait-exec-cache", move || {
            let _ = execution_tx.send(execution_cache.wait_for_availability());
        });
        // NOTE(review): "wait-sparse-tri" looks truncated to fit the 15-byte Linux
        // thread-name limit — confirm that this is intentional.
        self.executor.spawn_blocking_named("wait-sparse-tri", move || {
            let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
        });

        // recv() only fails if the sender is dropped without sending, i.e. the spawned
        // task panicked or never ran — treated here as a fatal invariant violation.
        let execution_cache_duration =
            execution_rx.recv().expect("execution cache wait task failed to send result");
        let sparse_trie_duration =
            sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");

        debug!(
            target: "engine::tree::payload_processor",
            ?execution_cache_duration,
            ?sparse_trie_duration,
            "Execution cache and sparse trie locks acquired"
        );
        CacheWaitDurations {
            execution_cache: execution_cache_duration,
            sparse_trie: sparse_trie_duration,
        }
    }
}
207
208impl<N, Evm> PayloadProcessor<Evm>
209where
210    N: NodePrimitives,
211    Evm: ConfigureEvm<Primitives = N> + 'static,
212{
213    /// Spawns all background tasks and returns a handle connected to the tasks.
214    ///
215    /// - Transaction prewarming task
216    /// - State root task
217    /// - Sparse trie task
218    ///
219    /// # Transaction prewarming task
220    ///
221    /// Responsible for feeding state updates to the multi proof task.
222    ///
223    /// This task runs until:
224    ///  - externally cancelled (e.g. sequential block execution is complete)
225    ///
226    /// ## Multi proof task
227    ///
228    /// Responsible for preparing sparse trie messages for the sparse trie task.
229    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
230    /// output back to this task.
231    ///
232    /// Receives updates from sequential execution.
233    /// This task runs until it receives a shutdown signal, which should be after the block
234    /// was fully executed.
235    ///
236    /// ## Sparse trie task
237    ///
238    /// Responsible for calculating the state root.
239    ///
240    /// This task runs until there are no further updates to process.
241    ///
242    ///
243    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
244    /// canceling)
245    #[instrument(
246        level = "debug",
247        target = "engine::tree::payload_processor",
248        name = "payload processor",
249        skip_all
250    )]
251    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
252        &mut self,
253        env: ExecutionEnv<Evm>,
254        transactions: I,
255        provider_builder: StateProviderBuilder<N, P>,
256        multiproof_provider_factory: F,
257        config: &TreeConfig,
258        bal: Option<Arc<BlockAccessList>>,
259    ) -> IteratorPayloadHandle<Evm, I, N>
260    where
261        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
262        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
263            + Clone
264            + Send
265            + Sync
266            + 'static,
267    {
268        // start preparing transactions immediately
269        let (prewarm_rx, execution_rx) =
270            self.spawn_tx_iterator(transactions, env.transaction_count);
271
272        let span = Span::current();
273
274        let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config);
275        let prewarm_handle = self.spawn_caching_with(
276            env,
277            prewarm_rx,
278            provider_builder,
279            Some(state_root_handle.to_multi_proof.clone()),
280            bal,
281        );
282
283        PayloadHandle {
284            state_root_handle: Some(state_root_handle),
285            prewarm_handle,
286            transactions: execution_rx,
287            _span: span,
288        }
289    }
290
291    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
292    ///
293    /// Returns a [`PayloadHandle`] to communicate with the task.
294    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
295    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
296        &self,
297        env: ExecutionEnv<Evm>,
298        transactions: I,
299        provider_builder: StateProviderBuilder<N, P>,
300        bal: Option<Arc<BlockAccessList>>,
301    ) -> IteratorPayloadHandle<Evm, I, N>
302    where
303        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
304    {
305        let (prewarm_rx, execution_rx) =
306            self.spawn_tx_iterator(transactions, env.transaction_count);
307        let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
308        PayloadHandle {
309            state_root_handle: None,
310            prewarm_handle,
311            transactions: execution_rx,
312            _span: Span::current(),
313        }
314    }
315
316    /// Spawns state root computation pipeline (multiproof + sparse trie tasks).
317    ///
318    /// The returned [`StateRootHandle`] provides:
319    /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
320    ///   execution.
321    /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
322    ///   state root.
323    ///
324    /// The state hook **must** be dropped after execution to signal the end of state updates.
325    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
326    pub fn spawn_state_root<F>(
327        &mut self,
328        multiproof_provider_factory: F,
329        env: &ExecutionEnv<Evm>,
330        config: &TreeConfig,
331    ) -> StateRootHandle
332    where
333        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
334            + Clone
335            + Send
336            + Sync
337            + 'static,
338    {
339        let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
340
341        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
342        #[cfg(feature = "trie-debug")]
343        let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
344        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
345        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
346
347        let (state_root_tx, state_root_rx) = channel();
348
349        self.spawn_sparse_trie_task(
350            proof_handle,
351            state_root_tx,
352            from_multi_proof,
353            env.parent_state_root,
354            config.multiproof_chunk_size(),
355        );
356
357        StateRootHandle::new(to_multi_proof, state_root_rx)
358    }
359
    /// Transaction count threshold below which proof workers are halved, since fewer transactions
    /// produce fewer state changes and most workers would be idle overhead.
    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

    /// Transaction count threshold below which sequential conversion is used.
    ///
    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
    /// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
    /// blocks.
    ///
    /// NOTE(review): shares its name with the module-level `SMALL_BLOCK_TX_THRESHOLD` (5), which
    /// instead gates whether prewarming runs at all. Different values, different decisions —
    /// consider renaming one to avoid confusion.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Number of leading transactions to convert sequentially before entering the rayon
    /// parallel path.
    ///
    /// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
    /// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
    /// a small head sequentially and sending it immediately, execution can start without
    /// waiting for rayon scheduling.
    const PARALLEL_PREFETCH_COUNT: usize = 4;
380
381    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
382    ///
383    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
384    /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
385    /// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
386    /// results to execution in the original transaction order.
387    #[expect(clippy::type_complexity)]
388    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
389    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
390        &self,
391        transactions: I,
392        transaction_count: usize,
393    ) -> (
394        mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
395        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
396    ) {
397        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
398        let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);
399
400        if transaction_count == 0 {
401            // Empty block — nothing to do.
402        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
403            // Sequential path for small blocks — avoids rayon work-stealing setup and
404            // channel-based reorder overhead when it costs more than sequential conversion.
405            debug!(
406                target: "engine::tree::payload_processor",
407                transaction_count,
408                "using sequential sig recovery for small block"
409            );
410            self.executor.spawn_blocking_named("tx-iterator", move || {
411                let (transactions, convert) = transactions.into_parts();
412                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
413            });
414        } else {
415            // Parallel path — recover signatures in parallel on rayon, stream results
416            // to execution in order via `for_each_ordered`.
417            //
418            // To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
419            // few transactions are recovered sequentially and sent immediately before
420            // entering the parallel iterator for the remainder.
421            let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
422            let executor = self.executor.clone();
423            self.executor.spawn_blocking_named("tx-iterator", move || {
424                let (transactions, convert) = transactions.into_parts();
425                let mut all: Vec<_> = transactions.into_iter().collect();
426                let rest = all.split_off(prefetch.min(all.len()));
427
428                // Convert the first few transactions sequentially so execution can
429                // start immediately without waiting for rayon work-stealing.
430                convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
431
432                // Convert the remaining transactions in parallel.
433                rest.into_par_iter()
434                    .enumerate()
435                    .map(|(i, tx)| {
436                        let idx = i + prefetch;
437                        let tx = convert.convert(tx);
438                        (idx, tx)
439                    })
440                    .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
441                        let tx = tx.map(|tx| {
442                            let (tx_env, tx) = tx.into_parts();
443                            let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
444                            let _ = prewarm_tx.send((idx, tx.clone()));
445                            tx
446                        });
447                        let _ = execute_tx.send(tx);
448                        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
449                        });
450                        });
451        }
452
453        (prewarm_rx, execute_rx)
454    }
455
456    /// Spawn prewarming optionally wired to the multiproof task for target updates.
457    #[instrument(
458        level = "debug",
459        target = "engine::tree::payload_processor",
460        skip_all,
461        fields(bal=%bal.is_some())
462    )]
463    fn spawn_caching_with<P>(
464        &self,
465        env: ExecutionEnv<Evm>,
466        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
467        provider_builder: StateProviderBuilder<N, P>,
468        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
469        bal: Option<Arc<BlockAccessList>>,
470    ) -> CacheTaskHandle<N::Receipt>
471    where
472        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
473    {
474        let skip_prewarm =
475            self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
476
477        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
478
479        let executed_tx_index = Arc::new(AtomicUsize::new(0));
480
481        // configure prewarming
482        let prewarm_ctx = PrewarmContext {
483            env,
484            evm_config: self.evm_config.clone(),
485            saved_cache: saved_cache.clone(),
486            provider: provider_builder,
487            metrics: PrewarmMetrics::default(),
488            terminate_execution: Arc::new(AtomicBool::new(false)),
489            executed_tx_index: Arc::clone(&executed_tx_index),
490            precompile_cache_disabled: self.precompile_cache_disabled,
491            precompile_cache_map: self.precompile_cache_map.clone(),
492        };
493
494        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
495            self.executor.clone(),
496            self.execution_cache.clone(),
497            prewarm_ctx,
498            to_multi_proof,
499        );
500
501        {
502            let to_prewarm_task = to_prewarm_task.clone();
503            self.executor.spawn_blocking_named("prewarm", move || {
504                let mode = if skip_prewarm {
505                    PrewarmMode::Skipped
506                } else if let Some(bal) = bal {
507                    PrewarmMode::BlockAccessList(bal)
508                } else {
509                    PrewarmMode::Transactions(transactions)
510                };
511                prewarm_task.run(mode, to_prewarm_task);
512            });
513        }
514
515        CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
516    }
517
518    /// Returns the cache for the given parent hash.
519    ///
520    /// If the given hash is different then what is recently cached, then this will create a new
521    /// instance.
522    #[instrument(level = "debug", target = "engine::caching", skip(self))]
523    fn cache_for(&self, parent_hash: B256) -> SavedCache {
524        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
525            debug!("reusing execution cache");
526            cache
527        } else {
528            debug!("creating new execution cache on cache miss");
529            let start = Instant::now();
530            let cache = ExecutionCache::new(self.cross_block_cache_size);
531            let metrics = CachedStateMetrics::zeroed();
532            metrics.record_cache_creation(start.elapsed());
533            SavedCache::new(parent_hash, cache, metrics)
534                .with_disable_cache_metrics(self.disable_cache_metrics)
535        }
536    }
537
    /// Spawns the [`SparseTrieCacheTask`] for this payload processor.
    ///
    /// The trie is preserved when the new payload is a child of the previous one.
    ///
    /// `state_root_tx` receives the final state root result (or error); `from_multi_proof`
    /// delivers multiproof messages driving the trie; `parent_state_root` is the anchor
    /// compared against the preserved trie to decide between reuse and clearing.
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let max_hot_slots = self.sparse_trie_max_hot_slots;
        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Reuse a stored SparseStateTrie if available, applying continuation logic.
            // If this payload's parent state root matches the preserved trie's anchor,
            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
            // keep allocations.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            // Record how long we waited on the previous block's trie preservation.
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let mut sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
                    );
                    SparseStateTrie::default()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });
            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                chunk_size,
            );

            // Blocks until the multiproof stream ends and the state root is computed.
            let result = task.run();

            // Acquire the guard before sending the result to prevent a race condition:
            // Without this, the next block could start after send() but before store(),
            // causing take() to return None and forcing it to create a new empty trie
            // instead of reusing the preserved one. Holding the guard ensures the next
            // block's take() blocks until we've stored the trie for reuse.
            let mut guard = preserved_sparse_trie.lock();

            // Keep a copy of the success value; `result` itself is moved into the send below.
            let task_result = result.as_ref().ok().cloned();
            // Send state root computation result - next block may start but will block on take()
            if state_root_tx.send(result).is_err() {
                // Receiver dropped - payload was likely invalid or cancelled.
                // Clear the trie instead of preserving potentially invalid state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                drop(guard);
                // Offload the deferred drop so this thread isn't blocked on deallocation.
                executor.spawn_drop(deferred);
                return;
            }

            // Only preserve the trie as anchored if computation succeeded.
            // A failed computation may have left the trie in a partially updated state.
            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    max_hot_slots,
                    max_hot_accounts,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                    &result.trie_updates,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                // Anchor at the newly computed root so the next child payload can continue.
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            drop(guard);
            executor.spawn_drop(deferred);
        });
    }
668
669    /// Updates the execution cache with the post-execution state from an inserted block.
670    ///
671    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
672    /// to ensure the cache remains warm for subsequent block execution.
673    ///
674    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
675    /// hitting the database, maintaining performance consistency.
676    pub fn on_inserted_executed_block(
677        &self,
678        block_with_parent: BlockWithParent,
679        bundle_state: &BundleState,
680    ) {
681        let disable_cache_metrics = self.disable_cache_metrics;
682        self.execution_cache.update_with_guard(|cached| {
683            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
684                debug!(
685                    target: "engine::caching",
686                    parent_hash = %block_with_parent.parent,
687                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
688                );
689                return
690            }
691
692            // Take existing cache (if any) or create fresh caches
693            let (caches, cache_metrics, _) = match cached.take() {
694                Some(existing) => existing.split(),
695                None => (
696                    ExecutionCache::new(self.cross_block_cache_size),
697                    CachedStateMetrics::zeroed(),
698                    false,
699                ),
700            };
701
702            // Insert the block's bundle state into cache
703            let new_cache =
704                SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
705                    .with_disable_cache_metrics(disable_cache_metrics);
706            if new_cache.cache().insert_state(bundle_state).is_err() {
707                *cached = None;
708                debug!(target: "engine::caching", "cleared execution cache on update error");
709                return
710            }
711            new_cache.update_metrics();
712
713            // Replace with the updated cache
714            *cached = Some(new_cache);
715            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
716        });
717    }
718}
719
720/// Converts transactions sequentially and sends them to the prewarm and execute channels.
721fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
722    iter: impl Iterator<Item = RawTx>,
723    convert: &C,
724    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
725    execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
726) where
727    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
728    TxEnv: Clone,
729    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
730{
731    for (idx, raw_tx) in iter.enumerate() {
732        let tx = convert.convert(raw_tx);
733        let tx = tx.map(|tx| {
734            let (tx_env, tx) = tx.into_parts();
735            WithTxEnv { tx_env, tx: Arc::new(tx) }
736        });
737        if let Ok(tx) = &tx {
738            let _ = prewarm_tx.send((idx, tx.clone()));
739        }
740        let _ = execute_tx.send(tx);
741        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
742    }
743}
744
/// Handle to a background state root computation task.
///
/// Unlike [`PayloadHandle`], this does not include transaction iteration or cache prewarming.
/// It only provides access to the state root computation via [`Self::state_hook`] and
/// [`Self::state_root`].
///
/// Created by [`PayloadProcessor::spawn_state_root`].
#[derive(Debug)]
pub struct StateRootHandle {
    /// Channel for evm state updates to the multiproof pipeline.
    to_multi_proof: CrossbeamSender<MultiProofMessage>,
    /// Receiver for the computed state root.
    ///
    /// Wrapped in an `Option` so it can be taken exactly once, either by awaiting
    /// [`Self::state_root`] or via [`Self::take_state_root_rx`]; both panic on reuse.
    state_root_rx: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
}
759
760impl StateRootHandle {
761    /// Creates a new state root handle.
762    pub const fn new(
763        to_multi_proof: CrossbeamSender<MultiProofMessage>,
764        state_root_rx: mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>,
765    ) -> Self {
766        Self { to_multi_proof, state_root_rx: Some(state_root_rx) }
767    }
768
769    /// Returns a state hook that streams state updates to the background state root task.
770    ///
771    /// The hook must be dropped after execution completes to signal the end of state updates.
772    pub fn state_hook(&self) -> impl OnStateHook {
773        let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone());
774
775        move |source: StateChangeSource, state: &EvmState| {
776            let _ =
777                to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
778        }
779    }
780
781    /// Awaits the state root computation result.
782    ///
783    /// # Panics
784    ///
785    /// If called more than once.
786    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
787        self.state_root_rx
788            .take()
789            .expect("state_root already taken")
790            .recv()
791            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
792    }
793
794    /// Takes the state root receiver for use with custom waiting logic (e.g., timeouts).
795    ///
796    /// # Panics
797    ///
798    /// If called more than once.
799    pub const fn take_state_root_rx(
800        &mut self,
801    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
802        self.state_root_rx.take().expect("state_root already taken")
803    }
804}
805
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the background state root computation, if spawned.
    state_root_handle: Option<StateRootHandle>,
    /// Handle to the prewarm/caching task.
    // must include the receiver of the state root wired to the sparse trie
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Span for tracing
    _span: Span,
}
821
822impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
823    /// Awaits the state root
824    ///
825    /// # Panics
826    ///
827    /// If payload processing was started without background tasks.
828    #[instrument(
829        level = "debug",
830        target = "engine::tree::payload_processor",
831        name = "await_state_root",
832        skip_all
833    )]
834    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
835        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
836    }
837
838    /// Takes the state root receiver out of the handle for use with custom waiting logic
839    /// (e.g., timeout-based waiting).
840    ///
841    /// # Panics
842    ///
843    /// If payload processing was started without background tasks.
844    pub const fn take_state_root_rx(
845        &mut self,
846    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
847        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
848    }
849
850    /// Returns a state hook to be used to send state updates to this task.
851    ///
852    /// If a multiproof task is spawned the hook will notify it about new states.
853    pub fn state_hook(&self) -> Option<impl OnStateHook> {
854        self.state_root_handle.as_ref().map(|handle| handle.state_hook())
855    }
856
857    /// Returns a clone of the caches used by prewarming
858    pub fn caches(&self) -> Option<ExecutionCache> {
859        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
860    }
861
862    /// Returns a clone of the cache metrics used by prewarming
863    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
864        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
865    }
866
867    /// Returns a reference to the shared executed transaction index counter.
868    ///
869    /// The main execution loop should store `index + 1` after executing each transaction so that
870    /// prewarm workers can skip transactions that have already been processed.
871    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
872        &self.prewarm_handle.executed_tx_index
873    }
874
875    /// Terminates the pre-warming transaction processing.
876    ///
877    /// Note: This does not terminate the task yet.
878    pub fn stop_prewarming_execution(&self) {
879        self.prewarm_handle.stop_prewarming_execution()
880    }
881
882    /// Terminates the entire caching task.
883    ///
884    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
885    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
886    /// path without cloning the expensive `BundleState`.
887    ///
888    /// Returns a sender for the channel that should be notified on block validation success.
889    pub fn terminate_caching(
890        &mut self,
891        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
892    ) -> Option<mpsc::Sender<()>> {
893        self.prewarm_handle.terminate_caching(execution_outcome)
894    }
895
896    /// Returns iterator yielding transactions from the stream.
897    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
898        self.transactions.iter()
899    }
900}
901
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The shared cache the task operates with.
    saved_cache: Option<SavedCache>,
    /// Channel to the spawned prewarm task if any
    ///
    /// Taken (set to `None`) once the task has been terminated, so termination is only
    /// signalled once (see [`Self::terminate_caching`] and the `Drop` impl).
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Shared counter tracking the next transaction index to be executed by the main execution
    /// loop. Prewarm workers skip transactions below this index.
    executed_tx_index: Arc<AtomicUsize>,
}
916
917impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
918    /// Terminates the pre-warming transaction processing.
919    ///
920    /// Note: This does not terminate the task yet.
921    pub fn stop_prewarming_execution(&self) {
922        self.to_prewarm_task
923            .as_ref()
924            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
925    }
926
927    /// Terminates the entire pre-warming task.
928    ///
929    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
930    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
931    #[must_use = "sender must be used and notified on block validation success"]
932    pub fn terminate_caching(
933        &mut self,
934        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
935    ) -> Option<mpsc::Sender<()>> {
936        if let Some(tx) = self.to_prewarm_task.take() {
937            let (valid_block_tx, valid_block_rx) = mpsc::channel();
938            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
939            let _ = tx.send(event);
940
941            Some(valid_block_tx)
942        } else {
943            None
944        }
945    }
946}
947
948impl<R> Drop for CacheTaskHandle<R> {
949    fn drop(&mut self) {
950        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
951        if let Some(tx) = self.to_prewarm_task.take() {
952            let _ = tx.send(PrewarmTaskEvent::Terminate {
953                execution_outcome: None,
954                valid_block_rx: mpsc::channel().1,
955            });
956        }
957    }
958}
959
/// EVM context required to execute a block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    /// Used to determine parallel worker count for prewarming.
    /// A value of 0 indicates the count is unknown.
    pub transaction_count: usize,
    /// Total gas used by all transactions in the block.
    /// Used to adaptively select multiproof chunk size for optimal throughput.
    pub gas_used: u64,
    /// Withdrawals included in the block.
    /// Used to generate prefetch targets for withdrawal addresses.
    // NOTE(review): presumably `None` for payloads without a withdrawals field — confirm with
    // callers before relying on the distinction between `None` and `Some(vec![])`.
    pub withdrawals: Option<Vec<Withdrawal>>,
}
984
985impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
986where
987    EvmEnvFor<Evm>: Default,
988{
989    /// Creates a new [`ExecutionEnv`] with default values for testing.
990    #[cfg(any(test, feature = "test-utils"))]
991    pub fn test_default() -> Self {
992        Self {
993            evm_env: Default::default(),
994            hash: Default::default(),
995            parent_hash: Default::default(),
996            parent_state_root: Default::default(),
997            transaction_count: 0,
998            gas_used: 0,
999            withdrawals: None,
1000        }
1001    }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006    use crate::tree::{
1007        payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1008        precompile_cache::PrecompileCacheMap,
1009        CachedStateMetrics, ExecutionCache, PayloadExecutionCache, SavedCache,
1010        StateProviderBuilder, TreeConfig,
1011    };
1012    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1013    use alloy_evm::block::StateChangeSource;
1014    use rand::Rng;
1015    use reth_chainspec::ChainSpec;
1016    use reth_db_common::init::init_genesis;
1017    use reth_ethereum_primitives::TransactionSigned;
1018    use reth_evm::OnStateHook;
1019    use reth_evm_ethereum::EthEvmConfig;
1020    use reth_primitives_traits::{Account, Recovered, StorageEntry};
1021    use reth_provider::{
1022        providers::{BlockchainProvider, OverlayStateProviderFactory},
1023        test_utils::create_test_provider_factory_with_chain_spec,
1024        ChainSpecProvider, HashingWriter,
1025    };
1026    use reth_revm::db::BundleState;
1027    use reth_testing_utils::generators;
1028    use reth_trie::{test_utils::state_root, HashedPostState};
1029    use reth_trie_db::ChangesetCache;
1030    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1031    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
1032    use std::sync::Arc;
1033
1034    fn make_saved_cache(hash: B256) -> SavedCache {
1035        let execution_cache = ExecutionCache::new(1_000);
1036        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
1037    }
1038
1039    #[test]
1040    fn execution_cache_allows_single_checkout() {
1041        let execution_cache = PayloadExecutionCache::default();
1042        let hash = B256::from([1u8; 32]);
1043
1044        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1045
1046        let first = execution_cache.get_cache_for(hash);
1047        assert!(first.is_some(), "expected initial checkout to succeed");
1048
1049        let second = execution_cache.get_cache_for(hash);
1050        assert!(second.is_none(), "second checkout should be blocked while guard is active");
1051
1052        drop(first);
1053
1054        let third = execution_cache.get_cache_for(hash);
1055        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1056    }
1057
1058    #[test]
1059    fn execution_cache_checkout_releases_on_drop() {
1060        let execution_cache = PayloadExecutionCache::default();
1061        let hash = B256::from([2u8; 32]);
1062
1063        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1064
1065        {
1066            let guard = execution_cache.get_cache_for(hash);
1067            assert!(guard.is_some(), "expected checkout to succeed");
1068            // Guard dropped at end of scope
1069        }
1070
1071        let retry = execution_cache.get_cache_for(hash);
1072        assert!(retry.is_some(), "checkout should succeed after guard drop");
1073    }
1074
1075    #[test]
1076    fn execution_cache_mismatch_parent_clears_and_returns() {
1077        let execution_cache = PayloadExecutionCache::default();
1078        let hash = B256::from([3u8; 32]);
1079
1080        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1081
1082        // When the parent hash doesn't match (fork block), the cache is cleared,
1083        // hash updated on the original, and clone returned for reuse
1084        let different_hash = B256::from([4u8; 32]);
1085        let cache = execution_cache.get_cache_for(different_hash);
1086        assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1087
1088        drop(cache);
1089
1090        // The stored cache now has the fork block's parent hash.
1091        // Canonical chain looking for original hash sees a mismatch → clears and reuses.
1092        let original = execution_cache.get_cache_for(hash);
1093        assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1094    }
1095
1096    #[test]
1097    fn execution_cache_update_after_release_succeeds() {
1098        let execution_cache = PayloadExecutionCache::default();
1099        let initial = B256::from([5u8; 32]);
1100
1101        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1102
1103        let guard =
1104            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1105
1106        drop(guard);
1107
1108        let updated = B256::from([6u8; 32]);
1109        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1110
1111        let new_checkout = execution_cache.get_cache_for(updated);
1112        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1113    }
1114
1115    #[test]
1116    fn on_inserted_executed_block_populates_cache() {
1117        let payload_processor = PayloadProcessor::new(
1118            reth_tasks::Runtime::test(),
1119            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1120            &TreeConfig::default(),
1121            PrecompileCacheMap::default(),
1122        );
1123
1124        let parent_hash = B256::from([1u8; 32]);
1125        let block_hash = B256::from([10u8; 32]);
1126        let block_with_parent = BlockWithParent {
1127            block: BlockNumHash { hash: block_hash, number: 1 },
1128            parent: parent_hash,
1129        };
1130        let bundle_state = BundleState::default();
1131
1132        // Cache should be empty initially
1133        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1134
1135        // Update cache with inserted block
1136        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1137
1138        // Cache should now exist for the block hash
1139        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1140        assert!(cached.is_some());
1141        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1142    }
1143
1144    #[test]
1145    fn on_inserted_executed_block_skips_on_parent_mismatch() {
1146        let payload_processor = PayloadProcessor::new(
1147            reth_tasks::Runtime::test(),
1148            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1149            &TreeConfig::default(),
1150            PrecompileCacheMap::default(),
1151        );
1152
1153        // Setup: populate cache with block 1
1154        let block1_hash = B256::from([1u8; 32]);
1155        payload_processor
1156            .execution_cache
1157            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
1158
1159        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
1160        let wrong_parent = B256::from([99u8; 32]);
1161        let block3_hash = B256::from([3u8; 32]);
1162        let block_with_parent = BlockWithParent {
1163            block: BlockNumHash { hash: block3_hash, number: 3 },
1164            parent: wrong_parent,
1165        };
1166        let bundle_state = BundleState::default();
1167
1168        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1169
1170        // Cache should still be for block 1 (unchanged)
1171        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1172        assert!(cached.is_some(), "Original cache should be preserved");
1173
1174        // Cache for block 3 should not exist
1175        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1176        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1177    }
1178
    /// Generates `updates_per_account` randomized [`EvmState`] updates over a fixed pool of
    /// `num_accounts` random addresses.
    ///
    /// Each update touches a random-length prefix of the address pool; ~70% of touched accounts
    /// also receive 1–9 random storage writes. Every touched account gets fresh random balance
    /// and nonce.
    ///
    /// NOTE(review): the sequence of `rng` calls determines the generated data — keep the call
    /// order stable when modifying this helper.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            // Each update touches between 1 and `num_accounts` addresses (a pool prefix).
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                // ~70% of accounts get 1–9 random storage writes.
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }
1228
    /// End-to-end check: the state root produced by the background payload-processor task must
    /// match the root computed directly from the accumulated account/storage state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Persist all generated accounts and storage to the test database so proofs can be
        // generated against it.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Independently accumulate the final per-account state; later updates overwrite account
        // info and extend storage, mirroring update application order.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        // Spawn the background tasks with an empty transaction stream; state updates are driven
        // manually through the state hook below.
        let mut handle = payload_processor.spawn(
            ExecutionEnv::test_default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
            &TreeConfig::default(),
            None, // No BAL for test
        );

        let mut state_hook = handle.state_hook().expect("state hook is None");

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        // Dropping the hook signals the end of state updates to the background task.
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
1317
1318    /// Tests the full prewarm lifecycle for a fork block:
1319    ///
1320    /// 1. Cache is at canonical block 4.
1321    /// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
1322    ///    `PrewarmCacheTask` does when it receives a `SavedCache`.
1323    /// 3. Prewarm populates the shared cache with fork-specific state.
1324    /// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
1325    /// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
1326    /// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
1327    ///    data.
1328    #[test]
1329    fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1330        let execution_cache = PayloadExecutionCache::default();
1331
1332        // Canonical chain at block 4.
1333        let block4_hash = B256::from([4u8; 32]);
1334        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1335
1336        // Fork block arrives with parent = block 2. Prewarm task checks out the cache.
1337        // This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
1338        let fork_parent = B256::from([2u8; 32]);
1339        let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1340        assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1341        let prewarm_cache = prewarm_cache.unwrap();
1342        assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1343
1344        // Prewarm populates cache with fork-specific state (ancestor data for block 2).
1345        // Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
1346        let fork_addr = Address::from([0xBB; 20]);
1347        let fork_key = B256::from([0xCC; 32]);
1348        prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1349
1350        // While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
1351        let during_prewarm = execution_cache.get_cache_for(block4_hash);
1352        assert!(
1353            during_prewarm.is_none(),
1354            "cache must be unavailable while prewarm holds a reference"
1355        );
1356
1357        // Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
1358        drop(prewarm_cache);
1359
1360        // Canonical block 5 arrives (parent = block 4).
1361        // Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
1362        // clears the stale fork data, and returns a cache with hash = block4_hash.
1363        let block5_cache = execution_cache.get_cache_for(block4_hash);
1364        assert!(
1365            block5_cache.is_some(),
1366            "canonical chain must get cache after fork prewarm is dropped"
1367        );
1368        assert_eq!(
1369            block5_cache.as_ref().unwrap().executed_block_hash(),
1370            block4_hash,
1371            "cache must carry the canonical parent hash, not the fork parent"
1372        );
1373    }
1374}