// reth_engine_tree/tree/payload_processor/mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6    sparse_trie::SparseTrieCacheTask,
7    CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
8    PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
9};
10use alloy_eip7928::BlockAccessList;
11use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
12use alloy_primitives::B256;
13use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
14use multiproof::*;
15use prewarm::PrewarmMetrics;
16use rayon::prelude::*;
17use reth_evm::{
18    block::ExecutableTxParts,
19    execute::{ExecutableTxFor, WithTxEnv},
20    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
21    SpecFor, TxEnvFor,
22};
23use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
24use reth_provider::{
25    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
26};
27use reth_revm::db::BundleState;
28use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
29use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
30use reth_trie_parallel::{
31    proof_task::{ProofTaskCtx, ProofWorkerHandle},
32    root::ParallelStateRootError,
33};
34use reth_trie_sparse::{
35    ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
36};
37use std::{
38    ops::Not,
39    sync::{
40        atomic::{AtomicBool, AtomicUsize},
41        mpsc::{self, channel},
42        Arc,
43    },
44};
45use tracing::{debug, debug_span, instrument, trace, warn, Span};
46
47pub mod multiproof;
48mod preserved_sparse_trie;
49pub mod prewarm;
50pub mod receipt_root_task;
51pub mod sparse_trie;
52
53use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
54
/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of values
/// in allocated sparse tries.
///
/// There are storage and account values, the largest of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
///
/// NOTE(review): a same-named associated const `Self::SMALL_BLOCK_TX_THRESHOLD` (30) exists on
/// `PayloadProcessor` for the sequential-conversion cutoff; unqualified uses inside the impl
/// resolve to THIS module-level const. Confirm the two thresholds are intentionally distinct.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
81
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
///
/// Resolves the handle's transaction type from the EVM config and iterator's recovered
/// transaction, the error type from the iterator, and the receipt type from the node primitives.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
88
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: Runtime,
    /// The most recent cache used for execution.
    execution_cache: PayloadExecutionCache,
    /// Metrics for the execution cache; `None` when cache metrics are disabled via config.
    cache_metrics: Option<CachedStateMetrics>,
    /// Metrics for trie operations
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: usize,
    /// Whether transactions should not be executed on prewarming task.
    disable_transaction_prewarming: bool,
    /// Whether state cache should be disabled.
    disable_state_cache: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
    /// preservation across sequential payload validations.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// LFU hot-slot capacity: max storage slots retained across prune cycles.
    sparse_trie_max_hot_slots: usize,
    /// LFU hot-account capacity: max account addresses retained across prune cycles.
    sparse_trie_max_hot_accounts: usize,
    /// Whether sparse trie cache pruning is fully disabled.
    disable_sparse_trie_cache_pruning: bool,
}
126
127impl<N, Evm> PayloadProcessor<Evm>
128where
129    N: NodePrimitives,
130    Evm: ConfigureEvm<Primitives = N>,
131{
132    /// Returns a reference to the workload executor driving payload tasks.
133    pub const fn executor(&self) -> &Runtime {
134        &self.executor
135    }
136
137    /// Creates a new payload processor.
138    pub fn new(
139        executor: Runtime,
140        evm_config: Evm,
141        config: &TreeConfig,
142        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
143    ) -> Self {
144        Self {
145            executor,
146            execution_cache: Default::default(),
147            trie_metrics: Default::default(),
148            cross_block_cache_size: config.cross_block_cache_size(),
149            disable_transaction_prewarming: config.disable_prewarming(),
150            evm_config,
151            disable_state_cache: config.disable_state_cache(),
152            precompile_cache_disabled: config.precompile_cache_disabled(),
153            precompile_cache_map,
154            sparse_state_trie: SharedPreservedSparseTrie::default(),
155            sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
156            sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
157            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
158            cache_metrics: (!config.disable_cache_metrics())
159                .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
160        }
161    }
162}
163
164impl<Evm> WaitForCaches for PayloadProcessor<Evm>
165where
166    Evm: ConfigureEvm,
167{
168    fn wait_for_caches(&self) -> CacheWaitDurations {
169        debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
170
171        // Wait for both caches in parallel using std threads
172        let execution_cache = self.execution_cache.clone();
173        let sparse_trie = self.sparse_state_trie.clone();
174
175        // Use channels and spawn_blocking instead of std::thread::spawn
176        let (execution_tx, execution_rx) = std::sync::mpsc::channel();
177        let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
178
179        self.executor.spawn_blocking_named("wait-exec-cache", move || {
180            let _ = execution_tx.send(execution_cache.wait_for_availability());
181        });
182        self.executor.spawn_blocking_named("wait-sparse-tri", move || {
183            let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
184        });
185
186        let execution_cache_duration =
187            execution_rx.recv().expect("execution cache wait task failed to send result");
188        let sparse_trie_duration =
189            sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
190
191        debug!(
192            target: "engine::tree::payload_processor",
193            ?execution_cache_duration,
194            ?sparse_trie_duration,
195            "Execution cache and sparse trie locks acquired"
196        );
197        CacheWaitDurations {
198            execution_cache: execution_cache_duration,
199            sparse_trie: sparse_trie_duration,
200        }
201    }
202}
203
204impl<N, Evm> PayloadProcessor<Evm>
205where
206    N: NodePrimitives,
207    Evm: ConfigureEvm<Primitives = N> + 'static,
208{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the sparse trie task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root.
    ///
    /// This task runs until there are no further updates to process.
    ///
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// canceling)
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
        bal: Option<Arc<BlockAccessList>>,
    ) -> IteratorPayloadHandle<Evm, I, N>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // start preparing transactions immediately
        let (prewarm_rx, execution_rx) =
            self.spawn_tx_iterator(transactions, env.transaction_count);

        let span = Span::current();

        // Small blocks produce few state changes, so run the proof worker pool at half size.
        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
        let state_root_handle = self.spawn_state_root(
            multiproof_provider_factory,
            env.parent_state_root,
            halve_workers,
            config,
        );
        // With a block access list, prewarming consumes the BAL directly and main execution
        // does not install the per-transaction state hook.
        let install_state_hook = bal.is_none();
        // Prewarming is wired to the state root pipeline via the multiproof updates channel.
        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            provider_builder,
            Some(state_root_handle.updates_tx().clone()),
            bal,
        );

        PayloadHandle {
            state_root_handle: Some(state_root_handle),
            install_state_hook,
            prewarm_handle,
            transactions: execution_rx,
            _span: span,
        }
    }
