Skip to main content

reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6    sparse_trie::SparseTrieCacheTask,
7    CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
8    PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
9};
10use alloy_eip7928::bal::DecodedBal;
11use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
12use alloy_primitives::B256;
13use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
14use multiproof::*;
15use prewarm::PrewarmMetrics;
16use rayon::prelude::*;
17use reth_evm::{
18    block::ExecutableTxParts,
19    execute::{ExecutableTxFor, WithTxEnv},
20    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
21    SpecFor, TxEnvFor,
22};
23use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
24use reth_provider::{
25    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
26};
27use reth_revm::db::BundleState;
28use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
29use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
30use reth_trie_parallel::{
31    proof_task::{ProofTaskCtx, ProofWorkerHandle},
32    root::ParallelStateRootError,
33};
34use reth_trie_sparse::{
35    ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
36};
37use std::{
38    ops::Not,
39    sync::{
40        atomic::{AtomicBool, AtomicUsize},
41        mpsc::{self, channel},
42        Arc,
43    },
44};
45use tracing::{debug, debug_span, instrument, trace, warn, Span};
46
47pub mod bal;
48pub mod multiproof;
49mod preserved_sparse_trie;
50pub mod prewarm;
51pub mod receipt_root_task;
52pub mod sparse_trie;
53
54use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
55
/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of values
/// in allocated sparse tries.
///
/// There are storage and account values, the largest of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
///
/// NOTE(review): distinct from the impl-level associated const of the same name
/// (`PayloadProcessor::SMALL_BLOCK_TX_THRESHOLD`, value 30), which gates sequential signature
/// recovery. The shared name is easy to confuse; this module-level value (5) is the one used by
/// `spawn_caching_with` to skip prewarming entirely.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
82
/// A recovered transaction paired with its prepared EVM transaction environment, as produced by
/// the executable-transaction iterator `I`.
type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    IteratorTx<Evm, I>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;

/// Receiver side of the prewarm channel for transactions produced by iterator `I`.
type IteratorPrewarmTxReceiver<Evm, I> =
    PrewarmTxReceiver<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

/// Receiver side of the execution channel for transactions (or conversion errors) produced by
/// iterator `I`.
type IteratorExecuteTxReceiver<Evm, I> = ExecuteTxReceiver<
    TxEnvFor<Evm>,
    <I as ExecutableTxIterator<Evm>>::Recovered,
    <I as ExecutableTxTuple>::Error,
>;

/// A recovered transaction bundled with its transaction environment.
type RecoveredTx<TxEnv, Recovered> = WithTxEnv<TxEnv, Recovered>;
/// A conversion result tagged with the transaction's index in the block.
type IndexedTxResult<Tx, Err> = (usize, Result<Tx, Err>);
type IndexedTxReceiver<Tx, Err> = CrossbeamReceiver<IndexedTxResult<Tx, Err>>;
type IndexedTxSender<Tx, Err> = CrossbeamSender<IndexedTxResult<Tx, Err>>;
/// Channel feeding successfully converted transactions (with their index) to the prewarm task.
type PrewarmTxReceiver<TxEnv, Recovered> = mpsc::Receiver<(usize, RecoveredTx<TxEnv, Recovered>)>;
/// Channel feeding conversion results (with their index) to block execution.
type ExecuteTxReceiver<TxEnv, Recovered, Err> =
    IndexedTxReceiver<RecoveredTx<TxEnv, Recovered>, Err>;
