Skip to main content

reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
6    payload_processor::{
7        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
8        sparse_trie::StateRootComputeOutcome,
9    },
10    sparse_trie::SparseTrieCacheTask,
11    CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches,
12};
13use alloy_eip7928::BlockAccessList;
14use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
15use alloy_evm::block::StateChangeSource;
16use alloy_primitives::B256;
17use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
18use metrics::{Counter, Histogram};
19use multiproof::*;
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use rayon::prelude::*;
23use reth_evm::{
24    block::ExecutableTxParts,
25    execute::{ExecutableTxFor, WithTxEnv},
26    ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
27    SpecFor, TxEnvFor,
28};
29use reth_metrics::Metrics;
30use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
31use reth_provider::{
32    BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
33};
34use reth_revm::{db::BundleState, state::EvmState};
35use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
36use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
37use reth_trie_parallel::{
38    proof_task::{ProofTaskCtx, ProofWorkerHandle},
39    root::ParallelStateRootError,
40};
41use reth_trie_sparse::{
42    ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
43};
44use std::{
45    ops::Not,
46    sync::{
47        atomic::{AtomicBool, AtomicUsize},
48        mpsc::{self, channel},
49        Arc,
50    },
51    time::Duration,
52};
53use tracing::{debug, debug_span, instrument, warn, Span};
54
55pub mod bal;
56pub mod multiproof;
57mod preserved_sparse_trie;
58pub mod prewarm;
59pub mod receipt_root_task;
60pub mod sparse_trie;
61
62use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
63
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
/// the effects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// would deteriorate to the point where PST might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of values
/// in allocated sparse tries.
///
/// There are storage and account values, the largest of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
///
/// NOTE(review): this module-level constant (5) is distinct from the associated
/// `PayloadProcessor::SMALL_BLOCK_TX_THRESHOLD` (30) used for signature-recovery parallelism.
/// An unqualified reference inside the impl resolves to THIS constant, not the associated one.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;