284
285    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
286    ///
287    /// Returns a [`PayloadHandle`] to communicate with the task.
288    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
289    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
290        &self,
291        env: ExecutionEnv<Evm>,
292        transactions: I,
293        provider_builder: StateProviderBuilder<N, P>,
294        bal: Option<Arc<BlockAccessList>>,
295    ) -> IteratorPayloadHandle<Evm, I, N>
296    where
297        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
298    {
299        let (prewarm_rx, execution_rx) =
300            self.spawn_tx_iterator(transactions, env.transaction_count);
301        let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
302        PayloadHandle {
303            state_root_handle: None,
304            install_state_hook: false,
305            prewarm_handle,
306            transactions: execution_rx,
307            _span: Span::current(),
308        }
309    }
310
    /// Spawns state root computation pipeline (multiproof + sparse trie tasks).
    ///
    /// The returned [`StateRootHandle`] provides:
    /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
    ///   execution.
    /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
    ///   state root.
    ///
    /// The state hook **must** be dropped after execution to signal the end of state updates.
    ///
    /// When `halve_workers` is true, the proof worker pool is halved (for small blocks where
    /// fewer transactions produce fewer state changes and most workers would be idle).
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub fn spawn_state_root<F>(
        &self,
        multiproof_provider_factory: F,
        parent_state_root: B256,
        halve_workers: bool,
        config: &TreeConfig,
    ) -> StateRootHandle
    where
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // Channel feeding multiproof state updates into the sparse trie task.
        let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();

        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
        // With the debug feature enabled, shadow the ctx with one that injects proof jitter.
        #[cfg(feature = "trie-debug")]
        let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);

        // One-shot-style channel delivering the final state root result to the handle.
        let (state_root_tx, state_root_rx) = channel();

        self.spawn_sparse_trie_task(
            proof_handle,
            state_root_tx,
            from_multi_proof,
            parent_state_root,
            config.multiproof_chunk_size(),
        );

        StateRootHandle::new(parent_state_root, updates_tx, state_root_rx)
    }