type ExecuteTxSender<TxEnv, Recovered, Err> = IndexedTxSender<RecoveredTx<TxEnv, Recovered>, Err>;
109
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: Runtime,
    /// The most recent cache used for execution.
    execution_cache: PayloadExecutionCache,
    /// Metrics for the execution cache. `None` when cache metrics are disabled via config.
    cache_metrics: Option<CachedStateMetrics>,
    /// Metrics for trie operations
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: usize,
    /// Whether transactions should not be executed on prewarming task.
    disable_transaction_prewarming: bool,
    /// Whether state cache should be disabled
    disable_state_cache: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
    /// preservation across sequential payload validations.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// LFU hot-slot capacity: max storage slots retained across prune cycles.
    sparse_trie_max_hot_slots: usize,
    /// LFU hot-account capacity: max account addresses retained across prune cycles.
    sparse_trie_max_hot_accounts: usize,
    /// Whether sparse trie cache pruning is fully disabled.
    disable_sparse_trie_cache_pruning: bool,
    /// Whether to disable BAL-driven parallel state root computation.
    /// Only valid when BAL parallel execution is also disabled.
    disable_bal_parallel_state_root: bool,
    /// Whether BAL state prefetching during prewarm is disabled.
    disable_bal_batch_io: bool,
}
152
153impl<N, Evm> PayloadProcessor<Evm>
154where
155    N: NodePrimitives,
156    Evm: ConfigureEvm<Primitives = N>,
157{
158    /// Returns a reference to the workload executor driving payload tasks.
159    pub const fn executor(&self) -> &Runtime {
160        &self.executor
161    }
162
163    /// Creates a new payload processor.
164    pub fn new(
165        executor: Runtime,
166        evm_config: Evm,
167        config: &TreeConfig,
168        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
169    ) -> Self {
170        Self {
171            executor,
172            execution_cache: Default::default(),
173            trie_metrics: Default::default(),
174            cross_block_cache_size: config.cross_block_cache_size(),
175            disable_transaction_prewarming: config.disable_prewarming(),
176            evm_config,
177            disable_state_cache: config.disable_state_cache(),
178            precompile_cache_disabled: config.precompile_cache_disabled(),
179            precompile_cache_map,
180            sparse_state_trie: SharedPreservedSparseTrie::default(),
181            sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
182            sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
183            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
184            cache_metrics: (!config.disable_cache_metrics())
185                .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
186            disable_bal_parallel_state_root: config.disable_bal_parallel_state_root(),
187            disable_bal_batch_io: config.disable_bal_batch_io(),
188        }
189    }
190}
191
192impl<Evm> WaitForCaches for PayloadProcessor<Evm>
193where
194    Evm: ConfigureEvm,
195{
196    fn wait_for_caches(&self) -> CacheWaitDurations {
197        debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
198
199        // Wait for both caches in parallel using std threads
200        let execution_cache = self.execution_cache.clone();
201        let sparse_trie = self.sparse_state_trie.clone();
202
203        // Use channels and spawn_blocking instead of std::thread::spawn
204        let (execution_tx, execution_rx) = std::sync::mpsc::channel();
205        let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
206
207        self.executor.spawn_blocking_named("wait-exec-cache", move || {
208            let _ = execution_tx.send(execution_cache.wait_for_availability());
209        });
210        self.executor.spawn_blocking_named("wait-sparse-tri", move || {
211            let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
212        });
213
214        let execution_cache_duration =
215            execution_rx.recv().expect("execution cache wait task failed to send result");
216        let sparse_trie_duration =
217            sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
218
219        debug!(
220            target: "engine::tree::payload_processor",
221            ?execution_cache_duration,
222            ?sparse_trie_duration,
223            "Execution cache and sparse trie locks acquired"
224        );
225        CacheWaitDurations {
226            execution_cache: execution_cache_duration,
227            sparse_trie: sparse_trie_duration,
228        }
229    }
230}
231
232impl<N, Evm> PayloadProcessor<Evm>
233where
234    N: NodePrimitives,
235    Evm: ConfigureEvm<Primitives = N> + 'static,
236{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the sparse trie task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root.
    ///
    /// This task runs until there are no further updates to process.
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// canceling)
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
    ) -> IteratorPayloadHandle<Evm, I, N>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // start preparing transactions immediately, so signature recovery overlaps with the
        // task spawning below; both receivers are drained later (prewarm task / caller).
        let (prewarm_rx, execution_rx) =
            self.spawn_tx_iterator(transactions, env.transaction_count);

        let span = Span::current();

        // Small blocks produce few state changes, so a full proof-worker pool would mostly idle.
        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
        let state_root_handle = self.spawn_state_root(
            multiproof_provider_factory,
            env.parent_state_root,
            halve_workers,
            config,
        );
        // BAL blocks only bypass the normal execution state hook when the parallel BAL executor
        // consumes the BAL. If parallel BAL execution is disabled, or if state caching is
        // disabled so the BAL executor cannot use a shared cache, treat the BAL as absent here so
        // the block follows today's sequential execution and transaction-prewarm path.
        //
        // In the parallel BAL path, prewarm owns BAL-derived sparse-trie updates and optional
        // BAL state prefetching. `disable_bal_batch_io` controls the prefetch half inside
        // prewarm, not this dispatch decision.
        let parallel_bal_execution = !config.disable_state_cache() &&
            !config.disable_bal_parallel_execution() &&
            env.decoded_bal.is_some();
        let install_state_hook = !parallel_bal_execution;
        // Prewarm is wired to the state root task so it can push sparse-trie target updates.
        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            provider_builder,
            Some(state_root_handle.updates_tx().clone()),
            parallel_bal_execution,
        );

        PayloadHandle {
            state_root_handle: Some(state_root_handle),
            install_state_hook,
            prewarm_handle,
            transactions: execution_rx,
            _span: span,
        }
    }
