Skip to main content

reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6    sparse_trie::SparseTrieCacheTask,
7    CacheWaitDurations, CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource,
8    ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
9    WaitForCaches,
10};
11use alloy_eip7928::bal::DecodedBal;
12use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
13use alloy_primitives::B256;
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use multiproof::*;
16use prewarm::PrewarmMetrics;
17use rayon::prelude::*;
18use reth_evm::{
19    block::ExecutableTxParts,
20    execute::{ExecutableTxFor, WithTxEnv},
21    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
22    SpecFor, TxEnvFor,
23};
24use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
25use reth_provider::{
26    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
27};
28use reth_revm::db::BundleState;
29use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
30use reth_trie::{
31    hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory, HashedPostState,
32};
33use reth_trie_parallel::{
34    proof_task::{ProofTaskCtx, ProofWorkerHandle},
35    root::ParallelStateRootError,
36};
37use reth_trie_sparse::{
38    ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
39};
40use std::{
41    ops::Not,
42    sync::{
43        atomic::{AtomicBool, AtomicUsize},
44        mpsc::{self, channel},
45        Arc,
46    },
47};
48use tracing::{debug, debug_span, instrument, trace, warn, Span};
49
50pub mod bal;
51pub mod multiproof;
52mod preserved_sparse_trie;
53pub mod prewarm;
54pub mod receipt_root_task;
55pub mod sparse_trie;
56
57use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
58
/// Default cap on the node capacity kept when a sparse trie is shrunk, bounding how many trie
/// nodes stay allocated in retained sparse tries.
///
/// Node maps are keyed by `Nibbles` and store `SparseNode` values; `size_of::<Nibbles>` is 40
/// and `size_of::<SparseNode>` is 80, i.e. 120 bytes per entry.
///
/// With 1 million such entries, this conservative estimate comes out at around 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default cap on the value capacity kept when a sparse trie is shrunk, bounding how many values
/// stay allocated in retained sparse tries.
///
/// Values are either storage or account values. Account values are the larger of the two and
/// are essentially `TrieAccount`s, so they drive the estimate.
///
/// Account value maps are keyed by `Nibbles` and store `TrieAccount` values;
/// `size_of::<Nibbles>` is 40 and `size_of::<TrieAccount>` is 104, i.e. 144 bytes per entry.
///
/// With 1 million such entries, this conservative estimate comes out at around 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Prewarming is skipped for blocks with fewer transactions than this: the fixed overhead of
/// spawning prewarm workers exceeds the execution time it would save.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
85
86/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
87type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;
88
89type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
90    IteratorTx<Evm, I>,
91    <I as ExecutableTxTuple>::Error,
92    <N as NodePrimitives>::Receipt,
93>;
94
95type IteratorPrewarmTxReceiver<Evm, I> =
96    PrewarmTxReceiver<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;
97
98type IteratorExecuteTxReceiver<Evm, I> = ExecuteTxReceiver<
99    TxEnvFor<Evm>,
100    <I as ExecutableTxIterator<Evm>>::Recovered,
101    <I as ExecutableTxTuple>::Error,
102>;
103
104type RecoveredTx<TxEnv, Recovered> = WithTxEnv<TxEnv, Recovered>;
105type IndexedTxResult<Tx, Err> = (usize, Result<Tx, Err>);
106type IndexedTxReceiver<Tx, Err> = CrossbeamReceiver<IndexedTxResult<Tx, Err>>;
107type IndexedTxSender<Tx, Err> = CrossbeamSender<IndexedTxResult<Tx, Err>>;
108type PrewarmTxReceiver<TxEnv, Recovered> = mpsc::Receiver<(usize, RecoveredTx<TxEnv, Recovered>)>;
109type ExecuteTxReceiver<TxEnv, Recovered, Err> =
110    IndexedTxReceiver<RecoveredTx<TxEnv, Recovered>, Err>;
111type ExecuteTxSender<TxEnv, Recovered, Err> = IndexedTxSender<RecoveredTx<TxEnv, Recovered>, Err>;
112
113/// Entrypoint for executing the payload.
114#[derive(Debug)]
115pub struct PayloadProcessor<Evm>
116where
117    Evm: ConfigureEvm,
118{
119    /// The executor used by to spawn tasks.
120    executor: Runtime,
121    /// The most recent cache used for execution.
122    execution_cache: PayloadExecutionCache,
123    /// Metrics for the execution cache.
124    cache_metrics: Option<CachedStateMetrics>,
125    /// Metrics for shared execution cache state.
126    cache_state_metrics: Option<CachedStateCacheMetrics>,
127    /// Metrics for trie operations
128    trie_metrics: MultiProofTaskMetrics,
129    /// Cross-block cache size in bytes.
130    cross_block_cache_size: usize,
131    /// Whether transactions should not be executed on prewarming task.
132    disable_transaction_prewarming: bool,
133    /// Whether state cache should be disable
134    disable_state_cache: bool,
135    /// Determines how to configure the evm for execution.
136    evm_config: Evm,
137    /// Whether precompile cache should be disabled.
138    precompile_cache_disabled: bool,
139    /// Precompile cache map.
140    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
141    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
142    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
143    /// preservation across sequential payload validations.
144    sparse_state_trie: SharedPreservedSparseTrie,
145    /// LFU hot-slot capacity: max storage slots retained across prune cycles.
146    sparse_trie_max_hot_slots: usize,
147    /// LFU hot-account capacity: max account addresses retained across prune cycles.
148    sparse_trie_max_hot_accounts: usize,
149    /// Whether sparse trie cache pruning is fully disabled.
150    disable_sparse_trie_cache_pruning: bool,
151    /// Whether to disable BAL-driven parallel state root computation.
152    /// Only valid when BAL parallel execution is also disabled.
153    disable_bal_parallel_state_root: bool,
154    /// Whether BAL state prefetching during prewarm is disabled.
155    disable_bal_batch_io: bool,
156}
157
158impl<N, Evm> PayloadProcessor<Evm>
159where
160    N: NodePrimitives,
161    Evm: ConfigureEvm<Primitives = N>,
162{
163    /// Returns a reference to the workload executor driving payload tasks.
164    pub const fn executor(&self) -> &Runtime {
165        &self.executor
166    }
167
168    /// Creates a new payload processor.
169    pub fn new(
170        executor: Runtime,
171        evm_config: Evm,
172        config: &TreeConfig,
173        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
174    ) -> Self {
175        Self {
176            executor,
177            execution_cache: Default::default(),
178            trie_metrics: Default::default(),
179            cross_block_cache_size: config.cross_block_cache_size(),
180            disable_transaction_prewarming: config.disable_prewarming(),
181            evm_config,
182            disable_state_cache: config.disable_state_cache(),
183            precompile_cache_disabled: config.precompile_cache_disabled(),
184            precompile_cache_map,
185            sparse_state_trie: SharedPreservedSparseTrie::default(),
186            sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
187            sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
188            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
189            cache_metrics: (!config.disable_cache_metrics())
190                .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
191            cache_state_metrics: (!config.disable_cache_metrics())
192                .then(CachedStateCacheMetrics::default),
193            disable_bal_parallel_state_root: config.disable_bal_parallel_state_root(),
194            disable_bal_batch_io: config.disable_bal_batch_io(),
195        }
196    }
197}
198
199impl<Evm> WaitForCaches for PayloadProcessor<Evm>
200where
201    Evm: ConfigureEvm,
202{
203    fn wait_for_caches(&self) -> CacheWaitDurations {
204        debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
205
206        // Wait for both caches in parallel using std threads
207        let execution_cache = self.execution_cache.clone();
208        let sparse_trie = self.sparse_state_trie.clone();
209
210        // Use channels and spawn_blocking instead of std::thread::spawn
211        let (execution_tx, execution_rx) = std::sync::mpsc::channel();
212        let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
213
214        self.executor.spawn_blocking_named("wait-exec-cache", move || {
215            let _ = execution_tx.send(execution_cache.wait_for_availability());
216        });
217        self.executor.spawn_blocking_named("wait-sparse-tri", move || {
218            let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
219        });
220
221        let execution_cache_duration =
222            execution_rx.recv().expect("execution cache wait task failed to send result");
223        let sparse_trie_duration =
224            sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
225
226        debug!(
227            target: "engine::tree::payload_processor",
228            ?execution_cache_duration,
229            ?sparse_trie_duration,
230            "Execution cache and sparse trie locks acquired"
231        );
232        CacheWaitDurations {
233            execution_cache: execution_cache_duration,
234            sparse_trie: sparse_trie_duration,
235        }
236    }
237}
238
239impl<N, Evm> PayloadProcessor<Evm>
240where
241    N: NodePrimitives,
242    Evm: ConfigureEvm<Primitives = N> + 'static,
243{
244    /// Spawns all background tasks and returns a handle connected to the tasks.
245    ///
246    /// - Transaction prewarming task
247    /// - State root task
248    /// - Sparse trie task
249    ///
250    /// # Transaction prewarming task
251    ///
252    /// Responsible for feeding state updates to the sparse trie task.
253    ///
254    /// This task runs until:
255    ///  - externally cancelled (e.g. sequential block execution is complete)
256    ///
257    /// ## Sparse trie task
258    ///
259    /// Responsible for calculating the state root.
260    ///
261    /// This task runs until there are no further updates to process.
262    ///
263    ///
264    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
265    /// canceling)
266    #[instrument(
267        level = "debug",
268        target = "engine::tree::payload_processor",
269        name = "payload processor",
270        skip_all
271    )]
272    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
273        &mut self,
274        env: ExecutionEnv<Evm>,
275        transactions: I,
276        provider_builder: StateProviderBuilder<N, P>,
277        multiproof_provider_factory: F,
278        config: &TreeConfig,
279        parallel_bal_execution: bool,
280    ) -> IteratorPayloadHandle<Evm, I, N>
281    where
282        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
283        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
284            + Clone
285            + Send
286            + Sync
287            + 'static,
288    {
289        // start preparing transactions immediately
290        let (prewarm_rx, execution_rx) =
291            self.spawn_tx_iterator(transactions, env.transaction_count);
292
293        let span = Span::current();
294
295        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
296        let state_root_handle = self.spawn_state_root(
297            multiproof_provider_factory,
298            env.parent_state_root,
299            halve_workers,
300            config,
301        );
302        // BAL blocks only bypass the normal execution state hook when the validator decided that
303        // the parallel BAL executor will consume this block. If not, treat the BAL as absent here
304        // so the block follows today's sequential execution and transaction-prewarm path.
305        //
306        // In the parallel BAL path, prewarm owns BAL-derived sparse-trie updates and optional
307        // BAL state prefetching. State-cache disabled mode still uses the BAL executor, but
308        // `saved_cache` is absent below, so prewarm skips cache-backed state prefetching.
309        // `disable_bal_batch_io` controls the prefetch half when a cache exists.
310        let install_state_hook = !parallel_bal_execution;
311        let prewarm_handle = self.spawn_caching_with(
312            env,
313            prewarm_rx,
314            provider_builder,
315            Some(state_root_handle.updates_tx().clone()),
316            parallel_bal_execution,
317        );
318
319        PayloadHandle {
320            state_root_handle: Some(state_root_handle),
321            install_state_hook,
322            prewarm_handle,
323            transactions: execution_rx,
324            _span: span,
325        }
326    }
327
328    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
329    ///
330    /// Returns a [`PayloadHandle`] to communicate with the task.
331    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
332    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
333        &self,
334        env: ExecutionEnv<Evm>,
335        transactions: I,
336        provider_builder: StateProviderBuilder<N, P>,
337    ) -> IteratorPayloadHandle<Evm, I, N>
338    where
339        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
340    {
341        let (prewarm_rx, execution_rx) =
342            self.spawn_tx_iterator(transactions, env.transaction_count);
343        let prewarm_handle =
344            self.spawn_caching_with(env, prewarm_rx, provider_builder, None, false);
345        PayloadHandle {
346            state_root_handle: None,
347            install_state_hook: false,
348            prewarm_handle,
349            transactions: execution_rx,
350            _span: Span::current(),
351        }
352    }
353
354    /// Spawns state root computation pipeline (multiproof + sparse trie tasks).
355    ///
356    /// The returned [`StateRootHandle`] provides:
357    /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
358    ///   execution.
359    /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
360    ///   state root.
361    ///
362    /// The state hook **must** be dropped after execution to signal the end of state updates.
363    ///
364    /// When `halve_workers` is true, the proof worker pool is halved (for small blocks where
365    /// fewer transactions produce fewer state changes and most workers would be idle).
366    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
367    pub fn spawn_state_root<F>(
368        &self,
369        multiproof_provider_factory: F,
370        parent_state_root: B256,
371        halve_workers: bool,
372        config: &TreeConfig,
373    ) -> StateRootHandle
374    where
375        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
376            + Clone
377            + Send
378            + Sync
379            + 'static,
380    {
381        let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();
382
383        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
384        #[cfg(feature = "trie-debug")]
385        let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
386        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
387
388        let (state_root_tx, state_root_rx) = channel();
389        let (hashed_state_tx, hashed_state_rx) = channel();
390
391        self.spawn_sparse_trie_task(
392            proof_handle,
393            state_root_tx,
394            hashed_state_tx,
395            from_multi_proof,
396            parent_state_root,
397            config.multiproof_chunk_size(),
398        );
399
400        StateRootHandle::new(parent_state_root, updates_tx, state_root_rx, hashed_state_rx)
401    }
402
403    /// Transaction count threshold below which proof workers are halved, since fewer transactions
404    /// produce fewer state changes and most workers would be idle overhead.
405    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
406
407    /// Transaction count threshold below which sequential conversion is used.
408    ///
409    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
410    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
411    /// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
412    /// blocks.
413    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
414
415    /// Number of leading transactions to convert sequentially before entering the rayon
416    /// parallel path.
417    ///
418    /// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
419    /// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
420    /// a small head sequentially and sending it immediately, execution can start without
421    /// waiting for rayon scheduling.
422    const PARALLEL_PREFETCH_COUNT: usize = 4;
423
424    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
425    ///
426    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
427    /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
428    /// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
429    /// results to execution in the original transaction order.
430    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
431    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
432        &self,
433        transactions: I,
434        transaction_count: usize,
435    ) -> (IteratorPrewarmTxReceiver<Evm, I>, IteratorExecuteTxReceiver<Evm, I>) {
436        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
437        let (execute_tx, execute_rx) = crossbeam_channel::bounded(transaction_count);
438
439        if transaction_count == 0 {
440            // Empty block — nothing to do.
441        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
442            // Sequential path for small blocks — avoids rayon work-stealing setup and
443            // channel-based reorder overhead when it costs more than sequential conversion.
444            debug!(
445                target: "engine::tree::payload_processor",
446                transaction_count,
447                "using sequential sig recovery for small block"
448            );
449            self.executor.spawn_blocking_named("tx-iterator", move || {
450                let (transactions, convert) = transactions.into_parts();
451                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
452            });
453        } else {
454            // Parallel path — recover signatures in parallel on rayon, stream results
455            // to execution in order via `for_each_ordered`.
456            //
457            // To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
458            // few transactions are recovered sequentially and sent immediately before
459            // entering the parallel iterator for the remainder.
460            let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
461            let executor = self.executor.clone();
462            self.executor.spawn_blocking_named("tx-iterator", move || {
463                let (transactions, convert) = transactions.into_parts();
464                let mut all: Vec<_> = transactions.into_iter().collect();
465                let rest = all.split_off(prefetch.min(all.len()));
466
467                // Convert the first few transactions sequentially so execution can
468                // start immediately without waiting for rayon work-stealing.
469                convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
470
471                // Convert the remaining transactions in parallel.
472                rest.into_par_iter()
473                    .enumerate()
474                    .map(|(i, tx)| {
475                        let idx = i + prefetch;
476                        let tx = convert.convert(tx);
477                        (idx, tx)
478                    })
479                    .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
480                        let tx = tx.map(|tx| {
481                            let tx = WithTxEnv::new(tx);
482                            let _ = prewarm_tx.send((idx, tx.clone()));
483                            tx
484                        });
485                        let _ = execute_tx.send((idx, tx));
486                        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
487                        });
488                        });
489        }
490
491        (prewarm_rx, execute_rx)
492    }
493
494    /// Spawn prewarming optionally wired to the sparse trie task for target updates.
495    ///
496    /// `parallel_bal_execution` is true when the BAL execute path will execute this block. In
497    /// that case prewarm runs in BAL mode: it streams BAL-derived sparse-trie updates and,
498    /// unless `disable_bal_batch_io` is set, prefetches BAL-declared state into the shared cache.
499    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
500    fn spawn_caching_with<P>(
501        &self,
502        env: ExecutionEnv<Evm>,
503        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
504        provider_builder: StateProviderBuilder<N, P>,
505        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
506        parallel_bal_execution: bool,
507    ) -> CacheTaskHandle<N::Receipt>
508    where
509        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
510    {
511        let mode = if parallel_bal_execution {
512            PrewarmMode::BlockAccessList(
513                env.decoded_bal.clone().expect("BAL dispatch implies decoded BAL"),
514            )
515        } else if self.disable_transaction_prewarming ||
516            env.transaction_count < SMALL_BLOCK_TX_THRESHOLD
517        {
518            PrewarmMode::Skipped
519        } else {
520            PrewarmMode::Transactions(transactions)
521        };
522        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
523
524        let executed_tx_index = Arc::new(AtomicUsize::new(0));
525        // configure prewarming
526        let prewarm_ctx = PrewarmContext {
527            env,
528            evm_config: self.evm_config.clone(),
529            saved_cache: saved_cache.clone(),
530            provider: provider_builder,
531            metrics: PrewarmMetrics::default(),
532            cache_metrics: self.cache_metrics.clone(),
533            cache_state_metrics: self.cache_state_metrics.clone(),
534            terminate_execution: Arc::new(AtomicBool::new(false)),
535            executed_tx_index: Arc::clone(&executed_tx_index),
536            precompile_cache_disabled: self.precompile_cache_disabled,
537            precompile_cache_map: self.precompile_cache_map.clone(),
538            disable_bal_parallel_state_root: self.disable_bal_parallel_state_root,
539            disable_bal_batch_io: self.disable_bal_batch_io,
540        };
541
542        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
543            self.executor.clone(),
544            self.execution_cache.clone(),
545            prewarm_ctx,
546            to_sparse_trie_task,
547        );
548        {
549            let to_prewarm_task = to_prewarm_task.clone();
550            self.executor.spawn_blocking_named("prewarm", move || {
551                prewarm_task.run(mode, to_prewarm_task);
552            });
553        }
554
555        CacheTaskHandle {
556            saved_cache,
557            to_prewarm_task: Some(to_prewarm_task),
558            executed_tx_index,
559            cache_metrics: self.cache_metrics.clone(),
560        }
561    }
562
563    /// Returns the cache for the given parent hash.
564    ///
565    /// If the given hash is different then what is recently cached, then this will create a new
566    /// instance.
567    #[instrument(level = "debug", target = "engine::caching", skip(self))]
568    pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
569        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
570            debug!("reusing execution cache");
571            cache
572        } else {
573            debug!("creating new execution cache on cache miss");
574            let start = Instant::now();
575            let cache = ExecutionCache::new(self.cross_block_cache_size);
576            if let Some(metrics) = &self.cache_metrics {
577                metrics.record_cache_creation(start.elapsed());
578            }
579            SavedCache::new(parent_hash, cache)
580        }
581    }
582
    /// Spawns the [`SparseTrieCacheTask`] for this payload processor on a blocking task named
    /// `"sparse-trie"`.
    ///
    /// The task takes the preserved sparse trie (or builds a fresh one if none is available),
    /// runs the state root computation, sends the result over `state_root_tx`, and finally stores
    /// a trie back into the shared slot for the next payload:
    /// - anchored at the computed state root if the computation succeeded and the result was
    ///   delivered,
    /// - cleared if the computation failed or the receiver of `state_root_tx` was dropped.
    ///
    /// The trie is preserved when the new payload is a child of the previous one.
    ///
    /// `from_multi_proof` is the receiving end of the state update channel, `hashed_state_tx` and
    /// `proof_worker_handle` are handed to the task as is, and `chunk_size` is forwarded to
    /// [`SparseTrieCacheTask::new_with_trie`].
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        hashed_state_tx: mpsc::Sender<HashedPostState>,
        from_multi_proof: CrossbeamReceiver<StateRootMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        // Everything the task needs is copied or cloned out of `self` so the closure can be
        // `move`d onto the executor.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let max_hot_slots = self.sparse_trie_max_hot_slots;
        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        // Captured here so the task's span can be parented to the caller's span.
        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Reuse a stored SparseStateTrie if available, applying continuation logic.
            // If this payload's parent state root matches the preserved trie's anchor,
            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
            // keep allocations.
            //
            // The time spent in `take()` is recorded as the sparse trie cache wait duration.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let mut sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    // The same blind arena-backed trie serves as the accounts trie and as the
                    // default storage trie.
                    let default_trie = RevealableSparseTrie::blind_from(
                        ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
                    );
                    SparseStateTrie::default()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });
            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                hashed_state_tx,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                parent_state_root,
                chunk_size,
            );

            let result = task.run();

            // Acquire the guard before sending the result to prevent a race condition:
            // Without this, the next block could start after send() but before store(),
            // causing take() to return None and forcing it to create a new empty trie
            // instead of reusing the preserved one. Holding the guard ensures the next
            // block's take() blocks until we've stored the trie for reuse.
            let mut guard = preserved_sparse_trie.lock();

            // `result` is moved into `send` below, so keep a copy of the successful outcome for
            // anchoring the preserved trie afterwards.
            let task_result = result.as_ref().ok().cloned();
            // Send state root computation result - next block may start but will block on take()
            if state_root_tx.send(result).is_err() {
                // Receiver dropped - payload was likely invalid or cancelled.
                // Clear the trie instead of preserving potentially invalid state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                // Release the guard before handing `deferred` to the executor.
                drop(guard);
                executor.spawn_drop(deferred);
                return;
            }

            // Only preserve the trie as anchored if computation succeeded.
            // A failed computation may have left the trie in a partially updated state.
            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                // Time the conversion into a reusable trie and record it, together with the
                // retained memory size and retained storage trie count.
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    max_hot_slots,
                    max_hot_accounts,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                    &result.trie_updates,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            // Release the guard before handing `deferred` to the executor.
            // NOTE(review): `spawn_drop` presumably drops `deferred` off this thread — confirm.
            drop(guard);
            executor.spawn_drop(deferred);
        });
    }