357
    /// Transaction count threshold below which proof workers are halved, since fewer transactions
    /// produce fewer state changes and most workers would be idle overhead.
    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

    /// Transaction count threshold below which sequential conversion is used.
    ///
    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
    /// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
    /// blocks.
    ///
    /// NOTE(review): shares its name with the module-level `SMALL_BLOCK_TX_THRESHOLD` (5).
    /// `Self::`-qualified uses (e.g. in `spawn_tx_iterator`) resolve to this const; unqualified
    /// uses in this impl (e.g. in `spawn_caching_with`) resolve to the module const. Confirm
    /// the two thresholds are intentionally distinct, and consider renaming one.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Number of leading transactions to convert sequentially before entering the rayon
    /// parallel path.
    ///
    /// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
    /// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
    /// a small head sequentially and sending it immediately, execution can start without
    /// waiting for rayon scheduling.
    const PARALLEL_PREFETCH_COUNT: usize = 4;
378
    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
    ///
    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
    /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
    /// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
    /// results to execution in the original transaction order.
    ///
    /// Returns `(prewarm_rx, execute_rx)`: the prewarm channel carries `(index, tx)` pairs for
    /// successfully converted transactions only; the execute channel carries every conversion
    /// result (including errors) in the original transaction order.
    #[expect(clippy::type_complexity)]
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
        transaction_count: usize,
    ) -> (
        mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
    ) {
        // Bounded to the block's transaction count so the producer never blocks on a full
        // buffer for well-formed blocks.
        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
        let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);

        if transaction_count == 0 {
            // Empty block — nothing to do.
        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
            // Sequential path for small blocks — avoids rayon work-stealing setup and
            // channel-based reorder overhead when it costs more than sequential conversion.
            debug!(
                target: "engine::tree::payload_processor",
                transaction_count,
                "using sequential sig recovery for small block"
            );
            self.executor.spawn_blocking_named("tx-iterator", move || {
                let (transactions, convert) = transactions.into_parts();
                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
            });
        } else {
            // Parallel path — recover signatures in parallel on rayon, stream results
            // to execution in order via `for_each_ordered`.
            //
            // To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
            // few transactions are recovered sequentially and sent immediately before
            // entering the parallel iterator for the remainder.
            let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
            let executor = self.executor.clone();
            self.executor.spawn_blocking_named("tx-iterator", move || {
                let (transactions, convert) = transactions.into_parts();
                let mut all: Vec<_> = transactions.into_iter().collect();
                // `min(all.len())` guards against the iterator yielding fewer items than
                // `transaction_count`; `rest` is empty in that case.
                let rest = all.split_off(prefetch.min(all.len()));

                // Convert the first few transactions sequentially so execution can
                // start immediately without waiting for rayon work-stealing.
                convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);

                // Convert the remaining transactions in parallel; indices are offset by
                // `prefetch` to continue the sequence started by the serial head.
                rest.into_par_iter()
                    .enumerate()
                    .map(|(i, tx)| {
                        let idx = i + prefetch;
                        let tx = convert.convert(tx);
                        (idx, tx)
                    })
                    .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
                        // Mirror the serial path: prewarm only sees successful conversions,
                        // execution sees every result in order.
                        let tx = tx.map(|tx| {
                            let (tx_env, tx) = tx.into_parts();
                            let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
                            let _ = prewarm_tx.send((idx, tx.clone()));
                            tx
                        });
                        let _ = execute_tx.send(tx);
                        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
                    });
            });
        }

        (prewarm_rx, execute_rx)
    }