/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
///
/// Pins the handle's transaction type to the iterator's recovered transaction wrapped with its
/// prepared tx env, the iterator's conversion error, and the node's receipt type.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
105
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: Runtime,
    /// The most recent cache used for execution.
    execution_cache: PayloadExecutionCache,
    /// Metrics for trie operations.
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: usize,
    /// Whether transactions should not be executed on the prewarming task.
    disable_transaction_prewarming: bool,
    /// Whether the state cache should be disabled.
    disable_state_cache: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
    /// re-use allocated memory. Stored with the block hash it was computed for to enable trie
    /// preservation across sequential payload validations.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Sparse trie prune depth.
    sparse_trie_prune_depth: usize,
    /// Maximum storage tries to retain after pruning.
    sparse_trie_max_storage_tries: usize,
    /// Whether sparse trie cache pruning is fully disabled.
    disable_sparse_trie_cache_pruning: bool,
    /// Whether to disable cache metrics recording.
    disable_cache_metrics: bool,
}
143
144impl<N, Evm> PayloadProcessor<Evm>
145where
146    N: NodePrimitives,
147    Evm: ConfigureEvm<Primitives = N>,
148{
149    /// Returns a reference to the workload executor driving payload tasks.
150    pub const fn executor(&self) -> &Runtime {
151        &self.executor
152    }
153
154    /// Creates a new payload processor.
155    pub fn new(
156        executor: Runtime,
157        evm_config: Evm,
158        config: &TreeConfig,
159        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
160    ) -> Self {
161        Self {
162            executor,
163            execution_cache: Default::default(),
164            trie_metrics: Default::default(),
165            cross_block_cache_size: config.cross_block_cache_size(),
166            disable_transaction_prewarming: config.disable_prewarming(),
167            evm_config,
168            disable_state_cache: config.disable_state_cache(),
169            precompile_cache_disabled: config.precompile_cache_disabled(),
170            precompile_cache_map,
171            sparse_state_trie: SharedPreservedSparseTrie::default(),
172            sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
173            sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
174            disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
175            disable_cache_metrics: config.disable_cache_metrics(),
176        }
177    }
178}
179
180impl<Evm> WaitForCaches for PayloadProcessor<Evm>
181where
182    Evm: ConfigureEvm,
183{
184    fn wait_for_caches(&self) -> CacheWaitDurations {
185        debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
186
187        // Wait for both caches in parallel using std threads
188        let execution_cache = self.execution_cache.clone();
189        let sparse_trie = self.sparse_state_trie.clone();
190
191        // Use channels and spawn_blocking instead of std::thread::spawn
192        let (execution_tx, execution_rx) = std::sync::mpsc::channel();
193        let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
194
195        self.executor.spawn_blocking_named("wait-exec-cache", move || {
196            let _ = execution_tx.send(execution_cache.wait_for_availability());
197        });
198        self.executor.spawn_blocking_named("wait-sparse-tri", move || {
199            let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
200        });
201
202        let execution_cache_duration =
203            execution_rx.recv().expect("execution cache wait task failed to send result");
204        let sparse_trie_duration =
205            sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
206
207        debug!(
208            target: "engine::tree::payload_processor",
209            ?execution_cache_duration,
210            ?sparse_trie_duration,
211            "Execution cache and sparse trie locks acquired"
212        );
213        CacheWaitDurations {
214            execution_cache: execution_cache_duration,
215            sparse_trie: sparse_trie_duration,
216        }
217    }
218}
219
impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the multi proof task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Multi proof task
    ///
    /// Responsible for preparing sparse trie messages for the sparse trie task.
    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
    /// output back to this task.
    ///
    /// Receives updates from sequential execution.
    /// This task runs until it receives a shutdown signal, which should be after the block
    /// was fully executed.
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root.
    ///
    /// This task runs until there are no further updates to process.
    ///
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// canceling)
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
        bal: Option<Arc<BlockAccessList>>,
    ) -> IteratorPayloadHandle<Evm, I, N>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // start preparing transactions immediately
        let (prewarm_rx, execution_rx) =
            self.spawn_tx_iterator(transactions, env.transaction_count);

        let span = Span::current();

        // Spawn the state-root pipeline first so prewarming can be wired to its multiproof
        // channel below.
        let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config);
        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            provider_builder,
            Some(state_root_handle.to_multi_proof.clone()),
            bal,
        );

        PayloadHandle {
            state_root_handle: Some(state_root_handle),
            prewarm_handle,
            transactions: execution_rx,
            _span: span,
        }
    }

    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
    ///
    /// Returns a [`PayloadHandle`] to communicate with the task.
    ///
    /// Unlike [`Self::spawn`], no state-root pipeline is started: the prewarm task runs without a
    /// multiproof channel and the handle's `state_root_handle` is `None`.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        bal: Option<Arc<BlockAccessList>>,
    ) -> IteratorPayloadHandle<Evm, I, N>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        let (prewarm_rx, execution_rx) =
            self.spawn_tx_iterator(transactions, env.transaction_count);
        let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
        PayloadHandle {
            state_root_handle: None,
            prewarm_handle,
            transactions: execution_rx,
            _span: Span::current(),
        }
    }

    /// Spawns state root computation pipeline (multiproof + sparse trie tasks).
    ///
    /// The returned [`StateRootHandle`] provides:
    /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
    ///   execution.
    /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
    ///   state root.
    ///
    /// The state hook **must** be dropped after execution to signal the end of state updates.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub fn spawn_state_root<F>(
        &mut self,
        multiproof_provider_factory: F,
        env: &ExecutionEnv<Evm>,
        config: &TreeConfig,
    ) -> StateRootHandle
    where
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // Channel through which execution streams state updates into the multiproof task.
        let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();

        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
        // Small blocks produce few state changes, so run with half the proof workers to
        // avoid idle-worker overhead.
        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);

        let (state_root_tx, state_root_rx) = channel();

        self.spawn_sparse_trie_task(
            proof_handle,
            state_root_tx,
            from_multi_proof,
            env.parent_state_root,
            config.multiproof_chunk_size(),
        );

        StateRootHandle::new(to_multi_proof, state_root_rx)
    }

    /// Transaction count threshold below which proof workers are halved, since fewer transactions
    /// produce fewer state changes and most workers would be idle overhead.
    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

    /// Transaction count threshold below which sequential conversion is used.
    ///
    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
    /// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
    /// blocks.
    ///
    /// NOTE(review): this associated const (30) is only reachable via `Self::`; the bare
    /// module-level `SMALL_BLOCK_TX_THRESHOLD` (5) used in `spawn_caching_with` is a separate,
    /// unrelated threshold.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Number of leading transactions to convert sequentially before entering the rayon
    /// parallel path.
    ///
    /// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
    /// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
    /// a small head sequentially and sending it immediately, execution can start without
    /// waiting for rayon scheduling.
    const PARALLEL_PREFETCH_COUNT: usize = 4;

    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
    ///
    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
    /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
    /// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
    /// results to execution in the original transaction order.
    ///
    /// Returns two receivers: one feeding `(index, tx)` pairs to the prewarm task, the other
    /// feeding conversion results (in original order) to sequential execution.
    #[expect(clippy::type_complexity)]
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
        transaction_count: usize,
    ) -> (
        mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
    ) {
        // Bounded to the transaction count: producers can never block since at most
        // `transaction_count` items are ever sent on each channel.
        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
        let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);

        if transaction_count == 0 {
            // Empty block — nothing to do.
        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
            // Sequential path for small blocks — avoids rayon work-stealing setup and
            // channel-based reorder overhead when it costs more than sequential conversion.
            debug!(
                target: "engine::tree::payload_processor",
                transaction_count,
                "using sequential sig recovery for small block"
            );
            self.executor.spawn_blocking_named("tx-iterator", move || {
                let (transactions, convert) = transactions.into_parts();
                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
            });
        } else {
            // Parallel path — recover signatures in parallel on rayon, stream results
            // to execution in order via `for_each_ordered`.
            //
            // To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
            // few transactions are recovered sequentially and sent immediately before
            // entering the parallel iterator for the remainder.
            let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
            let executor = self.executor.clone();
            self.executor.spawn_blocking_named("tx-iterator", move || {
                let (transactions, convert) = transactions.into_parts();
                let mut all: Vec<_> = transactions.into_iter().collect();
                // `min` again defensively: the iterator's actual length may differ from
                // the advertised `transaction_count`.
                let rest = all.split_off(prefetch.min(all.len()));

                // Convert the first few transactions sequentially so execution can
                // start immediately without waiting for rayon work-stealing.
                convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);

                // Convert the remaining transactions in parallel.
                rest.into_par_iter()
                    .enumerate()
                    .map(|(i, tx)| {
                        // Re-offset indices past the sequentially converted head.
                        let idx = i + prefetch;
                        let tx = convert.convert(tx);
                        (idx, tx)
                    })
                    .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
                        let tx = tx.map(|tx| {
                            let (tx_env, tx) = tx.into_parts();
                            let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
                            // Prewarm only sees successful conversions; errors flow solely
                            // through the execution channel below.
                            let _ = prewarm_tx.send((idx, tx.clone()));
                            tx
                        });
                        let _ = execute_tx.send(tx);
                        debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
                    });
            });
        }

        (prewarm_rx, execute_rx)
    }

    /// Spawn prewarming optionally wired to the multiproof task for target updates.
    ///
    /// Chooses a [`PrewarmMode`] at runtime: skipped (disabled or small block), block access
    /// list driven, or transaction driven.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        skip_all,
        fields(bal=%bal.is_some())
    )]
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        bal: Option<Arc<BlockAccessList>>,
    ) -> CacheTaskHandle<N::Receipt>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        // NOTE: the unqualified name resolves to the module-level `SMALL_BLOCK_TX_THRESHOLD`
        // (5), not the associated `Self::SMALL_BLOCK_TX_THRESHOLD` (30) used by
        // `spawn_tx_iterator` — the two thresholds are intentionally different.
        let skip_prewarm =
            self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;

        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));

        let executed_tx_index = Arc::new(AtomicUsize::new(0));

        // configure prewarming
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache: saved_cache.clone(),
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            executed_tx_index: Arc::clone(&executed_tx_index),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
        );

        {
            // Clone the task channel: one copy is moved into the task for self-messaging,
            // the other is returned in the handle below.
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking_named("prewarm", move || {
                // The mode is resolved inside the task so channel setup above is unconditional.
                let mode = if skip_prewarm {
                    PrewarmMode::Skipped
                } else if let Some(bal) = bal {
                    PrewarmMode::BlockAccessList(bal)
                } else {
                    PrewarmMode::Transactions(transactions)
                };
                prewarm_task.run(mode, to_prewarm_task);
            });
        }

        CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
    }

    /// Returns the cache for the given parent hash.
    ///
    /// If the given hash is different than what is recently cached, then this will create a new
    /// instance.
    #[instrument(level = "debug", target = "engine::caching", skip(self))]
    fn cache_for(&self, parent_hash: B256) -> SavedCache {
        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
            debug!("reusing execution cache");
            cache
        } else {
            debug!("creating new execution cache on cache miss");
            let start = Instant::now();
            let cache = ExecutionCache::new(self.cross_block_cache_size);
            let metrics = CachedStateMetrics::zeroed();
            metrics.record_cache_creation(start.elapsed());
            SavedCache::new(parent_hash, cache, metrics)
                .with_disable_cache_metrics(self.disable_cache_metrics)
        }
    }

    /// Spawns the [`SparseTrieCacheTask`] for this payload processor.
    ///
    /// The trie is preserved when the new payload is a child of the previous one.
    ///
    /// The spawned task drains `from_multi_proof`, computes the state root, sends the result on
    /// `state_root_tx`, and then stores the (pruned or cleared) trie back into the shared
    /// preserved-trie slot for the next payload.
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        // Clone everything the 'static blocking task needs up front.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let prune_depth = self.sparse_trie_prune_depth;
        let max_storage_tries = self.sparse_trie_max_storage_tries;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            // Raise priority once per worker thread; the sparse trie task is latency-critical.
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Reuse a stored SparseStateTrie if available, applying continuation logic.
            // If this payload's parent state root matches the preserved trie's anchor,
            // we can reuse the pruned trie structure. Otherwise, we clear the trie but
            // keep allocations.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ParallelSparseTrie::default().with_parallelism_thresholds(
                            PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
                        ),
                    );
                    SparseStateTrie::new()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                chunk_size,
            );

            let result = task.run();

            // Acquire the guard before sending the result to prevent a race condition:
            // Without this, the next block could start after send() but before store(),
            // causing take() to return None and forcing it to create a new empty trie
            // instead of reusing the preserved one. Holding the guard ensures the next
            // block's take() blocks until we've stored the trie for reuse.
            let mut guard = preserved_sparse_trie.lock();

            // Keep a copy of the success outcome; `result` itself is consumed by send() below.
            let task_result = result.as_ref().ok().cloned();
            // Send state root computation result - next block may start but will block on take()
            if state_root_tx.send(result).is_err() {
                // Receiver dropped - payload was likely invalid or cancelled.
                // Clear the trie instead of preserving potentially invalid state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                // Drop guard before deferred to release lock before expensive deallocations
                drop(guard);
                drop(deferred);
                return;
            }

            // Only preserve the trie as anchored if computation succeeded.
            // A failed computation may have left the trie in a partially updated state.
            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    prune_depth,
                    max_storage_tries,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                    &result.trie_updates,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                // Anchor the pruned trie to the new state root so the next payload can
                // detect continuation.
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            // Drop guard before deferred to release lock before expensive deallocations
            drop(guard);
            drop(deferred);
        });
    }

    /// Updates the execution cache with the post-execution state from an inserted block.
    ///
    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
    /// to ensure the cache remains warm for subsequent block execution.
    ///
    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
    /// hitting the database, maintaining performance consistency.
    pub fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        let disable_cache_metrics = self.disable_cache_metrics;
        self.execution_cache.update_with_guard(|cached| {
            // If a cache exists but is anchored to a different block than this block's parent,
            // it cannot be extended with this state — leave it untouched.
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
                );
                return
            }

            // Take existing cache (if any) or create fresh caches
            let (caches, cache_metrics, _) = match cached.take() {
                Some(existing) => existing.split(),
                None => (
                    ExecutionCache::new(self.cross_block_cache_size),
                    CachedStateMetrics::zeroed(),
                    false,
                ),
            };

            // Insert the block's bundle state into cache
            let new_cache =
                SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
                    .with_disable_cache_metrics(disable_cache_metrics);
            if new_cache.cache().insert_state(bundle_state).is_err() {
                // On insertion failure the cache contents are suspect — drop it entirely
                // rather than risk serving stale or partial state.
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return
            }
            new_cache.update_metrics();

            // Replace with the updated cache
            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
}
732
733/// Converts transactions sequentially and sends them to the prewarm and execute channels.
734fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
735    iter: impl Iterator<Item = RawTx>,
736    convert: &C,
737    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
738    execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
739) where
740    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
741    TxEnv: Clone,
742    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
743{
744    for (idx, raw_tx) in iter.enumerate() {
745        let tx = convert.convert(raw_tx);
746        let tx = tx.map(|tx| {
747            let (tx_env, tx) = tx.into_parts();
748            WithTxEnv { tx_env, tx: Arc::new(tx) }
749        });
750        if let Ok(tx) = &tx {
751            let _ = prewarm_tx.send((idx, tx.clone()));
752        }
753        let _ = execute_tx.send(tx);
754        debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
755    }
756}
757
/// Handle to a background state root computation task.
///
/// Unlike [`PayloadHandle`], this does not include transaction iteration or cache prewarming.
/// It only provides access to the state root computation via [`Self::state_hook`] and
/// [`Self::state_root`].
///
/// Created by [`PayloadProcessor::spawn_state_root`].
#[derive(Debug)]
pub struct StateRootHandle {
    /// Channel for evm state updates to the multiproof pipeline.
    to_multi_proof: CrossbeamSender<MultiProofMessage>,
    /// Receiver for the computed state root. Wrapped in an `Option` so it can be taken
    /// exactly once (see [`Self::state_root`] / [`Self::take_state_root_rx`]).
    state_root_rx: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
}
772
773impl StateRootHandle {
774    /// Creates a new state root handle.
775    pub const fn new(
776        to_multi_proof: CrossbeamSender<MultiProofMessage>,
777        state_root_rx: mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>,
778    ) -> Self {
779        Self { to_multi_proof, state_root_rx: Some(state_root_rx) }
780    }
781
782    /// Returns a state hook that streams state updates to the background state root task.
783    ///
784    /// The hook must be dropped after execution completes to signal the end of state updates.
785    pub fn state_hook(&self) -> impl OnStateHook {
786        let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone());
787
788        move |source: StateChangeSource, state: &EvmState| {
789            let _ =
790                to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
791        }
792    }
793
794    /// Awaits the state root computation result.
795    ///
796    /// # Panics
797    ///
798    /// If called more than once.
799    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
800        self.state_root_rx
801            .take()
802            .expect("state_root already taken")
803            .recv()
804            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
805    }
806
807    /// Takes the state root receiver for use with custom waiting logic (e.g., timeouts).
808    ///
809    /// # Panics
810    ///
811    /// If called more than once.
812    pub const fn take_state_root_rx(
813        &mut self,
814    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
815        self.state_root_rx.take().expect("state_root already taken")
816    }
817}
818
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the background state root computation, if spawned.
    state_root_handle: Option<StateRootHandle>,
    // must include the receiver of the state root wired to the sparse trie
    /// Handle to the prewarming/caching task.
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of block transactions, yielded in block order by the conversion task.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Span for tracing; kept alive for the lifetime of the handle.
    _span: Span,
}
834
835impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
836    /// Awaits the state root
837    ///
838    /// # Panics
839    ///
840    /// If payload processing was started without background tasks.
841    #[instrument(
842        level = "debug",
843        target = "engine::tree::payload_processor",
844        name = "await_state_root",
845        skip_all
846    )]
847    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
848        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
849    }
850
851    /// Takes the state root receiver out of the handle for use with custom waiting logic
852    /// (e.g., timeout-based waiting).
853    ///
854    /// # Panics
855    ///
856    /// If payload processing was started without background tasks.
857    pub const fn take_state_root_rx(
858        &mut self,
859    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
860        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
861    }
862
863    /// Returns a state hook to be used to send state updates to this task.
864    ///
865    /// If a multiproof task is spawned the hook will notify it about new states.
866    pub fn state_hook(&self) -> Option<impl OnStateHook> {
867        self.state_root_handle.as_ref().map(|handle| handle.state_hook())
868    }
869
870    /// Returns a clone of the caches used by prewarming
871    pub fn caches(&self) -> Option<ExecutionCache> {
872        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
873    }
874
875    /// Returns a clone of the cache metrics used by prewarming
876    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
877        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
878    }
879
880    /// Returns a reference to the shared executed transaction index counter.
881    ///
882    /// The main execution loop should store `index + 1` after executing each transaction so that
883    /// prewarm workers can skip transactions that have already been processed.
884    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
885        &self.prewarm_handle.executed_tx_index
886    }
887
888    /// Terminates the pre-warming transaction processing.
889    ///
890    /// Note: This does not terminate the task yet.
891    pub fn stop_prewarming_execution(&self) {
892        self.prewarm_handle.stop_prewarming_execution()
893    }
894
895    /// Terminates the entire caching task.
896    ///
897    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
898    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
899    /// path without cloning the expensive `BundleState`.
900    ///
901    /// Returns a sender for the channel that should be notified on block validation success.
902    pub fn terminate_caching(
903        &mut self,
904        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
905    ) -> Option<mpsc::Sender<()>> {
906        self.prewarm_handle.terminate_caching(execution_outcome)
907    }
908
909    /// Returns iterator yielding transactions from the stream.
910    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
911        self.transactions.iter()
912    }
913}
914
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The shared cache the task operates with.
    saved_cache: Option<SavedCache>,
    /// Channel to the spawned prewarm task if any. Taken on termination so the terminate
    /// event is sent at most once (see `terminate_caching` and the `Drop` impl).
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Shared counter tracking the next transaction index to be executed by the main execution
    /// loop. Prewarm workers skip transactions below this index.
    executed_tx_index: Arc<AtomicUsize>,
}
929
930impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
931    /// Terminates the pre-warming transaction processing.
932    ///
933    /// Note: This does not terminate the task yet.
934    pub fn stop_prewarming_execution(&self) {
935        self.to_prewarm_task
936            .as_ref()
937            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
938    }
939
940    /// Terminates the entire pre-warming task.
941    ///
942    /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
943    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
944    #[must_use = "sender must be used and notified on block validation success"]
945    pub fn terminate_caching(
946        &mut self,
947        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
948    ) -> Option<mpsc::Sender<()>> {
949        if let Some(tx) = self.to_prewarm_task.take() {
950            let (valid_block_tx, valid_block_rx) = mpsc::channel();
951            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
952            let _ = tx.send(event);
953
954            Some(valid_block_tx)
955        } else {
956            None
957        }
958    }
959}
960
961impl<R> Drop for CacheTaskHandle<R> {
962    fn drop(&mut self) {
963        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
964        if let Some(tx) = self.to_prewarm_task.take() {
965            let _ = tx.send(PrewarmTaskEvent::Terminate {
966                execution_outcome: None,
967                valid_block_rx: mpsc::channel().1,
968            });
969        }
970    }
971}
972
973/// Shared access to most recently used cache.
974///
/// This cache is intended to be used for processing the payload in the following manner:
976///  - Get Cache if the payload's parent block matches the parent block
977///  - Update cache upon successful payload execution
978///
979/// This process assumes that payloads are received sequentially.
980///
981/// ## Cache Safety
982///
983/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
984/// (such as prewarming tasks) must be terminated before calling
985/// [`PayloadExecutionCache::update_with_guard`], otherwise the cache may be corrupted or cleared.
986///
987/// ## Cache vs Prewarming Distinction
988///
989/// **[`PayloadExecutionCache`]**:
990/// - Stores parent block's execution state after completion
991/// - Used to fetch parent data for next block's execution
992/// - Must be exclusively accessed during save operations
993///
994/// **[`PrewarmCacheTask`]**:
995/// - Speculatively loads accounts/storage that might be used in transaction execution
996/// - Prepares data for state root proof computation
997/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
pub struct PayloadExecutionCache {
    /// Guarded cloneable cache identified by the hash of the block it was last saved for.
    inner: Arc<RwLock<Option<SavedCache>>>,
    /// Metrics for cache operations (lock wait time, in-use contention counter).
    metrics: ExecutionCacheMetrics,
}
1005
impl PayloadExecutionCache {
    /// Returns the cache for `parent_hash` if it's available for use.
    ///
    /// A cache is considered available when:
    /// - It exists and matches the requested parent hash
    /// - No other tasks are currently using it (checked via Arc reference count)
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
        // Time how long we block on the read lock; contention here delays payload processing.
        let start = Instant::now();
        let cache = self.inner.read();