716
717    /// Updates the execution cache with the post-execution state from an inserted block.
718    ///
719    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
720    /// to ensure the cache remains warm for subsequent block execution.
721    ///
722    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
723    /// hitting the database, maintaining performance consistency.
724    pub fn on_inserted_executed_block(
725        &self,
726        block_with_parent: BlockWithParent,
727        bundle_state: &BundleState,
728    ) {
729        let cache_state_metrics = self.cache_state_metrics.clone();
730        self.execution_cache.update_with_guard(|cached| {
731            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
732                debug!(
733                    target: "engine::caching",
734                    parent_hash = %block_with_parent.parent,
735                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
736                );
737                return
738            }
739
740            if let Some(cache) = cached.as_ref().filter(|cache| !cache.is_available()) {
741                debug!(
742                    target: "engine::caching",
743                    parent_hash = %block_with_parent.parent,
744                    usage_count = cache.usage_count(),
745                    "Execution cache is in use, skip updating cache with new state for inserted executed block",
746                );
747                return
748            }
749
750            // Take existing cache (if any) or create fresh caches
751            let caches = match cached.take() {
752                Some(existing) => existing.cache().clone(),
753                None => ExecutionCache::new(self.cross_block_cache_size),
754            };
755
756            // Insert the block's bundle state into cache
757            let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
758            if new_cache.cache().insert_state(bundle_state).is_err() {
759                *cached = None;
760                debug!(target: "engine::caching", "cleared execution cache on update error");
761                return
762            }
763            new_cache.update_metrics(cache_state_metrics.as_ref());
764
765            // Replace with the updated cache
766            *cached = Some(new_cache);
767            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
768        });
769    }
770}
771
772/// Converts transactions sequentially and sends them to the prewarm and execute channels.
773fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
774    iter: impl Iterator<Item = RawTx>,
775    convert: &C,
776    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
777    execute_tx: &ExecuteTxSender<TxEnv, Recovered, Err>,
778) where
779    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
780    TxEnv: Clone,
781    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
782{
783    for (idx, raw_tx) in iter.enumerate() {
784        let tx = convert.convert(raw_tx);
785        let tx = tx.map(|tx| WithTxEnv::new(tx));
786        if let Ok(tx) = &tx {
787            let _ = prewarm_tx.send((idx, tx.clone()));
788        }
789        let _ = execute_tx.send((idx, tx));
790        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
791    }
792}
793
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<BlockExecutionOutput<R>>` with the
/// caching task without cloning the expensive `BundleState`.
///
/// `Tx` and `Err` are the success and error types of the entries yielded by the transaction
/// stream.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the background state root computation, if spawned.
    ///
    /// Methods that require it (`state_root`, `take_state_root_rx`) panic when this is `None`.
    state_root_handle: Option<StateRootHandle>,
    /// Whether main execution should stream per-tx state updates into the sparse trie task.
    install_state_hook: bool,
    /// Handle to the prewarm/caching task; also exposes the shared cache, its metrics and the
    /// executed-transaction index counter.
    // must include the receiver of the state root wired to the sparse trie
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions and their indices in the block.
    transactions: IndexedTxReceiver<Tx, Err>,
    /// Span for tracing
    _span: Span,
}
811
812impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
813    /// Awaits the state root
814    ///
815    /// # Panics
816    ///
817    /// If payload processing was started without background tasks.
818    #[instrument(
819        level = "debug",
820        target = "engine::tree::payload_processor",
821        name = "await_state_root",
822        skip_all
823    )]
824    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
825        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
826    }
827
828    /// Takes the state root receiver out of the handle for use with custom waiting logic
829    /// (e.g., timeout-based waiting).
830    ///
831    /// # Panics
832    ///
833    /// If payload processing was started without background tasks.
834    pub const fn take_state_root_rx(
835        &mut self,
836    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
837        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
838    }
839
840    /// Returns a state hook to stream execution state updates to the sparse trie cache task.
841    ///
842    /// Returns `None` when BAL-driven hashed state streaming feeds the sparse trie task.
843    pub fn state_hook(&self) -> Option<impl OnStateHook> {
844        self.install_state_hook
845            .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
846            .flatten()
847    }
848
849    /// Returns a clone of the sender that streams updates into the sparse-trie task. The BAL
850    /// execute path uses this to spawn its own sparse-trie streaming task fed from the
851    /// snapshot.
852    pub fn sparse_trie_updates_tx(&self) -> Option<CrossbeamSender<StateRootMessage>> {
853        self.state_root_handle.as_ref().map(|handle| handle.updates_tx().clone())
854    }
855
856    /// Returns a clone of the caches used by prewarming
857    pub fn caches(&self) -> Option<ExecutionCache> {
858        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
859    }
860
861    /// Returns engine cache metrics if a cache exists for prewarming.
862    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
863        self.prewarm_handle.cache_metrics.clone()
864    }
865
866    /// Returns a reference to the shared executed transaction index counter.
867    ///
868    /// The main execution loop should store `index + 1` after executing each transaction so that
869    /// prewarm workers can skip transactions that have already been processed.
870    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
871        &self.prewarm_handle.executed_tx_index
872    }
873
874    /// Terminates the pre-warming transaction processing.
875    ///
876    /// Note: This does not terminate the task yet.
877    pub fn stop_prewarming_execution(&self) {
878        self.prewarm_handle.stop_prewarming_execution()
879    }
880
881    /// Terminates the entire caching task.
882    ///
883    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
884    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
885    /// path without cloning the expensive `BundleState`.
886    ///
887    /// Returns a sender for the channel that should be notified on block validation success.
888    pub fn terminate_caching(
889        &mut self,
890        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
891    ) -> Option<mpsc::Sender<()>> {
892        self.prewarm_handle.terminate_caching(execution_outcome)
893    }
894
895    /// Returns iterator yielding transactions from the stream.
896    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
897        self.transactions.iter().map(|(_, tx)| tx)
898    }
899
900    /// Returns a clone of the indexed transaction receiver.
901    pub fn clone_transaction_receiver(&self) -> IndexedTxReceiver<Tx, Err> {
902        self.transactions.clone()
903    }
904
905    /// Takes the hashed state receiver out of the handle for use with custom waiting logic
906    pub fn take_hashed_state_rx(&mut self) -> Option<mpsc::Receiver<HashedPostState>> {
907        self.state_root_handle.as_mut().map(|handle| handle.take_hashed_state_rx())
908    }
909}
910
911/// Access to the spawned [`PrewarmCacheTask`].
912///
913/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
914/// prewarm task without cloning the expensive `BundleState`.
915#[derive(Debug)]
916pub struct CacheTaskHandle<R> {
917    /// The shared cache the task operates with.
918    saved_cache: Option<SavedCache>,
919    /// Channel to the spawned prewarm task if any
920    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
921    /// Shared counter tracking the next transaction index to be executed by the main execution
922    /// loop. Prewarm workers skip transactions below this index.
923    executed_tx_index: Arc<AtomicUsize>,
924    /// Metrics for the execution cache.
925    cache_metrics: Option<CachedStateMetrics>,
926}
927
928impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
929    /// Terminates the pre-warming transaction processing.
930    ///
931    /// Note: This does not terminate the task yet.
932    pub fn stop_prewarming_execution(&self) {
933        self.to_prewarm_task
934            .as_ref()
935            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
936    }
937
938    /// Terminates the entire pre-warming task.
939    ///
940    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
941    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
942    #[must_use = "sender must be used and notified on block validation success"]
943    pub fn terminate_caching(
944        &mut self,
945        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
946    ) -> Option<mpsc::Sender<()>> {
947        if let Some(tx) = self.to_prewarm_task.take() {
948            let (valid_block_tx, valid_block_rx) = mpsc::channel();
949            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
950            let _ = tx.send(event);
951
952            Some(valid_block_tx)
953        } else {
954            None
955        }
956    }
957}
958
959impl<R> Drop for CacheTaskHandle<R> {
960    fn drop(&mut self) {
961        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
962        if let Some(tx) = self.to_prewarm_task.take() {
963            let _ = tx.send(PrewarmTaskEvent::Terminate {
964                execution_outcome: None,
965                valid_block_rx: mpsc::channel().1,
966            });
967        }
968    }
969}
970
/// EVM context required to execute a block.
///
/// Bundles the EVM environment with block-level metadata (hashes, transaction count, gas used,
/// withdrawals, optional BAL) that payload processing reads alongside it.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    /// Used to determine parallel worker count for prewarming.
    /// A value of 0 indicates the count is unknown.
    pub transaction_count: usize,
    /// Total gas used by all transactions in the block.
    /// Used to adaptively select multiproof chunk size for optimal throughput.
    pub gas_used: u64,
    /// Withdrawals included in the block, or `None` if there are none to report.
    /// Used to generate prefetch targets for withdrawal addresses.
    pub withdrawals: Option<Vec<Withdrawal>>,
    /// Optional decoded BAL (`alloy_eip7928::bal::DecodedBal`) for the block, shared via `Arc`.
    /// Used to validate and optimize execution.
    pub decoded_bal: Option<Arc<DecodedBal>>,
}
998
999impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
1000where
1001    EvmEnvFor<Evm>: Default,
1002{
1003    /// Creates a new [`ExecutionEnv`] with default values for testing.
1004    #[cfg(any(test, feature = "test-utils"))]
1005    pub fn test_default() -> Self {
1006        Self {
1007            evm_env: Default::default(),
1008            hash: Default::default(),
1009            parent_hash: Default::default(),
1010            parent_state_root: Default::default(),
1011            transaction_count: 0,
1012            gas_used: 0,
1013            withdrawals: None,
1014            decoded_bal: None,
1015        }
1016    }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use crate::tree::{
1022        payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1023        precompile_cache::PrecompileCacheMap,
1024        ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
1025    };
1026    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1027    use rand::Rng;
1028    use reth_chainspec::ChainSpec;
1029    use reth_db_common::init::init_genesis;
1030    use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
1031    use reth_evm::OnStateHook;
1032    use reth_evm_ethereum::EthEvmConfig;
1033    use reth_execution_cache::CachedStatus;
1034    use reth_primitives_traits::{Account, Recovered, StorageEntry};
1035    use reth_provider::{
1036        providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
1037        test_utils::create_test_provider_factory_with_chain_spec,
1038        ChainSpecProvider, HashingWriter,
1039    };
1040    use reth_revm::db::BundleState;
1041    use reth_testing_utils::generators;
1042    use reth_trie::{test_utils::state_root, HashedPostState};
1043    use reth_trie_db::ChangesetCache;
1044    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1045    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot, TransactionId};
1046    use std::sync::Arc;
1047
1048    fn make_saved_cache(hash: B256) -> SavedCache {
1049        let execution_cache = ExecutionCache::new(1_000);
1050        SavedCache::new(hash, execution_cache)
1051    }
1052
1053    #[test]
1054    fn execution_cache_allows_single_checkout() {
1055        let execution_cache = PayloadExecutionCache::default();
1056        let hash = B256::from([1u8; 32]);
1057
1058        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1059
1060        let first = execution_cache.get_cache_for(hash);
1061        assert!(first.is_some(), "expected initial checkout to succeed");
1062
1063        let second = execution_cache.get_cache_for(hash);
1064        assert!(second.is_none(), "second checkout should be blocked while guard is active");
1065
1066        drop(first);
1067
1068        let third = execution_cache.get_cache_for(hash);
1069        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1070    }
1071
1072    #[test]
1073    fn execution_cache_checkout_releases_on_drop() {
1074        let execution_cache = PayloadExecutionCache::default();
1075        let hash = B256::from([2u8; 32]);
1076
1077        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1078
1079        {
1080            let guard = execution_cache.get_cache_for(hash);
1081            assert!(guard.is_some(), "expected checkout to succeed");
1082            // Guard dropped at end of scope
1083        }
1084
1085        let retry = execution_cache.get_cache_for(hash);
1086        assert!(retry.is_some(), "checkout should succeed after guard drop");
1087    }
1088
1089    #[test]
1090    fn execution_cache_mismatch_parent_clears_and_returns() {
1091        let execution_cache = PayloadExecutionCache::default();
1092        let hash = B256::from([3u8; 32]);
1093
1094        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1095
1096        // When the parent hash doesn't match (fork block), the cache is cleared,
1097        // hash updated on the original, and clone returned for reuse
1098        let different_hash = B256::from([4u8; 32]);
1099        let cache = execution_cache.get_cache_for(different_hash);
1100        assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1101
1102        drop(cache);
1103
1104        // The stored cache now has the fork block's parent hash.
1105        // Canonical chain looking for original hash sees a mismatch → clears and reuses.
1106        let original = execution_cache.get_cache_for(hash);
1107        assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1108    }
1109
1110    #[test]
1111    fn execution_cache_update_after_release_succeeds() {
1112        let execution_cache = PayloadExecutionCache::default();
1113        let initial = B256::from([5u8; 32]);
1114
1115        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1116
1117        let guard =
1118            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1119
1120        drop(guard);
1121
1122        let updated = B256::from([6u8; 32]);
1123        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1124
1125        let new_checkout = execution_cache.get_cache_for(updated);
1126        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1127    }
1128
1129    #[test]
1130    fn on_inserted_executed_block_populates_cache() {
1131        let payload_processor = PayloadProcessor::new(
1132            reth_tasks::Runtime::test(),
1133            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1134            &TreeConfig::default(),
1135            PrecompileCacheMap::default(),
1136        );
1137
1138        let parent_hash = B256::from([1u8; 32]);
1139        let block_hash = B256::from([10u8; 32]);
1140        let block_with_parent = BlockWithParent {
1141            block: BlockNumHash { hash: block_hash, number: 1 },
1142            parent: parent_hash,
1143        };
1144        let bundle_state = BundleState::default();
1145
1146        // Cache should be empty initially
1147        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1148
1149        // Update cache with inserted block
1150        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1151
1152        // Cache should now exist for the block hash
1153        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1154        assert!(cached.is_some());
1155        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1156    }
1157
1158    #[test]
1159    fn on_inserted_executed_block_skips_on_parent_mismatch() {
1160        let payload_processor = PayloadProcessor::new(
1161            reth_tasks::Runtime::test(),
1162            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1163            &TreeConfig::default(),
1164            PrecompileCacheMap::default(),
1165        );
1166
1167        // Setup: populate cache with block 1
1168        let block1_hash = B256::from([1u8; 32]);
1169        payload_processor
1170            .execution_cache
1171            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
1172
1173        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
1174        let wrong_parent = B256::from([99u8; 32]);
1175        let block3_hash = B256::from([3u8; 32]);
1176        let block_with_parent = BlockWithParent {
1177            block: BlockNumHash { hash: block3_hash, number: 3 },
1178            parent: wrong_parent,
1179        };
1180        let bundle_state = BundleState::default();
1181
1182        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1183
1184        // Cache should still be for block 1 (unchanged)
1185        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1186        assert!(cached.is_some(), "Original cache should be preserved");
1187
1188        // Cache for block 3 should not exist
1189        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1190        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1191    }
1192
1193    #[test]
1194    fn on_inserted_executed_block_does_not_mutate_checked_out_parent_cache() {
1195        let payload_processor = PayloadProcessor::new(
1196            reth_tasks::Runtime::test(),
1197            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1198            &TreeConfig::default(),
1199            PrecompileCacheMap::default(),
1200        );
1201
1202        let parent_hash = B256::from([1u8; 32]);
1203        payload_processor
1204            .execution_cache
1205            .update_with_guard(|slot| *slot = Some(make_saved_cache(parent_hash)));
1206
1207        // Checking out the cache bumps its usage_guard refcount, marking the slot as in-use.
1208        // The returned SavedCache shares the same underlying ExecutionCache Arc as the slot,
1209        // so any writes through the slot are observable here
1210        let checked_out = payload_processor
1211            .execution_cache
1212            .get_cache_for(parent_hash)
1213            .expect("expected parent cache checkout to succeed");
1214
1215        let polluted_address = Address::random();
1216        let bundle_state = BundleState::builder(2..=2)
1217            .state_present_account_info(
1218                polluted_address,
1219                AccountInfo {
1220                    balance: U256::from(1337),
1221                    nonce: 7,
1222                    code_hash: KECCAK_EMPTY,
1223                    code: None,
1224                    account_id: None,
1225                },
1226            )
1227            .build();
1228
1229        // Make parent match the cached slot so we bypass the parent-mismatch guard and exercise
1230        // the in-use guard specifically.
1231        let block_with_parent = BlockWithParent {
1232            block: BlockNumHash { hash: B256::from([2u8; 32]), number: 2 },
1233            parent: parent_hash,
1234        };
1235
1236        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1237
1238        // The closure runs only on a cache miss, so NotCached(None) means polluted_address was
1239        // absent and Cached(Some(_)) means it was written by on_inserted_executed_block.
1240        let account = checked_out
1241            .cache()
1242            .get_or_try_insert_account_with(polluted_address, || Ok::<_, ()>(None))
1243            .expect("cache read should succeed");
1244
1245        assert_eq!(
1246            account,
1247            CachedStatus::NotCached(None),
1248            "checked-out parent cache should not observe state from inserted local block"
1249        );
1250    }
1251
1252    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1253        let mut rng = generators::rng();
1254        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1255        let mut updates = Vec::with_capacity(updates_per_account);
1256
1257        for _ in 0..updates_per_account {
1258            let num_accounts_in_update = rng.random_range(1..=num_accounts);
1259            let mut state_update = EvmState::default();
1260
1261            let selected_addresses = &all_addresses[0..num_accounts_in_update];
1262
1263            for &address in selected_addresses {
1264                let mut storage = HashMap::default();
1265                if rng.random_bool(0.7) {
1266                    for _ in 0..rng.random_range(1..10) {
1267                        let slot = U256::from(rng.random::<u64>());
1268                        storage.insert(
1269                            slot,
1270                            EvmStorageSlot::new_changed(
1271                                U256::ZERO,
1272                                U256::from(rng.random::<u64>()),
1273                                TransactionId::ZERO,
1274                            ),
1275                        );
1276                    }
1277                }
1278
1279                let mut account = revm_state::Account::default();
1280                account.info = AccountInfo {
1281                    balance: U256::from(rng.random::<u64>()),
1282                    nonce: rng.random::<u64>(),
1283                    code_hash: KECCAK_EMPTY,
1284                    code: Some(Default::default()),
1285                    account_id: None,
1286                };
1287                account.storage = storage;
1288                account.status = AccountStatus::Touched;
1289                account.transaction_id = TransactionId::ZERO;
1290
1291                state_update.insert(address, account);
1292            }
1293
1294            updates.push(state_update);
1295        }
1296
1297        updates
1298    }
1299
1300    #[test]
1301    fn test_state_root() {
1302        reth_tracing::init_test_tracing();
1303
1304        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
1305        let genesis_hash = init_genesis(&factory).unwrap();
1306
1307        let state_updates = create_mock_state_updates(10, 10);
1308        let mut hashed_state = HashedPostState::default();
1309        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
1310            HashMap::default();
1311
1312        {
1313            let provider_rw = factory.provider_rw().expect("failed to get provider");
1314
1315            for update in &state_updates {
1316                let account_updates = update.iter().map(|(address, account)| {
1317                    (*address, Some(Account::from_revm_account(account)))
1318                });
1319                provider_rw
1320                    .insert_account_for_hashing(account_updates)
1321                    .expect("failed to insert accounts");
1322
1323                let storage_updates = update.iter().map(|(address, account)| {
1324                    let storage_entries = account.storage.iter().map(|(slot, value)| {
1325                        StorageEntry { key: B256::from(*slot), value: value.present_value }
1326                    });
1327                    (*address, storage_entries)
1328                });
1329                provider_rw
1330                    .insert_storage_for_hashing(storage_updates)
1331                    .expect("failed to insert storage");
1332            }
1333            provider_rw.commit().expect("failed to commit changes");
1334        }
1335
1336        for update in &state_updates {
1337            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
1338
1339            for (address, account) in update {
1340                let storage: HashMap<B256, U256> = account
1341                    .storage
1342                    .iter()
1343                    .map(|(k, v)| (B256::from(*k), v.present_value))
1344                    .collect();
1345
1346                let entry = accumulated_state.entry(*address).or_default();
1347                entry.0 = Account::from_revm_account(account);
1348                entry.1.extend(storage);
1349            }
1350        }
1351
1352        let mut payload_processor = PayloadProcessor::new(
1353            reth_tasks::Runtime::test(),
1354            EthEvmConfig::new(factory.chain_spec()),
1355            &TreeConfig::default(),
1356            PrecompileCacheMap::default(),
1357        );
1358
1359        let provider_factory = BlockchainProvider::new(factory).unwrap();
1360
1361        let mut handle = payload_processor.spawn(
1362            ExecutionEnv::test_default(),
1363            (
1364                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
1365                std::convert::identity,
1366            ),
1367            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1368            OverlayStateProviderFactory::new(
1369                provider_factory,
1370                OverlayBuilder::<EthPrimitives>::new(genesis_hash, ChangesetCache::new()),
1371            ),
1372            &TreeConfig::default(),
1373            false,
1374        );
1375
1376        let mut state_hook = handle.state_hook().expect("state hook is None");
1377
1378        for update in state_updates {
1379            state_hook.on_state(&update);
1380        }
1381        drop(state_hook);
1382
1383        let root_from_task = handle.state_root().expect("task failed").state_root;
1384        let root_from_regular = state_root(accumulated_state);
1385
1386        assert_eq!(
1387            root_from_task, root_from_regular,
1388            "State root mismatch: task={root_from_task}, base={root_from_regular}"
1389        );
1390    }
1391
1392    /// Tests the full prewarm lifecycle for a fork block:
1393    ///
1394    /// 1. Cache is at canonical block 4.
1395    /// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
1396    ///    `PrewarmCacheTask` does when it receives a `SavedCache`.
1397    /// 3. Prewarm populates the shared cache with fork-specific state.
1398    /// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
1399    /// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
1400    /// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
1401    ///    data.
1402    #[test]
1403    fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1404        let execution_cache = PayloadExecutionCache::default();
1405
1406        // Canonical chain at block 4.
1407        let block4_hash = B256::from([4u8; 32]);
1408        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1409
1410        // Fork block arrives with parent = block 2. Prewarm task checks out the cache.
1411        // This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
1412        let fork_parent = B256::from([2u8; 32]);
1413        let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1414        assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1415        let prewarm_cache = prewarm_cache.unwrap();
1416        assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1417
1418        // Prewarm populates cache with fork-specific state (ancestor data for block 2).
1419        // Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
1420        let fork_addr = Address::from([0xBB; 20]);
1421        let fork_key = B256::from([0xCC; 32]);
1422        prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1423
1424        // While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
1425        let during_prewarm = execution_cache.get_cache_for(block4_hash);
1426        assert!(
1427            during_prewarm.is_none(),
1428            "cache must be unavailable while prewarm holds a reference"
1429        );
1430
1431        // Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
1432        drop(prewarm_cache);
1433
1434        // Canonical block 5 arrives (parent = block 4).
1435        // Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
1436        // clears the stale fork data, and returns a cache with hash = block4_hash.
1437        let block5_cache = execution_cache.get_cache_for(block4_hash);
1438        assert!(
1439            block5_cache.is_some(),
1440            "canonical chain must get cache after fork prewarm is dropped"
1441        );
1442        assert_eq!(
1443            block5_cache.as_ref().unwrap().executed_block_hash(),
1444            block4_hash,
1445            "cache must carry the canonical parent hash, not the fork parent"
1446        );
1447    }
1448}