453
    /// Spawn prewarming optionally wired to the sparse trie task for target updates.
    ///
    /// The prewarm mode is chosen at task start: skipped (disabled or small block),
    /// block-access-list driven, or fed by the live transaction channel.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        skip_all,
        fields(bal=%bal.is_some())
    )]
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
        provider_builder: StateProviderBuilder<N, P>,
        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
        bal: Option<Arc<BlockAccessList>>,
    ) -> CacheTaskHandle<N::Receipt>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        // NOTE(review): this unqualified name resolves to the module-level
        // `SMALL_BLOCK_TX_THRESHOLD` (5), not `Self::SMALL_BLOCK_TX_THRESHOLD` (30) used in
        // `spawn_tx_iterator` — confirm the distinct thresholds are intentional.
        let skip_prewarm =
            self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;

        // Only look up a saved cache when state caching is enabled.
        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));

        // Shared with the returned handle so callers can observe prewarm progress.
        let executed_tx_index = Arc::new(AtomicUsize::new(0));

        // configure prewarming
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache: saved_cache.clone(),
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            cache_metrics: self.cache_metrics.clone(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            executed_tx_index: Arc::clone(&executed_tx_index),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_sparse_trie_task,
        );

        {
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking_named("prewarm", move || {
                // Mode precedence: explicit skip, then block access list, then live tx stream.
                let mode = if skip_prewarm {
                    PrewarmMode::Skipped
                } else if let Some(bal) = bal {
                    PrewarmMode::BlockAccessList(bal)
                } else {
                    PrewarmMode::Transactions(transactions)
                };
                prewarm_task.run(mode, to_prewarm_task);
            });
        }

        CacheTaskHandle {
            saved_cache,
            to_prewarm_task: Some(to_prewarm_task),
            executed_tx_index,
            cache_metrics: self.cache_metrics.clone(),
        }
    }
521
522    /// Returns the cache for the given parent hash.
523    ///
524    /// If the given hash is different then what is recently cached, then this will create a new
525    /// instance.
526    #[instrument(level = "debug", target = "engine::caching", skip(self))]
527    pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
528        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
529            debug!("reusing execution cache");
530            cache
531        } else {
532            debug!("creating new execution cache on cache miss");
533            let start = Instant::now();
534            let cache = ExecutionCache::new(self.cross_block_cache_size);
535            if let Some(metrics) = &self.cache_metrics {
536                metrics.record_cache_creation(start.elapsed());
537            }
538            SavedCache::new(parent_hash, cache)
539        }
540    }
541
    /// Spawns the [`SparseTrieCacheTask`] for this payload processor.
    ///
    /// The trie is preserved when the new payload is a child of the previous one.
    ///
    /// The task drains `from_multi_proof` until completion, sends the final result over
    /// `state_root_tx`, and then stores the (anchored or cleared) trie back into the shared
    /// slot for reuse by the next payload. Expensive drops are deferred to a spawned task.
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<StateRootMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let max_hot_slots = self.sparse_trie_max_hot_slots;
        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            // Raise priority once per thread; this task is on the state-root critical path.
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Reuse a stored SparseStateTrie if available, applying continuation logic.
            // If this payload's parent state root matches the preserved trie's anchor,
            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
            // keep allocations.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let mut sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
                    );
                    SparseStateTrie::default()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });
            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                chunk_size,
            );

            // Blocks until all multiproof updates are processed and the root is computed.
            let result = task.run();

            // Acquire the guard before sending the result to prevent a race condition:
            // Without this, the next block could start after send() but before store(),
            // causing take() to return None and forcing it to create a new empty trie
            // instead of reusing the preserved one. Holding the guard ensures the next
            // block's take() blocks until we've stored the trie for reuse.
            let mut guard = preserved_sparse_trie.lock();

            let task_result = result.as_ref().ok().cloned();
            // Send state root computation result - next block may start but will block on take()
            if state_root_tx.send(result).is_err() {
                // Receiver dropped - payload was likely invalid or cancelled.
                // Clear the trie instead of preserving potentially invalid state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                drop(guard);
                // Defer the expensive drop off the critical path.
                executor.spawn_drop(deferred);
                return;
            }

            // Only preserve the trie as anchored if computation succeeded.
            // A failed computation may have left the trie in a partially updated state.
            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    max_hot_slots,
                    max_hot_accounts,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                    &result.trie_updates,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                // Anchor to the new state root so a child payload can continue from it.
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            drop(guard);
            // Defer the expensive drop off the critical path.
            executor.spawn_drop(deferred);
        });
    }