        let elapsed = start.elapsed();
        self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
        if elapsed.as_millis() > 5 {
            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
        }

        if let Some(c) = cache.as_ref() {
            let cached_hash = c.executed_block_hash();
            // Check that the cache hash matches the parent hash of the current block. It won't
            // match in case it's a fork block.
            let hash_matches = cached_hash == parent_hash;
            // Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold
            // a reference to this cache. We can only reuse it when we have exclusive access.
            let available = c.is_available();
            let usage_count = c.usage_count();

            debug!(
                target: "engine::caching",
                %cached_hash,
                %parent_hash,
                hash_matches,
                available,
                usage_count,
                "Existing cache found"
            );

            if available {
                // If the cache is available (no other threads are using it) but has a
                // mismatching parent hash, we can just clear it and keep using it without
                // re-creating it from scratch.
                if !hash_matches {
                    c.clear();
                }
                return Some(c.clone())
            } else if hash_matches {
                // The right cache exists but other tasks still hold it; record the contention.
                self.metrics.execution_cache_in_use.increment(1);
            }
        } else {
            debug!(target: "engine::caching", %parent_hash, "No cache found");
        }

        None
    }

    /// Clears the tracked cache
    #[expect(unused)]
    pub(crate) fn clear(&self) {
        self.inner.write().take();
    }

    /// Waits until the execution cache becomes available for use.
    ///
    /// This acquires a write lock to ensure exclusive access, then immediately releases it.
    /// This is useful for synchronization before starting payload processing.
    ///
    /// Returns the time spent waiting for the lock.
    pub fn wait_for_availability(&self) -> Duration {
        let start = Instant::now();
        // Acquire write lock to wait for any current holders to finish
        let _guard = self.inner.write();
        let elapsed = start.elapsed();
        if elapsed.as_millis() > 5 {
            debug!(
                target: "engine::tree::payload_processor",
                blocked_for=?elapsed,
                "Waited for execution cache to become available"
            );
        }
        elapsed
    }

    /// Updates the cache with a closure that has exclusive access to the guard.
    /// This ensures that all cache operations happen atomically.
    ///
    /// ## CRITICAL SAFETY REQUIREMENT
    ///
    /// **Before calling this method, you MUST ensure there are no other active cache users.**
    /// This includes:
    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
    /// - No concurrent transactions that might access the cached state
    /// - All prewarming operations must be completed or cancelled
    ///
    /// Violating this requirement can result in cache corruption, incorrect state data,
    /// and potential consensus failures.
    pub fn update_with_guard<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut Option<SavedCache>),
    {
        let mut guard = self.inner.write();
        update_fn(&mut guard);
    }
}
1109
/// Metrics for execution cache operations.
// NOTE(review): the `Metrics` derive presumably uses the field doc comments below as the
// metric descriptions — keep them stable; confirm against `reth_metrics` before rewording.
#[derive(Metrics, Clone)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ExecutionCacheMetrics {
    /// Counter for when the execution cache was unavailable because other threads
    /// (e.g., prewarming) are still using it.
    pub(crate) execution_cache_in_use: Counter,
    /// Time spent waiting for execution cache mutex to become available.
    pub(crate) execution_cache_wait_duration: Histogram,
}
1120
/// EVM context required to execute a block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    /// Used for sparse trie continuation: if the preserved trie's anchor matches this,
    /// the trie can be reused directly.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    /// Used to determine parallel worker count for prewarming.
    /// A value of 0 indicates the count is unknown.
    pub transaction_count: usize,
    /// Total gas used by all transactions in the block.
    /// Used to adaptively select multiproof chunk size for optimal throughput.
    pub gas_used: u64,
    /// Withdrawals included in the block.
    /// Used to generate prefetch targets for withdrawal addresses.
    /// `None` when the block has no withdrawals field (pre-Shanghai).
    pub withdrawals: Option<Vec<Withdrawal>>,
}
1145
1146impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
1147where
1148    EvmEnvFor<Evm>: Default,
1149{
1150    /// Creates a new [`ExecutionEnv`] with default values for testing.
1151    #[cfg(any(test, feature = "test-utils"))]
1152    pub fn test_default() -> Self {
1153        Self {
1154            evm_env: Default::default(),
1155            hash: Default::default(),
1156            parent_hash: Default::default(),
1157            parent_state_root: Default::default(),
1158            transaction_count: 0,
1159            gas_used: 0,
1160            withdrawals: None,
1161        }
1162    }
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167    use super::PayloadExecutionCache;
1168    use crate::tree::{
1169        cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
1170        payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1171        precompile_cache::PrecompileCacheMap,
1172        StateProviderBuilder, TreeConfig,
1173    };
1174    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1175    use alloy_evm::block::StateChangeSource;
1176    use rand::Rng;
1177    use reth_chainspec::ChainSpec;
1178    use reth_db_common::init::init_genesis;
1179    use reth_ethereum_primitives::TransactionSigned;
1180    use reth_evm::OnStateHook;
1181    use reth_evm_ethereum::EthEvmConfig;
1182    use reth_primitives_traits::{Account, Recovered, StorageEntry};
1183    use reth_provider::{
1184        providers::{BlockchainProvider, OverlayStateProviderFactory},
1185        test_utils::create_test_provider_factory_with_chain_spec,
1186        ChainSpecProvider, HashingWriter,
1187    };
1188    use reth_revm::db::BundleState;
1189    use reth_testing_utils::generators;
1190    use reth_trie::{test_utils::state_root, HashedPostState};
1191    use reth_trie_db::ChangesetCache;
1192    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1193    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
1194    use std::sync::Arc;
1195
1196    fn make_saved_cache(hash: B256) -> SavedCache {
1197        let execution_cache = ExecutionCache::new(1_000);
1198        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
1199    }
1200
1201    #[test]
1202    fn execution_cache_allows_single_checkout() {
1203        let execution_cache = PayloadExecutionCache::default();
1204        let hash = B256::from([1u8; 32]);
1205
1206        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1207
1208        let first = execution_cache.get_cache_for(hash);
1209        assert!(first.is_some(), "expected initial checkout to succeed");
1210
1211        let second = execution_cache.get_cache_for(hash);
1212        assert!(second.is_none(), "second checkout should be blocked while guard is active");
1213
1214        drop(first);
1215
1216        let third = execution_cache.get_cache_for(hash);
1217        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1218    }
1219
1220    #[test]
1221    fn execution_cache_checkout_releases_on_drop() {
1222        let execution_cache = PayloadExecutionCache::default();
1223        let hash = B256::from([2u8; 32]);
1224
1225        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1226
1227        {
1228            let guard = execution_cache.get_cache_for(hash);
1229            assert!(guard.is_some(), "expected checkout to succeed");
1230            // Guard dropped at end of scope
1231        }
1232
1233        let retry = execution_cache.get_cache_for(hash);
1234        assert!(retry.is_some(), "checkout should succeed after guard drop");
1235    }
1236
1237    #[test]
1238    fn execution_cache_mismatch_parent_clears_and_returns() {
1239        let execution_cache = PayloadExecutionCache::default();
1240        let hash = B256::from([3u8; 32]);
1241
1242        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1243
1244        // When the parent hash doesn't match, the cache is cleared and returned for reuse
1245        let different_hash = B256::from([4u8; 32]);
1246        let cache = execution_cache.get_cache_for(different_hash);
1247        assert!(cache.is_some(), "cache should be returned for reuse after clearing")
1248    }
1249
1250    #[test]
1251    fn execution_cache_update_after_release_succeeds() {
1252        let execution_cache = PayloadExecutionCache::default();
1253        let initial = B256::from([5u8; 32]);
1254
1255        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1256
1257        let guard =
1258            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1259
1260        drop(guard);
1261
1262        let updated = B256::from([6u8; 32]);
1263        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1264
1265        let new_checkout = execution_cache.get_cache_for(updated);
1266        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1267    }
1268
    #[test]
    fn on_inserted_executed_block_populates_cache() {
        // Fresh processor with default config; its execution cache starts empty.
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        // An empty bundle state is enough to exercise the cache bookkeeping.
        let bundle_state = BundleState::default();

        // Cache should be empty initially
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        // Update cache with inserted block
        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should now exist for the block hash
        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }
1297
    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        // Fresh processor with default config.
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        // Setup: populate cache with block 1
        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should still be for block 1 (unchanged)
        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        // Cache for block 3 should not exist
        // NOTE: the previous `get_cache_for` with block 1's hash returned the cache, so this
        // second lookup also checks no stale entry for block 3 was created.
        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }
1332
    /// Generates `updates_per_account` random [`EvmState`] updates over a fixed pool of
    /// `num_accounts` random addresses.
    ///
    /// Each update touches a random-length prefix of the address pool; each touched account
    /// gets random balance/nonce and, with 70% probability, 1-9 random storage writes.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            // Always a prefix of the pool, so earlier addresses are touched more often.
            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    // Mark as touched so downstream state handling treats it as modified.
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }
1382
    /// End-to-end check: the state root computed by the background task pipeline matches the
    /// reference state root computed directly from the accumulated account/storage state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Phase 1: persist all generated account/storage state to the hashing tables so the
        // background task has a consistent database to prove against.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Phase 2: build the reference view — later updates overwrite earlier account info,
        // while storage entries accumulate.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        // Phase 3: spawn the payload processor with no transactions and stream the same state
        // updates through its state hook.
        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        let mut handle = payload_processor.spawn(
            ExecutionEnv::test_default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
            &TreeConfig::default(),
            None, // No BAL for test
        );

        let mut state_hook = handle.state_hook().expect("state hook is None");

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        // Dropping the hook signals the end of state updates to the background task.
        drop(state_hook);

        // Phase 4: compare the task's root with the reference computation.
        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
1471}