Skip to main content

reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6    sparse_trie::SparseTrieCacheTask,
7    CacheWaitDurations, CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource,
8    ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
9    WaitForCaches,
10};
11use alloy_eip7928::bal::DecodedBal;
12use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
13use alloy_primitives::B256;
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use multiproof::*;
16use prewarm::PrewarmMetrics;
17use rayon::prelude::*;
18use reth_evm::{
19    block::ExecutableTxParts,
20    execute::{ExecutableTxFor, WithTxEnv},
21    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
22    SpecFor, TxEnvFor,
23};
24use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
25use reth_provider::{
26    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
27};
28use reth_revm::db::BundleState;
29use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
30use reth_trie::{
31    hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory, HashedPostState,
32};
33use reth_trie_parallel::{
34    proof_task::{ProofTaskCtx, ProofWorkerHandle},
35    root::ParallelStateRootError,
36};
37use reth_trie_sparse::{ArenaParallelSparseTrie, RevealableSparseTrie, SparseStateTrie};
38use std::{
39    ops::Not,
40    sync::{
41        atomic::{AtomicBool, AtomicUsize},
42        mpsc::{self, channel},
43        Arc, OnceLock,
44    },
45};
46use tracing::{debug, debug_span, instrument, trace, warn, Span};
47
48pub mod bal;
49pub(crate) mod bal_prewarm_pool;
50pub mod multiproof;
51mod preserved_sparse_trie;
52pub mod prewarm;
53pub mod receipt_root_task;
54pub mod sparse_trie;
55
56use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
57
/// Blocks with fewer transactions than this skip transaction prewarming, since the fixed
/// overhead of spawning prewarm workers exceeds the execution time saved.
///
/// Only consulted for blocks that are not executed through the parallel BAL path, and only when
/// transaction prewarming is not already disabled by configuration.
///
/// Not to be confused with the private associated constant
/// `PayloadProcessor::SMALL_BLOCK_TX_THRESHOLD`. That constant has a different value and decides
/// whether transaction conversion runs sequentially or in parallel.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
61
62/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
63type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;
64
65type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
66    IteratorTx<Evm, I>,
67    <I as ExecutableTxTuple>::Error,
68    <N as NodePrimitives>::Receipt,
69>;
70
71type IteratorPrewarmTxReceiver<Evm, I> =
72    PrewarmTxReceiver<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;
73
74type IteratorExecuteTxReceiver<Evm, I> = ExecuteTxReceiver<
75    TxEnvFor<Evm>,
76    <I as ExecutableTxIterator<Evm>>::Recovered,
77    <I as ExecutableTxTuple>::Error,
78>;
79
80type RecoveredTx<TxEnv, Recovered> = WithTxEnv<TxEnv, Recovered>;
81type IndexedTxResult<Tx, Err> = (usize, Result<Tx, Err>);
82type IndexedTxReceiver<Tx, Err> = CrossbeamReceiver<IndexedTxResult<Tx, Err>>;
83type IndexedTxSender<Tx, Err> = CrossbeamSender<IndexedTxResult<Tx, Err>>;
84type PrewarmTxReceiver<TxEnv, Recovered> = mpsc::Receiver<(usize, RecoveredTx<TxEnv, Recovered>)>;
85type ExecuteTxReceiver<TxEnv, Recovered, Err> =
86    IndexedTxReceiver<RecoveredTx<TxEnv, Recovered>, Err>;
87type ExecuteTxSender<TxEnv, Recovered, Err> = IndexedTxSender<RecoveredTx<TxEnv, Recovered>, Err>;
88
89/// Entrypoint for executing the payload.
90#[derive(Debug)]
91pub struct PayloadProcessor<Evm>
92where
93    Evm: ConfigureEvm,
94{
95    /// The executor used by to spawn tasks.
96    executor: Runtime,
97    /// The most recent cache used for execution.
98    execution_cache: PayloadExecutionCache,
99    /// Metrics for the execution cache.
100    cache_metrics: Option<CachedStateMetrics>,
101    /// Metrics for shared execution cache state.
102    cache_state_metrics: Option<CachedStateCacheMetrics>,
103    /// Metrics for trie operations
104    trie_metrics: MultiProofTaskMetrics,
105    /// Cross-block cache size in bytes.
106    cross_block_cache_size: usize,
107    /// Whether transactions should not be executed on prewarming task.
108    disable_transaction_prewarming: bool,
109    /// Whether state cache should be disable
110    disable_state_cache: bool,
111    /// Determines how to configure the evm for execution.
112    evm_config: Evm,
113    /// Whether precompile cache should be disabled.
114    precompile_cache_disabled: bool,
115    /// Precompile cache map.
116    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
117    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
118    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
119    /// preservation across sequential payload validations.
120    sparse_state_trie: SharedPreservedSparseTrie,
121    /// LFU hot-slot capacity: max storage slots retained across prune cycles.
122    sparse_trie_max_hot_slots: usize,
123    /// LFU hot-account capacity: max account addresses retained across prune cycles.
124    sparse_trie_max_hot_accounts: usize,
125    /// Whether sparse trie cache pruning is fully disabled.
126    disable_sparse_trie_cache_pruning: bool,
127    /// Whether to disable BAL-driven parallel state root computation.
128    /// Only valid when BAL parallel execution is also disabled.
129    disable_bal_parallel_state_root: bool,
130    /// Whether BAL state prefetching during prewarm is disabled.
131    disable_bal_batch_io: bool,
132    /// Dedicated blocking pool for warming the BAL read-set, created lazily on the first BAL block
133    /// (see [`Self::bal_prewarm_pool`]). Its threads exit when the processor is dropped.
134    bal_prewarm_pool: OnceLock<Arc<bal_prewarm_pool::BalPrewarmPool>>,
135}
136
137impl<N, Evm> PayloadProcessor<Evm>
138where
139    N: NodePrimitives,
140    Evm: ConfigureEvm<Primitives = N>,
141{
142    /// Returns a reference to the workload executor driving payload tasks.
143    pub const fn executor(&self) -> &Runtime {
144        &self.executor
145    }
146
147    /// Creates a new payload processor.
148    pub fn new(
149        executor: Runtime,
150        evm_config: Evm,
151        config: &TreeConfig,
152        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
153    ) -> Self {
154        Self {
155            executor,
156            execution_cache: Default::default(),
157            trie_metrics: Default::default(),
158            cross_block_cache_size: config.cross_block_cache_size(),
159            disable_transaction_prewarming: config.disable_prewarming(),
160            evm_config,
161            disable_state_cache: config.disable_state_cache(),
162            precompile_cache_disabled: config.precompile_cache_disabled(),
163            precompile_cache_map,
164            sparse_state_trie: SharedPreservedSparseTrie::default(),
165            sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
166            sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
167            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
168            cache_metrics: (!config.disable_cache_metrics())
169                .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
170            cache_state_metrics: (!config.disable_cache_metrics())
171                .then(CachedStateCacheMetrics::default),
172            disable_bal_parallel_state_root: config.disable_bal_parallel_state_root(),
173            disable_bal_batch_io: config.disable_bal_batch_io(),
174            bal_prewarm_pool: OnceLock::new(),
175        }
176    }
177
178    /// Returns the dedicated BAL read-set prewarm pool, spawning its blocking worker threads on
179    /// first use (only the BAL parallel execution path calls this).
180    fn bal_prewarm_pool(&self) -> Arc<bal_prewarm_pool::BalPrewarmPool> {
181        self.bal_prewarm_pool
182            .get_or_init(|| {
183                bal_prewarm_pool::BalPrewarmPool::new(bal_prewarm_pool::DEFAULT_BAL_PREWARM_THREADS)
184            })
185            .clone()
186    }
187}
188
189impl<Evm> WaitForCaches for PayloadProcessor<Evm>
190where
191    Evm: ConfigureEvm,
192{
193    fn wait_for_caches(&self) -> CacheWaitDurations {
194        debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
195
196        // Wait for both caches in parallel using std threads
197        let execution_cache = self.execution_cache.clone();
198        let sparse_trie = self.sparse_state_trie.clone();
199
200        // Use channels and spawn_blocking instead of std::thread::spawn
201        let (execution_tx, execution_rx) = std::sync::mpsc::channel();
202        let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
203
204        self.executor.spawn_blocking_named("wait-exec-cache", move || {
205            let _ = execution_tx.send(execution_cache.wait_for_availability());
206        });
207        self.executor.spawn_blocking_named("wait-sparse-tri", move || {
208            let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
209        });
210
211        let execution_cache_duration =
212            execution_rx.recv().expect("execution cache wait task failed to send result");
213        let sparse_trie_duration =
214            sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
215
216        debug!(
217            target: "engine::tree::payload_processor",
218            ?execution_cache_duration,
219            ?sparse_trie_duration,
220            "Execution cache and sparse trie locks acquired"
221        );
222        CacheWaitDurations {
223            execution_cache: execution_cache_duration,
224            sparse_trie: sparse_trie_duration,
225        }
226    }
227}
228
229impl<N, Evm> PayloadProcessor<Evm>
230where
231    N: NodePrimitives,
232    Evm: ConfigureEvm<Primitives = N> + 'static,
233{
234    /// Spawns all background tasks and returns a handle connected to the tasks.
235    ///
236    /// - Transaction prewarming task
237    /// - State root task
238    /// - Sparse trie task
239    ///
240    /// # Transaction prewarming task
241    ///
242    /// Responsible for feeding state updates to the sparse trie task.
243    ///
244    /// This task runs until:
245    ///  - externally cancelled (e.g. sequential block execution is complete)
246    ///
247    /// ## Sparse trie task
248    ///
249    /// Responsible for calculating the state root.
250    ///
251    /// This task runs until there are no further updates to process.
252    ///
253    ///
254    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
255    /// canceling)
256    #[instrument(
257        level = "debug",
258        target = "engine::tree::payload_processor",
259        name = "payload processor",
260        skip_all
261    )]
262    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
263        &mut self,
264        env: ExecutionEnv<Evm>,
265        transactions: I,
266        provider_builder: StateProviderBuilder<N, P>,
267        multiproof_provider_factory: F,
268        config: &TreeConfig,
269        parallel_bal_execution: bool,
270    ) -> IteratorPayloadHandle<Evm, I, N>
271    where
272        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
273        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
274            + Clone
275            + Send
276            + Sync
277            + 'static,
278    {
279        // start preparing transactions immediately
280        let (prewarm_rx, execution_rx) =
281            self.spawn_tx_iterator(transactions, env.transaction_count, parallel_bal_execution);
282
283        let span = Span::current();
284
285        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
286        let state_root_handle = self.spawn_state_root(
287            multiproof_provider_factory,
288            env.parent_state_root,
289            halve_workers,
290            config,
291        );
292        // BAL blocks only bypass the normal execution state hook when the validator decided that
293        // the parallel BAL executor will consume this block. If not, treat the BAL as absent here
294        // so the block follows today's sequential execution and transaction-prewarm path.
295        //
296        // In the parallel BAL path, prewarm owns BAL-derived sparse-trie updates and optional
297        // BAL state prefetching. State-cache disabled mode still uses the BAL executor, but
298        // `saved_cache` is absent below, so prewarm skips cache-backed state prefetching.
299        // `disable_bal_batch_io` controls the prefetch half when a cache exists.
300        let install_state_hook = !parallel_bal_execution;
301        let prewarm_handle = self.spawn_caching_with(
302            env,
303            prewarm_rx,
304            provider_builder,
305            Some(state_root_handle.updates_tx().clone()),
306            parallel_bal_execution,
307        );
308
309        PayloadHandle {
310            state_root_handle: Some(state_root_handle),
311            install_state_hook,
312            prewarm_handle,
313            transactions: execution_rx,
314            _span: span,
315        }
316    }
317
318    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
319    ///
320    /// Returns a [`PayloadHandle`] to communicate with the task.
321    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
322    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
323        &self,
324        env: ExecutionEnv<Evm>,
325        transactions: I,
326        provider_builder: StateProviderBuilder<N, P>,
327        parallel_bal_execution: bool,
328    ) -> IteratorPayloadHandle<Evm, I, N>
329    where
330        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
331    {
332        let (prewarm_rx, execution_rx) =
333            self.spawn_tx_iterator(transactions, env.transaction_count, parallel_bal_execution);
334        let prewarm_handle =
335            self.spawn_caching_with(env, prewarm_rx, provider_builder, None, false);
336        PayloadHandle {
337            state_root_handle: None,
338            install_state_hook: false,
339            prewarm_handle,
340            transactions: execution_rx,
341            _span: Span::current(),
342        }
343    }
344
345    /// Spawns state root computation pipeline (multiproof + sparse trie tasks).
346    ///
347    /// The returned [`StateRootHandle`] provides:
348    /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
349    ///   execution.
350    /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
351    ///   state root.
352    ///
353    /// The state hook **must** be dropped after execution to signal the end of state updates.
354    ///
355    /// When `halve_workers` is true, the proof worker pool is halved (for small blocks where
356    /// fewer transactions produce fewer state changes and most workers would be idle).
357    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
358    pub fn spawn_state_root<F>(
359        &self,
360        multiproof_provider_factory: F,
361        parent_state_root: B256,
362        halve_workers: bool,
363        config: &TreeConfig,
364    ) -> StateRootHandle
365    where
366        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
367            + Clone
368            + Send
369            + Sync
370            + 'static,
371    {
372        let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();
373
374        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
375        #[cfg(feature = "trie-debug")]
376        let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
377        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
378
379        let (state_root_tx, state_root_rx) = channel();
380        let (hashed_state_tx, hashed_state_rx) = channel();
381
382        self.spawn_sparse_trie_task(
383            proof_handle,
384            state_root_tx,
385            hashed_state_tx,
386            from_multi_proof,
387            parent_state_root,
388            config.multiproof_chunk_size(),
389        );
390
391        StateRootHandle::new(parent_state_root, updates_tx, state_root_rx, hashed_state_rx)
392    }
393
394    /// Transaction count threshold below which proof workers are halved, since fewer transactions
395    /// produce fewer state changes and most workers would be idle overhead.
396    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
397
398    /// Transaction count threshold below which sequential conversion is used.
399    ///
400    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
401    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
402    /// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
403    /// blocks.
404    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
405
406    /// Number of leading transactions to convert sequentially before entering the rayon
407    /// parallel path.
408    ///
409    /// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
410    /// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
411    /// a small head sequentially and sending it immediately, execution can start without
412    /// waiting for rayon scheduling.
413    const PARALLEL_PREFETCH_COUNT: usize = 4;
414
415    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
416    ///
417    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
418    /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
419    /// iteration to convert transactions in parallel while streaming results to execution.
420    ///
421    /// When `parallel_bal_execution` is disabled, uses [`ForEachOrdered`] to preserve the
422    /// original transaction order. Otherwise, streams results as they become available.
423    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
424    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
425        &self,
426        transactions: I,
427        transaction_count: usize,
428        parallel_bal_execution: bool,
429    ) -> (IteratorPrewarmTxReceiver<Evm, I>, IteratorExecuteTxReceiver<Evm, I>) {
430        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
431        let (execute_tx, execute_rx) = crossbeam_channel::bounded(transaction_count);
432
433        if transaction_count == 0 {
434            // Empty block — nothing to do.
435        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
436            // Sequential path for small blocks — avoids rayon work-stealing setup and
437            // channel-based reorder overhead when it costs more than sequential conversion.
438            debug!(
439                target: "engine::tree::payload_processor",
440                transaction_count,
441                "using sequential sig recovery for small block"
442            );
443            self.executor.spawn_blocking_named("tx-iterator", move || {
444                let (transactions, convert) = transactions.into_parts();
445                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
446            });
447        } else {
448            // Parallel path — recover signatures in parallel on rayon, stream results
449            // to prewarming and execution.
450            let executor = self.executor.clone();
451            self.executor.spawn_blocking_named("tx-iterator", move || {
452                let (transactions, convert) = transactions.into_parts();
453                if parallel_bal_execution {
454                    // With BALs, we don't care about the order of transactions in execution and
455                    // prewarming, so we don't have to use `for_each_ordered_in`.
456                    executor.cpu_pool().install(|| {
457                        transactions
458                            .into_par_iter()
459                            .enumerate()
460                            .map(|(i, tx)| {
461                                let tx = convert.convert(tx);
462                                (i, tx)
463                            })
464                            .for_each(|(idx, tx)| {
465                                let tx = tx.map(|tx| {
466                                    let tx = WithTxEnv::new(tx);
467                                    let _ = prewarm_tx.send((idx, tx.clone()));
468                                    tx
469                                });
470                                let _ = execute_tx.send((idx, tx));
471                                trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
472                            });
473                    });
474                } else {
475                    // To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
476                    // few transactions are recovered sequentially and sent immediately before
477                    // entering the parallel iterator for the remainder.
478                    let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
479                    let mut all: Vec<_> = transactions.into_iter().collect();
480                    let rest = all.split_off(prefetch);
481
482                    // Convert the first few transactions sequentially so execution can
483                    // start immediately without waiting for rayon work-stealing.
484                    convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
485
486                    // Without BALs, we need to preserve the initial order of transactions,
487                    // so we have to use `for_each_ordered_in`.
488                    rest.into_par_iter()
489                        .enumerate()
490                        .map(|(i, tx)| {
491                            let idx = i + prefetch;
492                            let tx = convert.convert(tx);
493                            (idx, tx)
494                        })
495                        .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
496                            let tx = tx.map(|tx| {
497                                let tx = WithTxEnv::new(tx);
498                                let _ = prewarm_tx.send((idx, tx.clone()));
499                                tx
500                            });
501                            let _ = execute_tx.send((idx, tx));
502                            trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
503                        });
504                }
505            });
506        }
507
508        (prewarm_rx, execute_rx)
509    }
510
511    /// Spawn prewarming optionally wired to the sparse trie task for target updates.
512    ///
513    /// `parallel_bal_execution` is true when the BAL execute path will execute this block. In
514    /// that case prewarm runs in BAL mode: it streams BAL-derived sparse-trie updates and,
515    /// unless `disable_bal_batch_io` is set, prefetches BAL-declared state into the shared cache.
516    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
517    fn spawn_caching_with<P>(
518        &self,
519        env: ExecutionEnv<Evm>,
520        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
521        provider_builder: StateProviderBuilder<N, P>,
522        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
523        parallel_bal_execution: bool,
524    ) -> CacheTaskHandle<N::Receipt>
525    where
526        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
527    {
528        let mode = if parallel_bal_execution {
529            PrewarmMode::BlockAccessList(
530                env.decoded_bal.clone().expect("BAL dispatch implies decoded BAL"),
531            )
532        } else if self.disable_transaction_prewarming ||
533            env.transaction_count < SMALL_BLOCK_TX_THRESHOLD
534        {
535            PrewarmMode::Skipped
536        } else {
537            PrewarmMode::Transactions(transactions)
538        };
539        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
540
541        let executed_tx_index = Arc::new(AtomicUsize::new(0));
542        // configure prewarming
543        let prewarm_ctx = PrewarmContext {
544            env,
545            evm_config: self.evm_config.clone(),
546            saved_cache: saved_cache.clone(),
547            provider: provider_builder,
548            bal_prewarm_pool: parallel_bal_execution.then(|| self.bal_prewarm_pool()),
549            metrics: PrewarmMetrics::default(),
550            cache_metrics: self.cache_metrics.clone(),
551            cache_state_metrics: self.cache_state_metrics.clone(),
552            terminate_execution: Arc::new(AtomicBool::new(false)),
553            executed_tx_index: Arc::clone(&executed_tx_index),
554            precompile_cache_disabled: self.precompile_cache_disabled,
555            precompile_cache_map: self.precompile_cache_map.clone(),
556            disable_bal_parallel_state_root: self.disable_bal_parallel_state_root,
557            disable_bal_batch_io: self.disable_bal_batch_io,
558        };
559
560        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
561            self.executor.clone(),
562            self.execution_cache.clone(),
563            prewarm_ctx,
564            to_sparse_trie_task,
565        );
566        {
567            let to_prewarm_task = to_prewarm_task.clone();
568            self.executor.spawn_blocking_named("prewarm", move || {
569                prewarm_task.run(mode, to_prewarm_task);
570            });
571        }
572
573        CacheTaskHandle {
574            saved_cache,
575            to_prewarm_task: Some(to_prewarm_task),
576            executed_tx_index,
577            cache_metrics: self.cache_metrics.clone(),
578        }
579    }
580
581    /// Returns the cache for the given parent hash.
582    ///
583    /// If the given hash is different then what is recently cached, then this will create a new
584    /// instance.
585    #[instrument(level = "debug", target = "engine::caching", skip(self))]
586    pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
587        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
588            debug!("reusing execution cache");
589            cache
590        } else {
591            debug!("creating new execution cache on cache miss");
592            let start = Instant::now();
593            let cache = ExecutionCache::new(self.cross_block_cache_size);
594            if let Some(metrics) = &self.cache_metrics {
595                metrics.record_cache_creation(start.elapsed());
596            }
597            SavedCache::new(parent_hash, cache)
598        }
599    }
600
601    /// Spawns the [`SparseTrieCacheTask`] for this payload processor.
602    ///
603    /// The trie is preserved when the new payload is a child of the previous one.
604    fn spawn_sparse_trie_task(
605        &self,
606        proof_worker_handle: ProofWorkerHandle,
607        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
608        hashed_state_tx: mpsc::Sender<HashedPostState>,
609        from_multi_proof: CrossbeamReceiver<StateRootMessage>,
610        parent_state_root: B256,
611        chunk_size: usize,
612    ) {
613        let preserved_sparse_trie = self.sparse_state_trie.clone();
614        let trie_metrics = self.trie_metrics.clone();
615        let max_hot_slots = self.sparse_trie_max_hot_slots;
616        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
617        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
618        let executor = self.executor.clone();
619
620        let parent_span = Span::current();
621        self.executor.spawn_blocking_named("sparse-trie", move || {
622            reth_tasks::once!(increase_thread_priority);
623
624            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
625                .entered();
626
627            // Reuse a stored SparseStateTrie if available, applying continuation logic.
628            // If this payload's parent state root matches the preserved trie's anchor,
629            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
630            // keep allocations.
631            let start = Instant::now();
632            let preserved = preserved_sparse_trie.take();
633            trie_metrics
634                .sparse_trie_cache_wait_duration_histogram
635                .record(start.elapsed().as_secs_f64());
636
637            let mut sparse_state_trie = preserved
638                .map(|preserved| preserved.into_trie_for(parent_state_root))
639                .unwrap_or_else(|| {
640                    debug!(
641                        target: "engine::tree::payload_processor",
642                        "Creating new sparse trie - no preserved trie available"
643                    );
644                    let default_trie =
645                        RevealableSparseTrie::blind_from(ArenaParallelSparseTrie::default());
646                    SparseStateTrie::default()
647                        .with_accounts_trie(default_trie.clone())
648                        .with_default_storage_trie(default_trie)
649                        .with_updates(true)
650                });
651            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
652
653            let mut task = SparseTrieCacheTask::new_with_trie(
654                &executor,
655                from_multi_proof,
656                hashed_state_tx,
657                proof_worker_handle,
658                trie_metrics.clone(),
659                sparse_state_trie,
660                parent_state_root,
661                chunk_size,
662            );
663
664            let result = task.run();
665
666            // Acquire the guard before sending the result to prevent a race condition:
667            // Without this, the next block could start after send() but before store(),
668            // causing take() to return None and forcing it to create a new empty trie
669            // instead of reusing the preserved one. Holding the guard ensures the next
670            // block's take() blocks until we've stored the trie for reuse.
671            let mut guard = preserved_sparse_trie.lock();
672
673            let task_result = result.as_ref().ok().cloned();
674            // Send state root computation result - next block may start but will block on take()
675            if state_root_tx.send(result).is_err() {
676                // Receiver dropped - payload was likely invalid or cancelled.
677                // Clear the trie instead of preserving potentially invalid state.
678                debug!(
679                    target: "engine::tree::payload_processor",
680                    "State root receiver dropped, clearing trie"
681                );
682                let (trie, deferred) = task.into_cleared_trie();
683                guard.store(PreservedSparseTrie::cleared(trie));
684                drop(guard);
685                executor.spawn_drop(deferred);
686                return;
687            }
688
689            // Only preserve the trie as anchored if computation succeeded.
690            // A failed computation may have left the trie in a partially updated state.
691            let _enter =
692                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
693            let deferred = if let Some(result) = task_result {
694                let start = Instant::now();
695                let (trie, deferred) = task.into_trie_for_reuse(
696                    max_hot_slots,
697                    max_hot_accounts,
698                    disable_cache_pruning,
699                );
700                trie_metrics
701                    .into_trie_for_reuse_duration_histogram
702                    .record(start.elapsed().as_secs_f64());
703                trie_metrics
704                    .sparse_trie_retained_memory_bytes
705                    .set(trie.memory_size() as f64);
706                trie_metrics
707                    .sparse_trie_retained_storage_tries
708                    .set(trie.retained_storage_tries_count() as f64);
709                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
710                deferred
711            } else {
712                debug!(
713                    target: "engine::tree::payload_processor",
714                    "State root computation failed, clearing trie"
715                );
716                let (trie, deferred) = task.into_cleared_trie();
717                guard.store(PreservedSparseTrie::cleared(trie));
718                deferred
719            };
720            drop(guard);
721            executor.spawn_drop(deferred);
722        });
723    }
724
725    /// Updates the execution cache with the post-execution state from an inserted block.
726    ///
727    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
728    /// to ensure the cache remains warm for subsequent block execution.
729    ///
730    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
731    /// hitting the database, maintaining performance consistency.
732    pub fn on_inserted_executed_block(
733        &self,
734        block_with_parent: BlockWithParent,
735        bundle_state: &BundleState,
736    ) {
737        let cache_state_metrics = self.cache_state_metrics.clone();
738        self.execution_cache.update_with_guard(|cached| {
739            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
740                debug!(
741                    target: "engine::caching",
742                    parent_hash = %block_with_parent.parent,
743                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
744                );
745                return
746            }
747
748            if let Some(cache) = cached.as_ref().filter(|cache| !cache.is_available()) {
749                debug!(
750                    target: "engine::caching",
751                    parent_hash = %block_with_parent.parent,
752                    usage_count = cache.usage_count(),
753                    "Execution cache is in use, skip updating cache with new state for inserted executed block",
754                );
755                return
756            }
757
758            // Take existing cache (if any) or create fresh caches
759            let caches = match cached.take() {
760                Some(existing) => existing.cache().clone(),
761                None => ExecutionCache::new(self.cross_block_cache_size),
762            };
763
764            // Insert the block's bundle state into cache
765            let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
766            if new_cache.cache().insert_state(bundle_state).is_err() {
767                *cached = None;
768                debug!(target: "engine::caching", "cleared execution cache on update error");
769                return
770            }
771            new_cache.update_metrics(cache_state_metrics.as_ref());
772
773            // Replace with the updated cache
774            *cached = Some(new_cache);
775            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
776        });
777    }
778}
779
780/// Converts transactions sequentially and sends them to the prewarm and execute channels.
781fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
782    iter: impl Iterator<Item = RawTx>,
783    convert: &C,
784    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
785    execute_tx: &ExecuteTxSender<TxEnv, Recovered, Err>,
786) where
787    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
788    TxEnv: Clone,
789    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
790{
791    for (idx, raw_tx) in iter.enumerate() {
792        let tx = convert.convert(raw_tx);
793        let tx = tx.map(|tx| WithTxEnv::new(tx));
794        if let Ok(tx) = &tx {
795            let _ = prewarm_tx.send((idx, tx.clone()));
796        }
797        let _ = execute_tx.send((idx, tx));
798        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
799    }
800}
801
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the background state root computation, if spawned.
    state_root_handle: Option<StateRootHandle>,
    /// Whether main execution should stream per-tx state updates into the sparse trie task.
    install_state_hook: bool,
    /// Handle to the prewarm/caching task: holds the saved execution cache (if any), the
    /// channel used to stop or terminate prewarming, the shared executed-transaction counter
    /// and the cache metrics.
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions and their indices in the block.
    transactions: IndexedTxReceiver<Tx, Err>,
    /// Span for tracing
    _span: Span,
}
819
820impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
821    /// Awaits the state root
822    ///
823    /// # Panics
824    ///
825    /// If payload processing was started without background tasks.
826    #[instrument(
827        level = "debug",
828        target = "engine::tree::payload_processor",
829        name = "await_state_root",
830        skip_all
831    )]
832    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
833        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
834    }
835
836    /// Takes the state root receiver out of the handle for use with custom waiting logic
837    /// (e.g., timeout-based waiting).
838    ///
839    /// # Panics
840    ///
841    /// If payload processing was started without background tasks.
842    pub const fn take_state_root_rx(
843        &mut self,
844    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
845        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
846    }
847
848    /// Returns a state hook to stream execution state updates to the sparse trie cache task.
849    ///
850    /// Returns `None` when BAL-driven hashed state streaming feeds the sparse trie task.
851    pub fn state_hook(&self) -> Option<impl OnStateHook> {
852        self.install_state_hook
853            .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
854            .flatten()
855    }
856
857    /// Returns a clone of the sender that streams updates into the sparse-trie task. The BAL
858    /// execute path uses this to spawn its own sparse-trie streaming task fed from the
859    /// snapshot.
860    pub fn sparse_trie_updates_tx(&self) -> Option<CrossbeamSender<StateRootMessage>> {
861        self.state_root_handle.as_ref().map(|handle| handle.updates_tx().clone())
862    }
863
864    /// Returns a clone of the caches used by prewarming
865    pub fn caches(&self) -> Option<ExecutionCache> {
866        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
867    }
868
869    /// Returns engine cache metrics if a cache exists for prewarming.
870    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
871        self.prewarm_handle.cache_metrics.clone()
872    }
873
874    /// Returns a reference to the shared executed transaction index counter.
875    ///
876    /// The main execution loop should store `index + 1` after executing each transaction so that
877    /// prewarm workers can skip transactions that have already been processed.
878    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
879        &self.prewarm_handle.executed_tx_index
880    }
881
882    /// Terminates the pre-warming transaction processing.
883    ///
884    /// Note: This does not terminate the task yet.
885    pub fn stop_prewarming_execution(&self) {
886        self.prewarm_handle.stop_prewarming_execution()
887    }
888
889    /// Terminates the entire caching task.
890    ///
891    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
892    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
893    /// path without cloning the expensive `BundleState`.
894    ///
895    /// Returns a sender for the channel that should be notified on block validation success.
896    pub fn terminate_caching(
897        &mut self,
898        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
899    ) -> Option<mpsc::Sender<()>> {
900        self.prewarm_handle.terminate_caching(execution_outcome)
901    }
902
903    /// Returns iterator yielding transactions from the stream.
904    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
905        self.transactions.iter().map(|(_, tx)| tx)
906    }
907
908    /// Returns a clone of the indexed transaction receiver.
909    pub fn clone_transaction_receiver(&self) -> IndexedTxReceiver<Tx, Err> {
910        self.transactions.clone()
911    }
912
913    /// Takes the hashed state receiver out of the handle for use with custom waiting logic
914    pub fn take_hashed_state_rx(&mut self) -> Option<mpsc::Receiver<HashedPostState>> {
915        self.state_root_handle.as_mut().map(|handle| handle.take_hashed_state_rx())
916    }
917}
918
919/// Access to the spawned [`PrewarmCacheTask`].
920///
921/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
922/// prewarm task without cloning the expensive `BundleState`.
923#[derive(Debug)]
924pub struct CacheTaskHandle<R> {
925    /// The shared cache the task operates with.
926    saved_cache: Option<SavedCache>,
927    /// Channel to the spawned prewarm task if any
928    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
929    /// Shared counter tracking the next transaction index to be executed by the main execution
930    /// loop. Prewarm workers skip transactions below this index.
931    executed_tx_index: Arc<AtomicUsize>,
932    /// Metrics for the execution cache.
933    cache_metrics: Option<CachedStateMetrics>,
934}
935
936impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
937    /// Terminates the pre-warming transaction processing.
938    ///
939    /// Note: This does not terminate the task yet.
940    pub fn stop_prewarming_execution(&self) {
941        self.to_prewarm_task
942            .as_ref()
943            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
944    }
945
946    /// Terminates the entire pre-warming task.
947    ///
948    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
949    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
950    #[must_use = "sender must be used and notified on block validation success"]
951    pub fn terminate_caching(
952        &mut self,
953        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
954    ) -> Option<mpsc::Sender<()>> {
955        if let Some(tx) = self.to_prewarm_task.take() {
956            let (valid_block_tx, valid_block_rx) = mpsc::channel();
957            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
958            let _ = tx.send(event);
959
960            Some(valid_block_tx)
961        } else {
962            None
963        }
964    }
965}
966
967impl<R> Drop for CacheTaskHandle<R> {
968    fn drop(&mut self) {
969        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
970        if let Some(tx) = self.to_prewarm_task.take() {
971            let _ = tx.send(PrewarmTaskEvent::Terminate {
972                execution_outcome: None,
973                valid_block_rx: mpsc::channel().1,
974            });
975        }
976    }
977}
978
/// EVM context required to execute a block.
///
/// Bundles the EVM environment with block-level metadata that the payload processor uses to
/// steer its background work (prewarm worker count, multiproof chunk sizing, sparse trie
/// reuse, withdrawal prefetching).
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    /// Used to determine parallel worker count for prewarming.
    pub transaction_count: usize,
    /// Total gas used by all transactions in the block.
    /// Used to adaptively select multiproof chunk size for optimal throughput.
    pub gas_used: u64,
    /// Withdrawals included in the block.
    /// Used to generate prefetch targets for withdrawal addresses.
    pub withdrawals: Option<Vec<Withdrawal>>,
    /// Optional decoded EIP-7928 block access list (BAL) for the block.
    /// Used to validate and optimize execution.
    pub decoded_bal: Option<Arc<DecodedBal>>,
}
1005
1006impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
1007where
1008    EvmEnvFor<Evm>: Default,
1009{
1010    /// Creates a new [`ExecutionEnv`] with default values for testing.
1011    #[cfg(any(test, feature = "test-utils"))]
1012    pub fn test_default() -> Self {
1013        Self {
1014            evm_env: Default::default(),
1015            hash: Default::default(),
1016            parent_hash: Default::default(),
1017            parent_state_root: Default::default(),
1018            transaction_count: 0,
1019            gas_used: 0,
1020            withdrawals: None,
1021            decoded_bal: None,
1022        }
1023    }
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028    use crate::tree::{
1029        payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1030        precompile_cache::PrecompileCacheMap,
1031        ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
1032    };
1033    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1034    use rand::Rng;
1035    use reth_chainspec::ChainSpec;
1036    use reth_db_common::init::init_genesis;
1037    use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
1038    use reth_evm::OnStateHook;
1039    use reth_evm_ethereum::EthEvmConfig;
1040    use reth_execution_cache::CachedStatus;
1041    use reth_primitives_traits::{Account, Recovered, StorageEntry};
1042    use reth_provider::{
1043        providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
1044        test_utils::create_test_provider_factory_with_chain_spec,
1045        ChainSpecProvider, HashingWriter,
1046    };
1047    use reth_revm::db::BundleState;
1048    use reth_testing_utils::generators;
1049    use reth_trie::{test_utils::state_root, HashedPostState};
1050    use reth_trie_db::ChangesetCache;
1051    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1052    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot, TransactionId};
1053    use std::sync::Arc;
1054
1055    fn make_saved_cache(hash: B256) -> SavedCache {
1056        let execution_cache = ExecutionCache::new(1_000);
1057        SavedCache::new(hash, execution_cache)
1058    }
1059
1060    #[test]
1061    fn execution_cache_allows_single_checkout() {
1062        let execution_cache = PayloadExecutionCache::default();
1063        let hash = B256::from([1u8; 32]);
1064
1065        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1066
1067        let first = execution_cache.get_cache_for(hash);
1068        assert!(first.is_some(), "expected initial checkout to succeed");
1069
1070        let second = execution_cache.get_cache_for(hash);
1071        assert!(second.is_none(), "second checkout should be blocked while guard is active");
1072
1073        drop(first);
1074
1075        let third = execution_cache.get_cache_for(hash);
1076        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1077    }
1078
1079    #[test]
1080    fn execution_cache_checkout_releases_on_drop() {
1081        let execution_cache = PayloadExecutionCache::default();
1082        let hash = B256::from([2u8; 32]);
1083
1084        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1085
1086        {
1087            let guard = execution_cache.get_cache_for(hash);
1088            assert!(guard.is_some(), "expected checkout to succeed");
1089            // Guard dropped at end of scope
1090        }
1091
1092        let retry = execution_cache.get_cache_for(hash);
1093        assert!(retry.is_some(), "checkout should succeed after guard drop");
1094    }
1095
1096    #[test]
1097    fn execution_cache_mismatch_parent_clears_and_returns() {
1098        let execution_cache = PayloadExecutionCache::default();
1099        let hash = B256::from([3u8; 32]);
1100
1101        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1102
1103        // When the parent hash doesn't match (fork block), the cache is cleared,
1104        // hash updated on the original, and clone returned for reuse
1105        let different_hash = B256::from([4u8; 32]);
1106        let cache = execution_cache.get_cache_for(different_hash);
1107        assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1108
1109        drop(cache);
1110
1111        // The stored cache now has the fork block's parent hash.
1112        // Canonical chain looking for original hash sees a mismatch → clears and reuses.
1113        let original = execution_cache.get_cache_for(hash);
1114        assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1115    }
1116
1117    #[test]
1118    fn execution_cache_update_after_release_succeeds() {
1119        let execution_cache = PayloadExecutionCache::default();
1120        let initial = B256::from([5u8; 32]);
1121
1122        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1123
1124        let guard =
1125            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1126
1127        drop(guard);
1128
1129        let updated = B256::from([6u8; 32]);
1130        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1131
1132        let new_checkout = execution_cache.get_cache_for(updated);
1133        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1134    }
1135
1136    #[test]
1137    fn on_inserted_executed_block_populates_cache() {
1138        let payload_processor = PayloadProcessor::new(
1139            reth_tasks::Runtime::test(),
1140            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1141            &TreeConfig::default(),
1142            PrecompileCacheMap::default(),
1143        );
1144
1145        let parent_hash = B256::from([1u8; 32]);
1146        let block_hash = B256::from([10u8; 32]);
1147        let block_with_parent = BlockWithParent {
1148            block: BlockNumHash { hash: block_hash, number: 1 },
1149            parent: parent_hash,
1150        };
1151        let bundle_state = BundleState::default();
1152
1153        // Cache should be empty initially
1154        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1155
1156        // Update cache with inserted block
1157        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1158
1159        // Cache should now exist for the block hash
1160        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1161        assert!(cached.is_some());
1162        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1163    }
1164
1165    #[test]
1166    fn on_inserted_executed_block_skips_on_parent_mismatch() {
1167        let payload_processor = PayloadProcessor::new(
1168            reth_tasks::Runtime::test(),
1169            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1170            &TreeConfig::default(),
1171            PrecompileCacheMap::default(),
1172        );
1173
1174        // Setup: populate cache with block 1
1175        let block1_hash = B256::from([1u8; 32]);
1176        payload_processor
1177            .execution_cache
1178            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
1179
1180        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
1181        let wrong_parent = B256::from([99u8; 32]);
1182        let block3_hash = B256::from([3u8; 32]);
1183        let block_with_parent = BlockWithParent {
1184            block: BlockNumHash { hash: block3_hash, number: 3 },
1185            parent: wrong_parent,
1186        };
1187        let bundle_state = BundleState::default();
1188
1189        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1190
1191        // Cache should still be for block 1 (unchanged)
1192        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1193        assert!(cached.is_some(), "Original cache should be preserved");
1194
1195        // Cache for block 3 should not exist
1196        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1197        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1198    }
1199
1200    #[test]
1201    fn on_inserted_executed_block_does_not_mutate_checked_out_parent_cache() {
1202        let payload_processor = PayloadProcessor::new(
1203            reth_tasks::Runtime::test(),
1204            EthEvmConfig::new(Arc::new(ChainSpec::default())),
1205            &TreeConfig::default(),
1206            PrecompileCacheMap::default(),
1207        );
1208
1209        let parent_hash = B256::from([1u8; 32]);
1210        payload_processor
1211            .execution_cache
1212            .update_with_guard(|slot| *slot = Some(make_saved_cache(parent_hash)));
1213
1214        // Checking out the cache bumps its `ExecutionCache` refcount, marking the slot as in-use.
1215        // The returned SavedCache shares the same underlying ExecutionCache Arc as the slot,
1216        // so any writes through the slot are observable here
1217        let checked_out = payload_processor
1218            .execution_cache
1219            .get_cache_for(parent_hash)
1220            .expect("expected parent cache checkout to succeed");
1221
1222        let polluted_address = Address::random();
1223        let bundle_state = BundleState::builder(2..=2)
1224            .state_present_account_info(
1225                polluted_address,
1226                AccountInfo {
1227                    balance: U256::from(1337),
1228                    nonce: 7,
1229                    code_hash: KECCAK_EMPTY,
1230                    code: None,
1231                    account_id: None,
1232                },
1233            )
1234            .build();
1235
1236        // Make parent match the cached slot so we bypass the parent-mismatch guard and exercise
1237        // the in-use guard specifically.
1238        let block_with_parent = BlockWithParent {
1239            block: BlockNumHash { hash: B256::from([2u8; 32]), number: 2 },
1240            parent: parent_hash,
1241        };
1242
1243        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1244
1245        // The closure runs only on a cache miss, so NotCached(None) means polluted_address was
1246        // absent and Cached(Some(_)) means it was written by on_inserted_executed_block.
1247        let account = checked_out
1248            .cache()
1249            .get_or_try_insert_account_with(polluted_address, || Ok::<_, ()>(None))
1250            .expect("cache read should succeed");
1251
1252        assert_eq!(
1253            account,
1254            CachedStatus::NotCached(None),
1255            "checked-out parent cache should not observe state from inserted local block"
1256        );
1257    }
1258
1259    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1260        let mut rng = generators::rng();
1261        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1262        let mut updates = Vec::with_capacity(updates_per_account);
1263
1264        for _ in 0..updates_per_account {
1265            let num_accounts_in_update = rng.random_range(1..=num_accounts);
1266            let mut state_update = EvmState::default();
1267
1268            let selected_addresses = &all_addresses[0..num_accounts_in_update];
1269
1270            for &address in selected_addresses {
1271                let mut storage = HashMap::default();
1272                if rng.random_bool(0.7) {
1273                    for _ in 0..rng.random_range(1..10) {
1274                        let slot = U256::from(rng.random::<u64>());
1275                        storage.insert(
1276                            slot,
1277                            EvmStorageSlot::new_changed(
1278                                U256::ZERO,
1279                                U256::from(rng.random::<u64>()),
1280                                TransactionId::ZERO,
1281                            ),
1282                        );
1283                    }
1284                }
1285
1286                let mut account = revm_state::Account::default();
1287                account.info = AccountInfo {
1288                    balance: U256::from(rng.random::<u64>()),
1289                    nonce: rng.random::<u64>(),
1290                    code_hash: KECCAK_EMPTY,
1291                    code: Some(Default::default()),
1292                    account_id: None,
1293                };
1294                account.storage = storage;
1295                account.status = AccountStatus::Touched;
1296                account.transaction_id = TransactionId::ZERO;
1297
1298                state_update.insert(address, account);
1299            }
1300
1301            updates.push(state_update);
1302        }
1303
1304        updates
1305    }
1306
1307    #[test]
1308    fn test_state_root() {
1309        reth_tracing::init_test_tracing();
1310
1311        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
1312        let genesis_hash = init_genesis(&factory).unwrap();
1313
1314        let state_updates = create_mock_state_updates(10, 10);
1315        let mut hashed_state = HashedPostState::default();
1316        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
1317            HashMap::default();
1318
1319        {
1320            let provider_rw = factory.provider_rw().expect("failed to get provider");
1321
1322            for update in &state_updates {
1323                let account_updates = update.iter().map(|(address, account)| {
1324                    (*address, Some(Account::from_revm_account(account)))
1325                });
1326                provider_rw
1327                    .insert_account_for_hashing(account_updates)
1328                    .expect("failed to insert accounts");
1329
1330                let storage_updates = update.iter().map(|(address, account)| {
1331                    let storage_entries = account.storage.iter().map(|(slot, value)| {
1332                        StorageEntry { key: B256::from(*slot), value: value.present_value }
1333                    });
1334                    (*address, storage_entries)
1335                });
1336                provider_rw
1337                    .insert_storage_for_hashing(storage_updates)
1338                    .expect("failed to insert storage");
1339            }
1340            provider_rw.commit().expect("failed to commit changes");
1341        }
1342
1343        for update in &state_updates {
1344            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
1345
1346            for (address, account) in update {
1347                let storage: HashMap<B256, U256> = account
1348                    .storage
1349                    .iter()
1350                    .map(|(k, v)| (B256::from(*k), v.present_value))
1351                    .collect();
1352
1353                let entry = accumulated_state.entry(*address).or_default();
1354                entry.0 = Account::from_revm_account(account);
1355                entry.1.extend(storage);
1356            }
1357        }
1358
1359        let mut payload_processor = PayloadProcessor::new(
1360            reth_tasks::Runtime::test(),
1361            EthEvmConfig::new(factory.chain_spec()),
1362            &TreeConfig::default(),
1363            PrecompileCacheMap::default(),
1364        );
1365
1366        let provider_factory = BlockchainProvider::new(factory).unwrap();
1367
1368        let mut handle = payload_processor.spawn(
1369            ExecutionEnv::test_default(),
1370            (
1371                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
1372                std::convert::identity,
1373            ),
1374            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1375            OverlayStateProviderFactory::new(
1376                provider_factory,
1377                OverlayBuilder::<EthPrimitives>::new(genesis_hash, ChangesetCache::new()),
1378            ),
1379            &TreeConfig::default(),
1380            false,
1381        );
1382
1383        let mut state_hook = handle.state_hook().expect("state hook is None");
1384
1385        for update in state_updates {
1386            state_hook.on_state(update);
1387        }
1388        drop(state_hook);
1389
1390        let root_from_task = handle.state_root().expect("task failed").state_root;
1391        let root_from_regular = state_root(accumulated_state);
1392
1393        assert_eq!(
1394            root_from_task, root_from_regular,
1395            "State root mismatch: task={root_from_task}, base={root_from_regular}"
1396        );
1397    }
1398
1399    /// Tests the full prewarm lifecycle for a fork block:
1400    ///
1401    /// 1. Cache is at canonical block 4.
1402    /// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
1403    ///    `PrewarmCacheTask` does when it receives a `SavedCache`.
1404    /// 3. Prewarm populates the shared cache with fork-specific state.
1405    /// 4. While the prewarm clone is alive, the cache is unavailable (`usage_count` > 1).
1406    /// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
1407    /// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
1408    ///    data.
1409    #[test]
1410    fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1411        let execution_cache = PayloadExecutionCache::default();
1412
1413        // Canonical chain at block 4.
1414        let block4_hash = B256::from([4u8; 32]);
1415        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1416
1417        // Fork block arrives with parent = block 2. Prewarm task checks out the cache.
1418        // This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
1419        let fork_parent = B256::from([2u8; 32]);
1420        let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1421        assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1422        let prewarm_cache = prewarm_cache.unwrap();
1423        assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1424
1425        // Prewarm populates cache with fork-specific state (ancestor data for block 2).
1426        // Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
1427        let fork_addr = Address::from([0xBB; 20]);
1428        let fork_key = B256::from([0xCC; 32]);
1429        prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1430
1431        // While prewarm holds the clone, the cache handle count > 1 so the cache is in use.
1432        let during_prewarm = execution_cache.get_cache_for(block4_hash);
1433        assert!(
1434            during_prewarm.is_none(),
1435            "cache must be unavailable while prewarm holds a reference"
1436        );
1437
1438        // Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
1439        drop(prewarm_cache);
1440
1441        // Canonical block 5 arrives (parent = block 4).
1442        // Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
1443        // clears the stale fork data, and returns a cache with hash = block4_hash.
1444        let block5_cache = execution_cache.get_cache_for(block4_hash);
1445        assert!(
1446            block5_cache.is_some(),
1447            "canonical chain must get cache after fork prewarm is dropped"
1448        );
1449        assert_eq!(
1450            block5_cache.as_ref().unwrap().executed_block_hash(),
1451            block4_hash,
1452            "cache must carry the canonical parent hash, not the fork parent"
1453        );
1454    }
1455}