672
    /// Updates the execution cache with the post-execution state from an inserted block.
    ///
    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
    /// to ensure the cache remains warm for subsequent block execution.
    ///
    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
    /// hitting the database, maintaining performance consistency.
    ///
    /// No-op when an existing cache was built for a different parent than this block's.
    pub fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        let cache_metrics = self.cache_metrics.clone();
        // The whole update runs under the cache guard so concurrent readers never observe a
        // partially updated cache.
        self.execution_cache.update_with_guard(|cached| {
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
                );
                return
            }

            // Take existing cache (if any) or create fresh caches
            let caches = match cached.take() {
                Some(existing) => existing.cache().clone(),
                None => ExecutionCache::new(self.cross_block_cache_size),
            };

            // Insert the block's bundle state into cache
            let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
            if new_cache.cache().insert_state(bundle_state).is_err() {
                // On insertion failure, drop the cache entirely rather than keep stale state.
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return
            }
            new_cache.update_metrics(cache_metrics.as_ref());

            // Replace with the updated cache
            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
716}
717
718/// Converts transactions sequentially and sends them to the prewarm and execute channels.
719fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
720    iter: impl Iterator<Item = RawTx>,
721    convert: &C,
722    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
723    execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
724) where
725    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
726    TxEnv: Clone,
727    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
728{
729    for (idx, raw_tx) in iter.enumerate() {
730        let tx = convert.convert(raw_tx);
731        let tx = tx.map(|tx| {
732            let (tx_env, tx) = tx.into_parts();
733            WithTxEnv { tx_env, tx: Arc::new(tx) }
734        });
735        if let Ok(tx) = &tx {
736            let _ = prewarm_tx.send((idx, tx.clone()));
737        }
738        let _ = execute_tx.send(tx);
739        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
740    }
741}
742
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the background state root computation, if spawned.
    state_root_handle: Option<StateRootHandle>,
    /// Whether main execution should stream per-tx state updates into the sparse trie task.
    install_state_hook: bool,
    /// Handle to the spawned prewarm/caching task.
    // must include the receiver of the state root wired to the sparse trie
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions, as produced by [`convert_serial`] or equivalent.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Span for tracing; kept alive for the lifetime of the handle.
    _span: Span,
}
760
761impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
762    /// Awaits the state root
763    ///
764    /// # Panics
765    ///
766    /// If payload processing was started without background tasks.
767    #[instrument(
768        level = "debug",
769        target = "engine::tree::payload_processor",
770        name = "await_state_root",
771        skip_all
772    )]
773    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
774        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
775    }
776
777    /// Takes the state root receiver out of the handle for use with custom waiting logic
778    /// (e.g., timeout-based waiting).
779    ///
780    /// # Panics
781    ///
782    /// If payload processing was started without background tasks.
783    pub const fn take_state_root_rx(
784        &mut self,
785    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
786        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
787    }
788
789    /// Returns a state hook to stream execution state updates to the sparse trie cache task.
790    ///
791    /// Returns `None` when execution should not send state updates, such as BAL-driven execution.
792    pub fn state_hook(&self) -> Option<impl OnStateHook> {
793        self.install_state_hook
794            .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
795            .flatten()
796    }
797
798    /// Returns a clone of the caches used by prewarming
799    pub fn caches(&self) -> Option<ExecutionCache> {
800        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
801    }
802
803    /// Returns engine cache metrics if a cache exists for prewarming.
804    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
805        self.prewarm_handle.cache_metrics.clone()
806    }
807
808    /// Returns a reference to the shared executed transaction index counter.
809    ///
810    /// The main execution loop should store `index + 1` after executing each transaction so that
811    /// prewarm workers can skip transactions that have already been processed.
812    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
813        &self.prewarm_handle.executed_tx_index
814    }
815
816    /// Terminates the pre-warming transaction processing.
817    ///
818    /// Note: This does not terminate the task yet.
819    pub fn stop_prewarming_execution(&self) {
820        self.prewarm_handle.stop_prewarming_execution()
821    }
822
823    /// Terminates the entire caching task.
824    ///
825    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
826    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
827    /// path without cloning the expensive `BundleState`.
828    ///
829    /// Returns a sender for the channel that should be notified on block validation success.
830    pub fn terminate_caching(
831        &mut self,
832        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
833    ) -> Option<mpsc::Sender<()>> {
834        self.prewarm_handle.terminate_caching(execution_outcome)
835    }
836
837    /// Returns iterator yielding transactions from the stream.
838    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
839        self.transactions.iter()
840    }
841}
842
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
///
/// Dropping the handle sends a terminate event to the task (see the `Drop` impl below).
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The shared cache the task operates with, if one was available.
    saved_cache: Option<SavedCache>,
    /// Channel to the spawned prewarm task if any.
    ///
    /// Taken (set to `None`) once the task has been told to terminate, so termination is
    /// only ever signalled once.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Shared counter tracking the next transaction index to be executed by the main execution
    /// loop. Prewarm workers skip transactions below this index.
    executed_tx_index: Arc<AtomicUsize>,
    /// Metrics for the execution cache.
    cache_metrics: Option<CachedStateMetrics>,
}
859
860impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
861    /// Terminates the pre-warming transaction processing.
862    ///
863    /// Note: This does not terminate the task yet.
864    pub fn stop_prewarming_execution(&self) {
865        self.to_prewarm_task
866            .as_ref()
867            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
868    }
869
870    /// Terminates the entire pre-warming task.
871    ///
872    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
873    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
874    #[must_use = "sender must be used and notified on block validation success"]
875    pub fn terminate_caching(
876        &mut self,
877        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
878    ) -> Option<mpsc::Sender<()>> {
879        if let Some(tx) = self.to_prewarm_task.take() {
880            let (valid_block_tx, valid_block_rx) = mpsc::channel();
881            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
882            let _ = tx.send(event);
883
884            Some(valid_block_tx)
885        } else {
886            None
887        }
888    }
889}
890
891impl<R> Drop for CacheTaskHandle<R> {
892    fn drop(&mut self) {
893        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
894        if let Some(tx) = self.to_prewarm_task.take() {
895            let _ = tx.send(PrewarmTaskEvent::Terminate {
896                execution_outcome: None,
897                valid_block_rx: mpsc::channel().1,
898            });
899        }
900    }
901}
902
/// EVM context required to execute a block.
///
/// Bundles the EVM environment with block identity (hash, parent hash, parent state root)
/// and block-level hints (transaction count, gas used, withdrawals) consumed by the
/// prewarming and state-root tasks.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    /// Used to determine parallel worker count for prewarming.
    /// A value of 0 indicates the count is unknown.
    pub transaction_count: usize,
    /// Total gas used by all transactions in the block.
    /// Used to adaptively select multiproof chunk size for optimal throughput.
    pub gas_used: u64,
    /// Withdrawals included in the block.
    /// Used to generate prefetch targets for withdrawal addresses.
    pub withdrawals: Option<Vec<Withdrawal>>,
}
927
928impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
929where
930    EvmEnvFor<Evm>: Default,
931{
932    /// Creates a new [`ExecutionEnv`] with default values for testing.
933    #[cfg(any(test, feature = "test-utils"))]
934    pub fn test_default() -> Self {
935        Self {
936            evm_env: Default::default(),
937            hash: Default::default(),
938            parent_hash: Default::default(),
939            parent_state_root: Default::default(),
940            transaction_count: 0,
941            gas_used: 0,
942            withdrawals: None,
943        }
944    }
945}
946
947#[cfg(test)]
948mod tests {
949    use crate::tree::{
950        payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
951        precompile_cache::PrecompileCacheMap,
952        ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
953    };
954    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
955    use alloy_evm::block::StateChangeSource;
956    use rand::Rng;
957    use reth_chainspec::ChainSpec;
958    use reth_db_common::init::init_genesis;
959    use reth_ethereum_primitives::TransactionSigned;
960    use reth_evm::OnStateHook;
961    use reth_evm_ethereum::EthEvmConfig;
962    use reth_primitives_traits::{Account, Recovered, StorageEntry};
963    use reth_provider::{
964        providers::{BlockchainProvider, OverlayStateProviderFactory},
965        test_utils::create_test_provider_factory_with_chain_spec,
966        ChainSpecProvider, HashingWriter,
967    };
968    use reth_revm::db::BundleState;
969    use reth_testing_utils::generators;
970    use reth_trie::{test_utils::state_root, HashedPostState};
971    use reth_trie_db::ChangesetCache;
972    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
973    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
974    use std::sync::Arc;
975
976    fn make_saved_cache(hash: B256) -> SavedCache {
977        let execution_cache = ExecutionCache::new(1_000);
978        SavedCache::new(hash, execution_cache)
979    }
980
981    #[test]
982    fn execution_cache_allows_single_checkout() {
983        let execution_cache = PayloadExecutionCache::default();
984        let hash = B256::from([1u8; 32]);
985
986        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
987
988        let first = execution_cache.get_cache_for(hash);
989        assert!(first.is_some(), "expected initial checkout to succeed");
990
991        let second = execution_cache.get_cache_for(hash);
992        assert!(second.is_none(), "second checkout should be blocked while guard is active");
993
994        drop(first);
995
996        let third = execution_cache.get_cache_for(hash);
997        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
998    }
999
1000    #[test]
1001    fn execution_cache_checkout_releases_on_drop() {
1002        let execution_cache = PayloadExecutionCache::default();
1003        let hash = B256::from([2u8; 32]);
1004
1005        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1006
1007        {
1008            let guard = execution_cache.get_cache_for(hash);
1009            assert!(guard.is_some(), "expected checkout to succeed");
1010            // Guard dropped at end of scope
1011        }
1012
1013        let retry = execution_cache.get_cache_for(hash);
1014        assert!(retry.is_some(), "checkout should succeed after guard drop");
1015    }
1016
1017    #[test]
1018    fn execution_cache_mismatch_parent_clears_and_returns() {
1019        let execution_cache = PayloadExecutionCache::default();
1020        let hash = B256::from([3u8; 32]);
1021
1022        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1023
1024        // When the parent hash doesn't match (fork block), the cache is cleared,
1025        // hash updated on the original, and clone returned for reuse
1026        let different_hash = B256::from([4u8; 32]);
1027        let cache = execution_cache.get_cache_for(different_hash);
1028        assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1029
1030        drop(cache);
1031
1032        // The stored cache now has the fork block's parent hash.
1033        // Canonical chain looking for original hash sees a mismatch → clears and reuses.
1034        let original = execution_cache.get_cache_for(hash);
1035        assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1036    }
1037
1038    #[test]
1039    fn execution_cache_update_after_release_succeeds() {
1040        let execution_cache = PayloadExecutionCache::default();
1041        let initial = B256::from([5u8; 32]);
1042
1043        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1044
1045        let guard =
1046            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1047
1048        drop(guard);
1049
1050        let updated = B256::from([6u8; 32]);
1051        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1052
1053        let new_checkout = execution_cache.get_cache_for(updated);
1054        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1055    }
1056
    /// Inserting an executed block while the cache slot is empty must create a fresh cache
    /// keyed by the inserted block's hash.
    #[test]
    fn on_inserted_executed_block_populates_cache() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        // An empty bundle state suffices: only the cache bookkeeping is under test here.
        let bundle_state = BundleState::default();

        // Cache should be empty initially
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        // Update cache with inserted block
        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should now exist for the block hash
        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }
1085
    /// Inserting an executed block whose parent doesn't match the currently cached block must
    /// leave the existing cache untouched and create no cache for the new block.
    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        // Setup: populate cache with block 1
        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should still be for block 1 (unchanged)
        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        // Cache for block 3 should not exist
        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }
1120
    /// Builds `updates_per_account` random [`EvmState`] updates drawn from a fixed pool of
    /// `num_accounts` addresses.
    ///
    /// Each update touches a random-length prefix of the address pool. Touched accounts get
    /// random balance/nonce, and with 70% probability a handful of random storage writes.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            // Each update covers at least one account, at most all of them.
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                // 70% of touched accounts also get 1..10 random storage slot writes.
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }
1170
    /// End-to-end check that the state root produced by the background payload tasks matches
    /// the reference implementation in `reth_trie::test_utils::state_root`.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Persist the generated account/storage updates so the trie computation has a
        // database view to read from.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Build the flat (account, storage) view used by the reference `state_root`: later
        // updates overwrite earlier account info and extend storage.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        // Spawn the background tasks with an empty transaction stream; state updates are fed
        // in manually through the state hook below.
        let mut handle = payload_processor.spawn(
            ExecutionEnv::test_default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
            &TreeConfig::default(),
            None, // No BAL for test
        );

        let mut state_hook = handle.state_hook().expect("state hook is None");

        // Stream each update as if it came from transaction execution, then drop the hook to
        // signal that no more updates will arrive.
        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