322
323    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
324    ///
325    /// Returns a [`PayloadHandle`] to communicate with the task.
326    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
327    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
328        &self,
329        env: ExecutionEnv<Evm>,
330        transactions: I,
331        provider_builder: StateProviderBuilder<N, P>,
332    ) -> IteratorPayloadHandle<Evm, I, N>
333    where
334        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
335    {
336        let (prewarm_rx, execution_rx) =
337            self.spawn_tx_iterator(transactions, env.transaction_count);
338        let prewarm_handle =
339            self.spawn_caching_with(env, prewarm_rx, provider_builder, None, false);
340        PayloadHandle {
341            state_root_handle: None,
342            install_state_hook: false,
343            prewarm_handle,
344            transactions: execution_rx,
345            _span: Span::current(),
346        }
347    }
348
349    /// Spawns state root computation pipeline (multiproof + sparse trie tasks).
350    ///
351    /// The returned [`StateRootHandle`] provides:
352    /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
353    ///   execution.
354    /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
355    ///   state root.
356    ///
357    /// The state hook **must** be dropped after execution to signal the end of state updates.
358    ///
359    /// When `halve_workers` is true, the proof worker pool is halved (for small blocks where
360    /// fewer transactions produce fewer state changes and most workers would be idle).
361    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
362    pub fn spawn_state_root<F>(
363        &self,
364        multiproof_provider_factory: F,
365        parent_state_root: B256,
366        halve_workers: bool,
367        config: &TreeConfig,
368    ) -> StateRootHandle
369    where
370        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
371            + Clone
372            + Send
373            + Sync
374            + 'static,
375    {
376        let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();
377
378        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
379        #[cfg(feature = "trie-debug")]
380        let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
381        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
382
383        let (state_root_tx, state_root_rx) = channel();
384
385        self.spawn_sparse_trie_task(
386            proof_handle,
387            state_root_tx,
388            from_multi_proof,
389            parent_state_root,
390            config.multiproof_chunk_size(),
391        );
392
393        StateRootHandle::new(parent_state_root, updates_tx, state_root_rx)
394    }
395
    /// Transaction count threshold below which proof workers are halved, since fewer transactions
    /// produce fewer state changes and most workers would be idle overhead.
    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

    /// Transaction count threshold below which sequential conversion is used.
    ///
    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
    /// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
    /// blocks.
    ///
    /// NOTE(review): shadows the module-level `SMALL_BLOCK_TX_THRESHOLD` (5), which gates
    /// prewarming instead — `Self::`-qualified uses refer to this value (30), unqualified uses
    /// refer to the module constant. The shared name invites confusion.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Number of leading transactions to convert sequentially before entering the rayon
    /// parallel path.
    ///
    /// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
    /// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
    /// a small head sequentially and sending it immediately, execution can start without
    /// waiting for rayon scheduling.
    const PARALLEL_PREFETCH_COUNT: usize = 4;
