reth_engine_tree/tree/payload_processor/mod.rs

//! Entrypoint for payload processing.

use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
    cached_state::{
        CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
        SavedCache,
    },
    payload_processor::{
        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
        sparse_trie::StateRootComputeOutcome,
    },
    sparse_trie::SparseTrieTask,
    StateProviderBuilder, TreeConfig,
};
use alloy_evm::{block::StateChangeSource, ToTxEnv};
use alloy_primitives::B256;
use crossbeam_channel::Sender as CrossbeamSender;
use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
    execute::{ExecutableTxFor, WithTxEnv},
    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::{
    hashed_cursor::HashedCursorFactory, prefix_set::TriePrefixSetsMut,
    trie_cursor::TrieCursorFactory,
};
use reth_trie_parallel::{
    proof_task::{ProofTaskCtx, ProofWorkerHandle},
    root::ParallelStateRootError,
};
use reth_trie_sparse::{
    provider::{TrieNodeProvider, TrieNodeProviderFactory},
    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
    sync::{
        atomic::AtomicBool,
        mpsc::{self, channel},
        Arc,
    },
    time::Instant,
};
use tracing::{debug, debug_span, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod sparse_trie;

use configured_sparse_trie::ConfiguredSparseTrie;

/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by benchmarking with gradually increasing thresholds to judge the
/// effects. Below 100, throughput was generally equal or slightly lower, while above 150 it
/// deteriorated to the point where the PST might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of
/// values in allocated sparse tries.
///
/// There are storage and account values, the larger of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: WorkloadExecutor,
    /// The most recent cache used for execution.
    execution_cache: ExecutionCache,
    /// Metrics for trie operations.
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: u64,
    /// Whether transactions should not be executed by the prewarming task.
    disable_transaction_prewarming: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
    /// that allocations can be minimized.
    sparse_state_trie: Arc<
        parking_lot::Mutex<
            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
        >,
    >,
    /// Whether to disable the parallel sparse trie.
    disable_parallel_sparse_trie: bool,
    /// Maximum concurrency for prewarm task.
    prewarm_max_concurrency: usize,
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Creates a new payload processor.
    pub fn new(
        executor: WorkloadExecutor,
        evm_config: Evm,
        config: &TreeConfig,
        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    ) -> Self {
        Self {
            executor,
            execution_cache: Default::default(),
            trie_metrics: Default::default(),
            cross_block_cache_size: config.cross_block_cache_size(),
            disable_transaction_prewarming: config.disable_prewarming(),
            evm_config,
            precompile_cache_disabled: config.precompile_cache_disabled(),
            precompile_cache_map,
            sparse_state_trie: Arc::default(),
            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
            prewarm_max_concurrency: config.prewarm_max_concurrency(),
        }
    }
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the multi proof task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Multi proof task
    ///
    /// Responsible for preparing sparse trie messages for the sparse trie task.
    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
    /// output back to this task.
    ///
    /// Receives updates from sequential execution.
    /// This task runs until it receives a shutdown signal, which should be after the block
    /// was fully executed.
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
    ///
    /// This task runs until there are no further updates to process.
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// cancelling).
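    ///
    /// A minimal usage sketch, mirroring the unit test at the bottom of this module (setup of
    /// `env`, `transactions`, and the providers is elided):
    ///
    /// ```ignore
    /// let mut handle = processor
    ///     .spawn(env, transactions, provider_builder, multiproof_provider_factory, &config)
    ///     .map_err(|(err, ..)| err)?;
    ///
    /// // Feed state updates from sequential execution into the multiproof task.
    /// let mut hook = handle.state_hook();
    /// hook.on_state(StateChangeSource::Transaction(0), &state);
    /// drop(hook);
    ///
    /// // Await the final state root from the sparse trie task.
    /// let outcome = handle.state_root()?;
    /// ```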
    #[allow(clippy::type_complexity)]
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
    ) -> Result<
        PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
        (ParallelStateRootError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
    >
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let span = tracing::Span::current();
        let (to_sparse_trie, sparse_trie_rx) = channel();

        // We rely on the cursor factory to provide whatever DB overlay is necessary to see a
        // consistent view of the database, including the trie tables. Because of this there is no
        // need for an overarching prefix set to invalidate any section of the trie tables, and so
        // we use an empty prefix set.
        let prefix_sets = Arc::new(TriePrefixSetsMut::default());

        // Create and spawn the storage proof task
        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory, prefix_sets);
        let storage_worker_count = config.storage_worker_count();
        let account_worker_count = config.account_worker_count();
        let proof_handle = ProofWorkerHandle::new(
            self.executor.handle().clone(),
            task_ctx,
            storage_worker_count,
            account_worker_count,
        );

        let multi_proof_task = MultiProofTask::new(
            proof_handle.clone(),
            to_sparse_trie,
            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
        );

        // wire the multiproof task to the prewarm task
        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());

        let (prewarm_rx, execution_rx, transaction_count_hint) =
            self.spawn_tx_iterator(transactions);

        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            transaction_count_hint,
            provider_builder,
            to_multi_proof.clone(),
        );

        // spawn multi-proof task
        self.executor.spawn_blocking(move || {
            let _enter = span.entered();
            multi_proof_task.run();
        });

        // wire the sparse trie to the state root response receiver
        let (state_root_tx, state_root_rx) = channel();

        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
        self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);

        Ok(PayloadHandle {
            to_multi_proof,
            prewarm_handle,
            state_root: Some(state_root_rx),
            transactions: execution_rx,
        })
    }

    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
    ///
    /// Returns a [`PayloadHandle`] to communicate with the task.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
        let prewarm_handle =
            self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
        PayloadHandle {
            to_multi_proof: None,
            prewarm_handle,
            state_root: None,
            transactions: execution_rx,
        }
    }

    /// Spawns a task that advances the transaction env iterator and streams updates through
    /// channels.
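    ///
    /// Returns a receiver for the prewarming task (only successfully converted transactions), a
    /// receiver for sequential execution (all results, including errors), and a transaction count
    /// hint derived from the iterator's size hint.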
    #[expect(clippy::type_complexity)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
    ) -> (
        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
        usize,
    ) {
        // Get the transaction count for the prewarming task.
        // Use the upper bound if available (more accurate), otherwise use the lower bound.
        let (lower, upper) = transactions.size_hint();
        let transaction_count_hint = upper.unwrap_or(lower);

        let (prewarm_tx, prewarm_rx) = mpsc::channel();
        let (execute_tx, execute_rx) = mpsc::channel();
        self.executor.spawn_blocking(move || {
            for tx in transactions {
                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
                // only send Ok(_) variants to the prewarming task
                if let Ok(tx) = &tx {
                    let _ = prewarm_tx.send(tx.clone());
                }
                let _ = execute_tx.send(tx);
            }
        });

        (prewarm_rx, execute_rx, transaction_count_hint)
    }

    /// Spawn prewarming, optionally wired to the multiproof task for target updates.
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        transaction_count_hint: usize,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    ) -> CacheTaskHandle
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        if self.disable_transaction_prewarming {
            // If no transactions should be executed, swap in the receiver of an empty, already
            // closed channel, but still spawn the task for caching updates.
            transactions = mpsc::channel().1;
        }

        let saved_cache = self.cache_for(env.parent_hash);
        let cache = saved_cache.cache().clone();
        let cache_metrics = saved_cache.metrics().clone();
        // configure prewarming
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache,
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
            transaction_count_hint,
            self.prewarm_max_concurrency,
        );

        // spawn pre-warm task
        {
            let to_prewarm_task = to_prewarm_task.clone();
            let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
            self.executor.spawn_blocking(move || {
                let _enter = span.entered();
                prewarm_task.run(transactions, to_prewarm_task);
            });
        }

        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
    }

    /// Returns the cache for the given parent hash.
    ///
    /// If the given hash is different from what was most recently cached, this will create a new
    /// instance.
    #[instrument(level = "debug", target = "engine::caching", skip(self))]
    fn cache_for(&self, parent_hash: B256) -> SavedCache {
        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
            debug!("reusing execution cache");
            cache
        } else {
            debug!("creating new execution cache on cache miss");
            let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
        }
    }

    /// Spawns the [`SparseTrieTask`] for this payload processor.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    fn spawn_sparse_trie_task<BPF>(
        &self,
        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
        proof_worker_handle: BPF,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
    ) where
        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
    {
        // Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
        // there's none to reuse.
        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
                ConfiguredSparseTrie::Serial(Default::default())
            } else {
                ConfiguredSparseTrie::Parallel(Box::new(
                    ParallelSparseTrie::default()
                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
                ))
            });
            ClearedSparseStateTrie::from_state_trie(
                SparseStateTrie::new()
                    .with_accounts_trie(default_trie.clone())
                    .with_default_storage_trie(default_trie)
                    .with_updates(true),
            )
        });

        let task =
            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
                sparse_trie_rx,
                proof_worker_handle,
                self.trie_metrics.clone(),
                sparse_state_trie,
            );

        let span = tracing::Span::current();
        self.executor.spawn_blocking(move || {
            let _enter = span.entered();

            let (result, trie) = task.run();
            // Send state root computation result
            let _ = state_root_tx.send(result);

            // Clear the SparseStateTrie, shrink it, and put it back into the mutex _after_
            // sending results to the next step, so that time spent clearing doesn't block the
            // step after this one.
            let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
            let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);

            // Shrink the sparse trie so that we don't have ever-increasing memory usage.
            cleared_trie.shrink_to(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );

            cleared_sparse_trie.lock().replace(cleared_trie);
        });
    }
}

/// Handle to all the spawned tasks.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
    /// Channel for evm state updates
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    // must include the receiver of the state root wired to the sparse trie
    prewarm_handle: CacheTaskHandle,
    /// Receiver for the state root
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Stream of block transactions
    transactions: mpsc::Receiver<Result<Tx, Err>>,
}

impl<Tx, Err> PayloadHandle<Tx, Err> {
    /// Awaits the state root
    ///
    /// # Panics
    ///
    /// If payload processing was started without background tasks.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        self.state_root
            .take()
            .expect("state_root is None")
            .recv()
            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
    }

    /// Returns a state hook to be used to send state updates to this task.
    ///
    /// If a multiproof task is spawned the hook will notify it about new states.
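    ///
    /// A minimal sketch of driving the hook from sequential execution, mirroring the unit test at
    /// the bottom of this module:
    ///
    /// ```ignore
    /// let mut hook = handle.state_hook();
    /// hook.on_state(StateChangeSource::Transaction(0), &evm_state);
    /// // Dropping the hook emits a final event to the multiproof task (see below).
    /// drop(hook);
    /// ```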
    pub fn state_hook(&self) -> impl OnStateHook {
        // convert the channel into a `StateHookSender` that emits an event on drop
        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);

        move |source: StateChangeSource, state: &EvmState| {
            if let Some(sender) = &to_multi_proof {
                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
            }
        }
    }

    /// Returns a clone of the caches used by prewarming
    pub(super) fn caches(&self) -> StateExecutionCache {
        self.prewarm_handle.cache.clone()
    }

    /// Returns a clone of the cache metrics used by prewarming
    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
        self.prewarm_handle.cache_metrics.clone()
    }

    /// Terminates the pre-warming transaction processing.
    ///
    /// Note: This does not terminate the task yet.
    pub(super) fn stop_prewarming_execution(&self) {
        self.prewarm_handle.stop_prewarming_execution()
    }

    /// Terminates the entire caching task.
    ///
    /// If the [`BundleState`] is provided it will update the shared cache.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        self.prewarm_handle.terminate_caching(block_output)
    }

    /// Returns an iterator yielding transactions from the stream.
    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
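        // `recv` blocks until the next transaction arrives and returns `Err` once the sending
        // task has finished and dropped its sender, which ends the iterator.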
        core::iter::repeat_with(|| self.transactions.recv())
            .take_while(|res| res.is_ok())
            .map(|res| res.unwrap())
    }
}

/// Access to the spawned [`PrewarmCacheTask`].
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
    /// The shared cache the task operates with.
    cache: StateExecutionCache,
    /// Metrics for the caches
    cache_metrics: CachedStateMetrics,
    /// Channel to the spawned prewarm task if any
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}

impl CacheTaskHandle {
    /// Terminates the pre-warming transaction processing.
    ///
    /// Note: This does not terminate the task yet.
    pub(super) fn stop_prewarming_execution(&self) {
        self.to_prewarm_task
            .as_ref()
            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
    }

    /// Terminates the entire pre-warming task.
    ///
    /// If the [`BundleState`] is provided it will update the shared cache.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        if let Some(tx) = self.to_prewarm_task.take() {
            // Only clone when we have an active task and a state to send
            let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
            let _ = tx.send(event);
        }
    }
}

impl Drop for CacheTaskHandle {
    fn drop(&mut self) {
        // Ensure we always terminate on drop
        self.terminate_caching(None);
    }
}

/// Shared access to the most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
///  - Get the cache if the payload's parent hash matches the block the cache was saved for
///  - Update the cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
///
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **`ExecutionCache`**:
/// - Stores the parent block's execution state after completion
/// - Used to fetch parent data for the next block's execution
/// - Must be exclusively accessed during save operations
///
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
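///
/// A minimal checkout sketch, mirroring the unit tests below (`saved_cache` stands in for a
/// [`SavedCache`] built for the executed block):
///
/// ```ignore
/// let execution_cache = ExecutionCache::default();
/// // Store the cache under exclusive access once execution has finished.
/// execution_cache.update_with_guard(|slot| *slot = Some(saved_cache));
/// // Check it out again for the next payload; returns `None` on a hash mismatch
/// // or while another task still holds it.
/// let checked_out = execution_cache.get_cache_for(parent_hash);
/// ```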
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// Guarded cloneable cache identified by a block hash.
    inner: Arc<RwLock<Option<SavedCache>>>,
}

impl ExecutionCache {
    /// Returns the cache for `parent_hash` if it's available for use.
    ///
    /// A cache is considered available when:
    /// - It exists and matches the requested parent hash
    /// - No other tasks are currently using it (checked via the Arc reference count)
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
        let start = Instant::now();
        let cache = self.inner.read();

        let elapsed = start.elapsed();
        if elapsed.as_millis() > 5 {
            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache lock");
        }

        cache
            .as_ref()
            .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
            .cloned()
    }

    /// Clears the tracked cache
    #[expect(unused)]
    pub(crate) fn clear(&self) {
        self.inner.write().take();
    }

    /// Updates the cache with a closure that has exclusive access to the guard.
    /// This ensures that all cache operations happen atomically.
    ///
    /// ## CRITICAL SAFETY REQUIREMENT
    ///
    /// **Before calling this method, you MUST ensure there are no other active cache users.**
    /// This includes:
    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
    /// - No concurrent transactions that might access the cached state
    /// - All prewarming operations must be completed or cancelled
    ///
    /// Violating this requirement can result in cache corruption, incorrect state data,
    /// and potential consensus failures.
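    ///
    /// A minimal sketch, as in the unit tests below:
    ///
    /// ```ignore
    /// // Atomically replace the stored cache with one saved for the newly executed block.
    /// execution_cache.update_with_guard(|slot| *slot = Some(saved_cache));
    /// ```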
    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut Option<SavedCache>),
    {
        let mut guard = self.inner.write();
        update_fn(&mut guard);
    }
}

/// EVM context required to execute a block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
}

impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
where
    EvmEnvFor<Evm>: Default,
{
    fn default() -> Self {
        Self {
            evm_env: Default::default(),
            hash: Default::default(),
            parent_hash: Default::default(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ExecutionCache;
    use crate::tree::{
        cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::TransactionSigned;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, Recovered, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
    }

    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
            // Guard dropped at end of scope
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    #[test]
    fn execution_cache_mismatch_parent_returns_none() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
        assert!(miss.is_none(), "checkout should fail for different parent hash");
    }

    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = ExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }

    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                    },
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        let mut handle =
            payload_processor
                .spawn(
                    Default::default(),
                    core::iter::empty::<
                        Result<Recovered<TransactionSigned>, core::convert::Infallible>,
                    >(),
                    StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
                    OverlayStateProviderFactory::new(provider_factory),
                    &TreeConfig::default(),
                )
                .map_err(|(err, ..)| err)
                .expect("failed to spawn payload processor");

        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}
927}