1259
1260    /// Tests the full prewarm lifecycle for a fork block:
1261    ///
1262    /// 1. Cache is at canonical block 4.
1263    /// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
1264    ///    `PrewarmCacheTask` does when it receives a `SavedCache`.
1265    /// 3. Prewarm populates the shared cache with fork-specific state.
1266    /// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
1267    /// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
1268    /// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
1269    ///    data.
1270    #[test]
1271    fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1272        let execution_cache = PayloadExecutionCache::default();
1273
1274        // Canonical chain at block 4.
1275        let block4_hash = B256::from([4u8; 32]);
1276        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1277
1278        // Fork block arrives with parent = block 2. Prewarm task checks out the cache.
1279        // This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
1280        let fork_parent = B256::from([2u8; 32]);
1281        let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1282        assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1283        let prewarm_cache = prewarm_cache.unwrap();
1284        assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1285
1286        // Prewarm populates cache with fork-specific state (ancestor data for block 2).
1287        // Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
1288        let fork_addr = Address::from([0xBB; 20]);
1289        let fork_key = B256::from([0xCC; 32]);
1290        prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1291
1292        // While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
1293        let during_prewarm = execution_cache.get_cache_for(block4_hash);
1294        assert!(
1295            during_prewarm.is_none(),
1296            "cache must be unavailable while prewarm holds a reference"
1297        );
1298
1299        // Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
1300        drop(prewarm_cache);
1301
1302        // Canonical block 5 arrives (parent = block 4).
1303        // Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
1304        // clears the stale fork data, and returns a cache with hash = block4_hash.
1305        let block5_cache = execution_cache.get_cache_for(block4_hash);
1306        assert!(
1307            block5_cache.is_some(),
1308            "canonical chain must get cache after fork prewarm is dropped"
1309        );
1310        assert_eq!(
1311            block5_cache.as_ref().unwrap().executed_block_hash(),
1312            block4_hash,
1313            "cache must carry the canonical parent hash, not the fork parent"
1314        );
1315    }
1316}