416
417    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
418    ///
419    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
420    /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
421    /// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
422    /// results to execution in the original transaction order.
423    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
424    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
425        &self,
426        transactions: I,
427        transaction_count: usize,
428    ) -> (IteratorPrewarmTxReceiver<Evm, I>, IteratorExecuteTxReceiver<Evm, I>) {
429        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
430        let (execute_tx, execute_rx) = crossbeam_channel::bounded(transaction_count);
431
432        if transaction_count == 0 {
433            // Empty block — nothing to do.
434        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
435            // Sequential path for small blocks — avoids rayon work-stealing setup and
436            // channel-based reorder overhead when it costs more than sequential conversion.
437            debug!(
438                target: "engine::tree::payload_processor",
439                transaction_count,
440                "using sequential sig recovery for small block"
441            );
442            self.executor.spawn_blocking_named("tx-iterator", move || {
443                let (transactions, convert) = transactions.into_parts();
444                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
445            });
446        } else {
447            // Parallel path — recover signatures in parallel on rayon, stream results
448            // to execution in order via `for_each_ordered`.
449            //
450            // To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
451            // few transactions are recovered sequentially and sent immediately before
452            // entering the parallel iterator for the remainder.
453            let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
454            let executor = self.executor.clone();
455            self.executor.spawn_blocking_named("tx-iterator", move || {
456                let (transactions, convert) = transactions.into_parts();
457                let mut all: Vec<_> = transactions.into_iter().collect();
458                let rest = all.split_off(prefetch.min(all.len()));
459
460                // Convert the first few transactions sequentially so execution can
461                // start immediately without waiting for rayon work-stealing.
462                convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
463
464                // Convert the remaining transactions in parallel.
465                rest.into_par_iter()
466                    .enumerate()
467                    .map(|(i, tx)| {
468                        let idx = i + prefetch;
469                        let tx = convert.convert(tx);
470                        (idx, tx)
471                    })
472                    .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
473                        let tx = tx.map(|tx| {
474                            let (tx_env, tx) = tx.into_parts();
475                            let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
476                            let _ = prewarm_tx.send((idx, tx.clone()));
477                            tx
478                        });
479                        let _ = execute_tx.send((idx, tx));
480                        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
481                        });
482                        });
483        }
484
485        (prewarm_rx, execute_rx)
486    }
487
488    /// Spawn prewarming optionally wired to the sparse trie task for target updates.
489    ///
490    /// `parallel_bal_execution` is true when the BAL execute path will execute this block. In
491    /// that case prewarm runs in BAL mode: it streams BAL-derived sparse-trie updates and,
492    /// unless `disable_bal_batch_io` is set, prefetches BAL-declared state into the shared cache.
493    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
494    fn spawn_caching_with<P>(
495        &self,
496        env: ExecutionEnv<Evm>,
497        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
498        provider_builder: StateProviderBuilder<N, P>,
499        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
500        parallel_bal_execution: bool,
501    ) -> CacheTaskHandle<N::Receipt>
502    where
503        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
504    {
505        let mode = if parallel_bal_execution {
506            PrewarmMode::BlockAccessList(
507                env.decoded_bal.clone().expect("BAL dispatch implies decoded BAL"),
508            )
509        } else if self.disable_transaction_prewarming ||
510            env.transaction_count < SMALL_BLOCK_TX_THRESHOLD
511        {
512            PrewarmMode::Skipped
513        } else {
514            PrewarmMode::Transactions(transactions)
515        };
516        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
517
518        let executed_tx_index = Arc::new(AtomicUsize::new(0));
519        // configure prewarming
520        let prewarm_ctx = PrewarmContext {
521            env,
522            evm_config: self.evm_config.clone(),
523            saved_cache: saved_cache.clone(),
524            provider: provider_builder,
525            metrics: PrewarmMetrics::default(),
526            cache_metrics: self.cache_metrics.clone(),
527            terminate_execution: Arc::new(AtomicBool::new(false)),
528            executed_tx_index: Arc::clone(&executed_tx_index),
529            precompile_cache_disabled: self.precompile_cache_disabled,
530            precompile_cache_map: self.precompile_cache_map.clone(),
531            disable_bal_parallel_state_root: self.disable_bal_parallel_state_root,
532            disable_bal_batch_io: self.disable_bal_batch_io,
533        };
534
535        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
536            self.executor.clone(),
537            self.execution_cache.clone(),
538            prewarm_ctx,
539            to_sparse_trie_task,
540        );
541        {
542            let to_prewarm_task = to_prewarm_task.clone();
543            self.executor.spawn_blocking_named("prewarm", move || {
544                prewarm_task.run(mode, to_prewarm_task);
545            });
546        }
547
548        CacheTaskHandle {
549            saved_cache,
550            to_prewarm_task: Some(to_prewarm_task),
551            executed_tx_index,
552            cache_metrics: self.cache_metrics.clone(),
553        }
554    }
555
556    /// Returns the cache for the given parent hash.
557    ///
558    /// If the given hash is different then what is recently cached, then this will create a new
559    /// instance.
560    #[instrument(level = "debug", target = "engine::caching", skip(self))]
561    pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
562        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
563            debug!("reusing execution cache");
564            cache
565        } else {
566            debug!("creating new execution cache on cache miss");
567            let start = Instant::now();
568            let cache = ExecutionCache::new(self.cross_block_cache_size);
569            if let Some(metrics) = &self.cache_metrics {
570                metrics.record_cache_creation(start.elapsed());
571            }
572            SavedCache::new(parent_hash, cache)
573        }
574    }
575
    /// Spawns the [`SparseTrieCacheTask`] for this payload processor.
    ///
    /// The trie is preserved when the new payload is a child of the previous one.
    ///
    /// The end-of-task protocol is order-sensitive: the preserved-trie guard is taken BEFORE
    /// sending the result, and released only after the trie has been stored back, so the next
    /// block's `take()` always observes the preserved trie.
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<StateRootMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        // Clone everything the 'static blocking closure needs out of `self`.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let max_hot_slots = self.sparse_trie_max_hot_slots;
        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            // Raise priority once per thread; state root latency is on the critical path.
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Reuse a stored SparseStateTrie if available, applying continuation logic.
            // If this payload's parent state root matches the preserved trie's anchor,
            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
            // keep allocations.
            let start = Instant::now();
            // `take()` blocks until the previous block's task has stored the trie back.
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let mut sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
                    );
                    SparseStateTrie::default()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });
            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                parent_state_root,
                chunk_size,
            );

            // Runs until the multiproof channel is exhausted; returns the computed root.
            let result = task.run();

            // Acquire the guard before sending the result to prevent a race condition:
            // Without this, the next block could start after send() but before store(),
            // causing take() to return None and forcing it to create a new empty trie
            // instead of reusing the preserved one. Holding the guard ensures the next
            // block's take() blocks until we've stored the trie for reuse.
            let mut guard = preserved_sparse_trie.lock();

            // Clone the Ok outcome before `result` is moved into the send below.
            let task_result = result.as_ref().ok().cloned();
            // Send state root computation result - next block may start but will block on take()
            if state_root_tx.send(result).is_err() {
                // Receiver dropped - payload was likely invalid or cancelled.
                // Clear the trie instead of preserving potentially invalid state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                drop(guard);
                // Drop the evicted allocations off this latency-sensitive thread.
                executor.spawn_drop(deferred);
                return;
            }

            // Only preserve the trie as anchored if computation succeeded.
            // A failed computation may have left the trie in a partially updated state.
            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    max_hot_slots,
                    max_hot_accounts,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                    &result.trie_updates,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                // Anchor to the new root so the next child block can continue from it.
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            drop(guard);
            executor.spawn_drop(deferred);
        });
    }
707
708    /// Updates the execution cache with the post-execution state from an inserted block.
709    ///
710    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
711    /// to ensure the cache remains warm for subsequent block execution.
712    ///
713    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
714    /// hitting the database, maintaining performance consistency.
715    pub fn on_inserted_executed_block(
716        &self,
717        block_with_parent: BlockWithParent,
718        bundle_state: &BundleState,
719    ) {
720        let cache_metrics = self.cache_metrics.clone();
721        self.execution_cache.update_with_guard(|cached| {
722            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
723                debug!(
724                    target: "engine::caching",
725                    parent_hash = %block_with_parent.parent,
726                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
727                );
728                return
729            }
730
731            // Take existing cache (if any) or create fresh caches
732            let caches = match cached.take() {
733                Some(existing) => existing.cache().clone(),
734                None => ExecutionCache::new(self.cross_block_cache_size),
735            };
736
737            // Insert the block's bundle state into cache
738            let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
739            if new_cache.cache().insert_state(bundle_state).is_err() {
740                *cached = None;
741                debug!(target: "engine::caching", "cleared execution cache on update error");
742                return
743            }
744            new_cache.update_metrics(cache_metrics.as_ref());
745
746            // Replace with the updated cache
747            *cached = Some(new_cache);
748            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
749        });
750    }
751}
752
753/// Converts transactions sequentially and sends them to the prewarm and execute channels.
754fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
755    iter: impl Iterator<Item = RawTx>,
756    convert: &C,
757    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
758    execute_tx: &ExecuteTxSender<TxEnv, Recovered, Err>,
759) where
760    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
761    TxEnv: Clone,
762    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
763{
764    for (idx, raw_tx) in iter.enumerate() {
765        let tx = convert.convert(raw_tx);
766        let tx = tx.map(|tx| {
767            let (tx_env, tx) = tx.into_parts();
768            WithTxEnv { tx_env, tx: Arc::new(tx) }
769        });
770        if let Ok(tx) = &tx {
771            let _ = prewarm_tx.send((idx, tx.clone()));
772        }
773        let _ = execute_tx.send((idx, tx));
774        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
775    }
776}
777
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the background state root computation, if spawned.
    state_root_handle: Option<StateRootHandle>,
    /// Whether main execution should stream per-tx state updates into the sparse trie task.
    install_state_hook: bool,
    /// Handle to the prewarm/caching task.
    ///
    /// Must include the receiver of the state root wired to the sparse trie.
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions and their indices in the block.
    transactions: IndexedTxReceiver<Tx, Err>,
    /// Span kept alive so tracing attributes the handle's lifetime to the spawning scope.
    _span: Span,
}
795
796impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
797    /// Awaits the state root
798    ///
799    /// # Panics
800    ///
801    /// If payload processing was started without background tasks.
802    #[instrument(
803        level = "debug",
804        target = "engine::tree::payload_processor",
805        name = "await_state_root",
806        skip_all
807    )]
808    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
809        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
810    }
811
812    /// Takes the state root receiver out of the handle for use with custom waiting logic
813    /// (e.g., timeout-based waiting).
814    ///
815    /// # Panics
816    ///
817    /// If payload processing was started without background tasks.
818    pub const fn take_state_root_rx(
819        &mut self,
820    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
821        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
822    }
823
824    /// Returns a state hook to stream execution state updates to the sparse trie cache task.
825    ///
826    /// Returns `None` when BAL-driven hashed state streaming feeds the sparse trie task.
827    pub fn state_hook(&self) -> Option<impl OnStateHook> {
828        self.install_state_hook
829            .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
830            .flatten()
831    }
832
833    /// Returns a clone of the sender that streams updates into the sparse-trie task. The BAL
834    /// execute path uses this to spawn its own sparse-trie streaming task fed from the
835    /// snapshot.
836    pub fn sparse_trie_updates_tx(&self) -> Option<CrossbeamSender<StateRootMessage>> {
837        self.state_root_handle.as_ref().map(|handle| handle.updates_tx().clone())
838    }
839
840    /// Returns a clone of the caches used by prewarming
841    pub fn caches(&self) -> Option<ExecutionCache> {
842        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
843    }
844
845    /// Returns engine cache metrics if a cache exists for prewarming.
846    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
847        self.prewarm_handle.cache_metrics.clone()
848    }
849
850    /// Returns a reference to the shared executed transaction index counter.
851    ///
852    /// The main execution loop should store `index + 1` after executing each transaction so that
853    /// prewarm workers can skip transactions that have already been processed.
854    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
855        &self.prewarm_handle.executed_tx_index
856    }
857
858    /// Terminates the pre-warming transaction processing.
859    ///
860    /// Note: This does not terminate the task yet.
861    pub fn stop_prewarming_execution(&self) {
862        self.prewarm_handle.stop_prewarming_execution()
863    }
864
865    /// Terminates the entire caching task.
866    ///
867    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
868    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
869    /// path without cloning the expensive `BundleState`.
870    ///
871    /// Returns a sender for the channel that should be notified on block validation success.
872    pub fn terminate_caching(
873        &mut self,
874        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
875    ) -> Option<mpsc::Sender<()>> {
876        self.prewarm_handle.terminate_caching(execution_outcome)
877    }
878
879    /// Returns iterator yielding transactions from the stream.
880    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
881        self.transactions.iter().map(|(_, tx)| tx)
882    }
883
884    /// Returns a clone of the indexed transaction receiver.
885    pub fn clone_transaction_receiver(&self) -> IndexedTxReceiver<Tx, Err> {
886        self.transactions.clone()
887    }
888}
889
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The shared cache the task operates with, if one was checked out for this payload.
    saved_cache: Option<SavedCache>,
    /// Channel to the spawned prewarm task, if any; taken (set to `None`) on termination.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Shared counter tracking the next transaction index to be executed by the main execution
    /// loop. Prewarm workers skip transactions below this index.
    executed_tx_index: Arc<AtomicUsize>,
    /// Metrics for the execution cache.
    cache_metrics: Option<CachedStateMetrics>,
}
906
907impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
908    /// Terminates the pre-warming transaction processing.
909    ///
910    /// Note: This does not terminate the task yet.
911    pub fn stop_prewarming_execution(&self) {
912        self.to_prewarm_task
913            .as_ref()
914            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
915    }
916
917    /// Terminates the entire pre-warming task.
918    ///
919    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
920    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
921    #[must_use = "sender must be used and notified on block validation success"]
922    pub fn terminate_caching(
923        &mut self,
924        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
925    ) -> Option<mpsc::Sender<()>> {
926        if let Some(tx) = self.to_prewarm_task.take() {
927            let (valid_block_tx, valid_block_rx) = mpsc::channel();
928            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
929            let _ = tx.send(event);
930
931            Some(valid_block_tx)
932        } else {
933            None
934        }
935    }
936}
937
938impl<R> Drop for CacheTaskHandle<R> {
939    fn drop(&mut self) {
940        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
941        if let Some(tx) = self.to_prewarm_task.take() {
942            let _ = tx.send(PrewarmTaskEvent::Terminate {
943                execution_outcome: None,
944                valid_block_rx: mpsc::channel().1,
945            });
946        }
947    }
948}
949
/// EVM context required to execute a block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment for the block being executed.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    ///
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    ///
    /// Used to determine parallel worker count for prewarming.
    /// A value of 0 indicates the count is unknown.
    pub transaction_count: usize,
    /// Total gas used by all transactions in the block.
    ///
    /// Used to adaptively select multiproof chunk size for optimal throughput.
    pub gas_used: u64,
    /// Withdrawals included in the block, if the chain supports them.
    ///
    /// Used to generate prefetch targets for withdrawal addresses.
    pub withdrawals: Option<Vec<Withdrawal>>,
    /// Optional decoded block access list (BAL) for the block.
    ///
    /// Used to validate and optimize execution.
    pub decoded_bal: Option<Arc<DecodedBal>>,
}
977
978impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
979where
980    EvmEnvFor<Evm>: Default,
981{
982    /// Creates a new [`ExecutionEnv`] with default values for testing.
983    #[cfg(any(test, feature = "test-utils"))]
984    pub fn test_default() -> Self {
985        Self {
986            evm_env: Default::default(),
987            hash: Default::default(),
988            parent_hash: Default::default(),
989            parent_state_root: Default::default(),
990            transaction_count: 0,
991            gas_used: 0,
992            withdrawals: None,
993            decoded_bal: None,
994        }
995    }
996}
997
998#[cfg(test)]
999mod tests {
1000    use crate::tree::{
1001        payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1002        precompile_cache::PrecompileCacheMap,
1003        ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
1004    };
1005    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1006    use alloy_evm::block::StateChangeSource;
1007    use rand::Rng;
1008    use reth_chainspec::ChainSpec;
1009    use reth_db_common::init::init_genesis;
1010    use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
1011    use reth_evm::OnStateHook;
1012    use reth_evm_ethereum::EthEvmConfig;
1013    use reth_primitives_traits::{Account, Recovered, StorageEntry};
1014    use reth_provider::{
1015        providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
1016        test_utils::create_test_provider_factory_with_chain_spec,
1017        ChainSpecProvider, HashingWriter,
1018    };
1019    use reth_revm::db::BundleState;
1020    use reth_testing_utils::generators;
1021    use reth_trie::{test_utils::state_root, HashedPostState};
1022    use reth_trie_db::ChangesetCache;
1023    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1024    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
1025    use std::sync::Arc;
1026
1027    fn make_saved_cache(hash: B256) -> SavedCache {
1028        let execution_cache = ExecutionCache::new(1_000);
1029        SavedCache::new(hash, execution_cache)
1030    }
1031
1032    #[test]
1033    fn execution_cache_allows_single_checkout() {
1034        let execution_cache = PayloadExecutionCache::default();
1035        let hash = B256::from([1u8; 32]);
1036
1037        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1038
1039        let first = execution_cache.get_cache_for(hash);
1040        assert!(first.is_some(), "expected initial checkout to succeed");
1041
1042        let second = execution_cache.get_cache_for(hash);
1043        assert!(second.is_none(), "second checkout should be blocked while guard is active");
1044
1045        drop(first);
1046
1047        let third = execution_cache.get_cache_for(hash);
1048        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1049    }
1050
1051    #[test]
1052    fn execution_cache_checkout_releases_on_drop() {
1053        let execution_cache = PayloadExecutionCache::default();
1054        let hash = B256::from([2u8; 32]);
1055
1056        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1057
1058        {
1059            let guard = execution_cache.get_cache_for(hash);
1060            assert!(guard.is_some(), "expected checkout to succeed");
1061            // Guard dropped at end of scope
1062        }
1063
1064        let retry = execution_cache.get_cache_for(hash);
1065        assert!(retry.is_some(), "checkout should succeed after guard drop");
1066    }
1067
1068    #[test]
1069    fn execution_cache_mismatch_parent_clears_and_returns() {
1070        let execution_cache = PayloadExecutionCache::default();
1071        let hash = B256::from([3u8; 32]);
1072
1073        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1074
1075        // When the parent hash doesn't match (fork block), the cache is cleared,
1076        // hash updated on the original, and clone returned for reuse
1077        let different_hash = B256::from([4u8; 32]);
1078        let cache = execution_cache.get_cache_for(different_hash);
1079        assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1080
1081        drop(cache);
1082
1083        // The stored cache now has the fork block's parent hash.
1084        // Canonical chain looking for original hash sees a mismatch → clears and reuses.
1085        let original = execution_cache.get_cache_for(hash);
1086        assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1087    }
1088
1089    #[test]
1090    fn execution_cache_update_after_release_succeeds() {
1091        let execution_cache = PayloadExecutionCache::default();
1092        let initial = B256::from([5u8; 32]);
1093
1094        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1095
1096        let guard =
1097            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1098
1099        drop(guard);
1100
1101        let updated = B256::from([6u8; 32]);
1102        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1103
1104        let new_checkout = execution_cache.get_cache_for(updated);
1105        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1106    }
1107
1108    #[test]
1109    fn on_inserted_executed_block_populates_cache() {
1110        let payload_processor = PayloadProcessor::new(
1111            reth_tasks::Runtime::test(),
1112            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1113            &TreeConfig::default(),
1114            PrecompileCacheMap::default(),
1115        );
1116
1117        let parent_hash = B256::from([1u8; 32]);
1118        let block_hash = B256::from([10u8; 32]);
1119        let block_with_parent = BlockWithParent {
1120            block: BlockNumHash { hash: block_hash, number: 1 },
1121            parent: parent_hash,
1122        };
1123        let bundle_state = BundleState::default();
1124
1125        // Cache should be empty initially
1126        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1127
1128        // Update cache with inserted block
1129        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1130
1131        // Cache should now exist for the block hash
1132        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1133        assert!(cached.is_some());
1134        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1135    }
1136
1137    #[test]
1138    fn on_inserted_executed_block_skips_on_parent_mismatch() {
1139        let payload_processor = PayloadProcessor::new(
1140            reth_tasks::Runtime::test(),
1141            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1142            &TreeConfig::default(),
1143            PrecompileCacheMap::default(),
1144        );
1145
1146        // Setup: populate cache with block 1
1147        let block1_hash = B256::from([1u8; 32]);
1148        payload_processor
1149            .execution_cache
1150            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
1151
1152        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
1153        let wrong_parent = B256::from([99u8; 32]);
1154        let block3_hash = B256::from([3u8; 32]);
1155        let block_with_parent = BlockWithParent {
1156            block: BlockNumHash { hash: block3_hash, number: 3 },
1157            parent: wrong_parent,
1158        };
1159        let bundle_state = BundleState::default();
1160
1161        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1162
1163        // Cache should still be for block 1 (unchanged)
1164        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1165        assert!(cached.is_some(), "Original cache should be preserved");
1166
1167        // Cache for block 3 should not exist
1168        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1169        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1170    }
1171
    /// Builds `updates_per_account` randomized [`EvmState`] updates over a fixed pool of
    /// `num_accounts` addresses.
    ///
    /// Each update touches a random-length prefix of the address pool; every touched account
    /// gets a random balance/nonce and, with 70% probability, a handful of random storage
    /// writes. Used to drive the state-root task with realistic-looking state changes.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        // One `EvmState` is pushed per loop iteration below, so this capacity is exact.
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            // Always a prefix of the pool, so lower-index addresses are touched more often
            // (and therefore accumulate more overwrites across updates).
            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                // ~70% of touched accounts also get 1..10 random storage writes.
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                // Touched account with random balance/nonce and empty (KECCAK_EMPTY) code.
                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }
1221
    /// End-to-end check: the state root computed by the background payload-processor task
    /// (fed via the state hook) must match the reference root computed directly from the
    /// accumulated account/storage state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        // NOTE(review): `hashed_state` is extended below but never consumed afterwards —
        // looks like a leftover; confirm before removing.
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Persist every update to the hashing tables so the database reflects the final state.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Fold all updates into a single accumulated view; later updates overwrite the
        // account info and extend the storage map, mirroring sequential application.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        // Spawn the background tasks with an empty transaction stream; state changes are
        // delivered through the state hook instead of via execution.
        let mut handle = payload_processor.spawn(
            ExecutionEnv::test_default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(
                provider_factory,
                OverlayBuilder::<EthPrimitives>::new(genesis_hash, ChangesetCache::new()),
            ),
            &TreeConfig::default(),
        );

        let mut state_hook = handle.state_hook().expect("state hook is None");

        // Stream each update as if it were produced by the i-th transaction, then drop the
        // hook to signal end-of-input to the state root task.
        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        // The task's root must match the reference implementation's root.
        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
1312
1313    /// Tests the full prewarm lifecycle for a fork block:
1314    ///
1315    /// 1. Cache is at canonical block 4.
1316    /// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
1317    ///    `PrewarmCacheTask` does when it receives a `SavedCache`.
1318    /// 3. Prewarm populates the shared cache with fork-specific state.
1319    /// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
1320    /// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
1321    /// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
1322    ///    data.
1323    #[test]
1324    fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1325        let execution_cache = PayloadExecutionCache::default();
1326
1327        // Canonical chain at block 4.
1328        let block4_hash = B256::from([4u8; 32]);
1329        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1330
1331        // Fork block arrives with parent = block 2. Prewarm task checks out the cache.
1332        // This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
1333        let fork_parent = B256::from([2u8; 32]);
1334        let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1335        assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1336        let prewarm_cache = prewarm_cache.unwrap();
1337        assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1338
1339        // Prewarm populates cache with fork-specific state (ancestor data for block 2).
1340        // Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
1341        let fork_addr = Address::from([0xBB; 20]);
1342        let fork_key = B256::from([0xCC; 32]);
1343        prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1344
1345        // While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
1346        let during_prewarm = execution_cache.get_cache_for(block4_hash);
1347        assert!(
1348            during_prewarm.is_none(),
1349            "cache must be unavailable while prewarm holds a reference"
1350        );
1351
1352        // Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
1353        drop(prewarm_cache);
1354
1355        // Canonical block 5 arrives (parent = block 4).
1356        // Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
1357        // clears the stale fork data, and returns a cache with hash = block4_hash.
1358        let block5_cache = execution_cache.get_cache_for(block4_hash);
1359        assert!(
1360            block5_cache.is_some(),
1361            "canonical chain must get cache after fork prewarm is dropped"
1362        );
1363        assert_eq!(
1364            block5_cache.as_ref().unwrap().executed_block_hash(),
1365            block4_hash,
1366            "cache must carry the canonical parent hash, not the fork parent"
1367        );
1368    }
1369}