reth_engine_tree/tree/payload_processor/mod.rs

//! Entrypoint for payload processing.

use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
    cached_state::{
        CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
        SavedCache,
    },
    payload_processor::{
        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
        sparse_trie::StateRootComputeOutcome,
    },
    sparse_trie::SparseTrieTask,
    StateProviderBuilder, TreeConfig,
};
use alloy_evm::{block::StateChangeSource, ToTxEnv};
use alloy_primitives::B256;
use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
    execute::{ExecutableTxFor, WithTxEnv},
    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
    StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie_parallel::{
    proof_task::{ProofTaskCtx, ProofTaskManager},
    root::ParallelStateRootError,
};
use reth_trie_sparse::{
    provider::{TrieNodeProvider, TrieNodeProviderFactory},
    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::sync::{
    atomic::AtomicBool,
    mpsc::{self, channel, Sender},
    Arc,
};
use tracing::{debug, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod sparse_trie;

use configured_sparse_trie::ConfiguredSparseTrie;

/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by benchmarking with gradually increasing thresholds. Below 100,
/// throughput was generally equal or slightly lower; above 150, it deteriorated to the point
/// where the parallel sparse trie might as well not be used.
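///
/// The thresholds are applied when the sparse trie task is spawned, via
/// `ParallelSparseTrie::with_parallelism_thresholds` (see `spawn_sparse_trie_task` below).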
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: WorkloadExecutor,
    /// The most recent cache used for execution.
    execution_cache: ExecutionCache,
    /// Metrics for trie operations
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: u64,
    /// Whether transactions should not be executed by the prewarming task.
    disable_transaction_prewarming: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
    /// that allocations can be minimized.
    sparse_state_trie: Arc<
        parking_lot::Mutex<
            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
        >,
    >,
    /// Whether to disable the parallel sparse trie.
    disable_parallel_sparse_trie: bool,
    /// A cleared trie input, kept around to be reused so allocations can be minimized.
    trie_input: Option<TrieInput>,
    /// Maximum concurrency for prewarm task.
    prewarm_max_concurrency: usize,
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Creates a new payload processor.
    pub fn new(
        executor: WorkloadExecutor,
        evm_config: Evm,
        config: &TreeConfig,
        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    ) -> Self {
        Self {
            executor,
            execution_cache: Default::default(),
            trie_metrics: Default::default(),
            cross_block_cache_size: config.cross_block_cache_size(),
            disable_transaction_prewarming: config.disable_caching_and_prewarming(),
            evm_config,
            precompile_cache_disabled: config.precompile_cache_disabled(),
            precompile_cache_map,
            sparse_state_trie: Arc::default(),
            trie_input: None,
            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
            prewarm_max_concurrency: config.prewarm_max_concurrency(),
        }
    }
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the multiproof task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Multiproof task
    ///
    /// Responsible for preparing sparse trie messages for the sparse trie task.
    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
    /// output back to this task.
    ///
    /// Receives updates from sequential execution.
    /// This task runs until it receives a shutdown signal, which should be after the block
    /// was fully executed.
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
    ///
    /// This task runs until there are no further updates to process.
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// canceling).
    ///
    /// Returns an error with the original transactions iterator if the proof task manager fails to
    /// initialize.
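    ///
    /// # Example
    ///
    /// An illustrative sketch of the handle lifecycle, with names elided; see the
    /// `test_state_root` test in this module for a runnable variant:
    ///
    /// ```ignore
    /// let mut handle = processor
    ///     .spawn(env, transactions, provider_builder, consistent_view, trie_input, &config)
    ///     .map_err(|(err, ..)| err)?;
    ///
    /// // Feed state updates from sequential execution to the multiproof task.
    /// let mut hook = handle.state_hook();
    /// hook.on_state(StateChangeSource::Transaction(0), &state);
    /// drop(hook); // dropping the hook signals that no further updates will arrive
    ///
    /// // Await the final state root from the sparse trie task.
    /// let outcome = handle.state_root()?;
    /// ```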
    #[allow(clippy::type_complexity)]
    pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        consistent_view: ConsistentDbView<P>,
        trie_input: TrieInput,
        config: &TreeConfig,
    ) -> Result<
        PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
        (reth_provider::ProviderError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
    >
    where
        P: DatabaseProviderFactory<Provider: BlockReader>
            + BlockReader
            + StateProviderFactory
            + StateReader
            + Clone
            + 'static,
    {
        let (to_sparse_trie, sparse_trie_rx) = channel();
        // create the multiproof config and save the trie input for later reuse
        let (trie_input, state_root_config) =
            MultiProofConfig::new_from_input(consistent_view, trie_input);
        self.trie_input = Some(trie_input);

        // Create and spawn the storage proof task
        let task_ctx = ProofTaskCtx::new(
            state_root_config.nodes_sorted.clone(),
            state_root_config.state_sorted.clone(),
            state_root_config.prefix_sets.clone(),
        );
        let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
        let storage_worker_count = config.storage_worker_count();
        let proof_task = match ProofTaskManager::new(
            self.executor.handle().clone(),
            state_root_config.consistent_view.clone(),
            task_ctx,
            max_proof_task_concurrency,
            storage_worker_count,
        ) {
            Ok(task) => task,
            Err(error) => {
                return Err((error, transactions, env, provider_builder));
            }
        };

        // We set it to half of the proof task concurrency, because often for each multiproof we
        // spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
        let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
        let multi_proof_task = MultiProofTask::new(
            state_root_config,
            self.executor.clone(),
            proof_task.handle(),
            to_sparse_trie,
            max_multi_proof_task_concurrency,
            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
        );

        // wire the multiproof task to the prewarm task
        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());

        let (prewarm_rx, execution_rx, transaction_count_hint) =
            self.spawn_tx_iterator(transactions);

        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            transaction_count_hint,
            provider_builder,
            to_multi_proof.clone(),
        );

        // spawn multi-proof task
        self.executor.spawn_blocking(move || {
            multi_proof_task.run();
        });

        // wire the sparse trie to the state root response receiver
        let (state_root_tx, state_root_rx) = channel();

        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
        self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);

        // spawn the proof task
        self.executor.spawn_blocking(move || {
            if let Err(err) = proof_task.run() {
                // At least log if there is an error at any point
                tracing::error!(
                    target: "engine::root",
                    ?err,
                    "Storage proof task returned an error"
                );
            }
        });

        Ok(PayloadHandle {
            to_multi_proof,
            prewarm_handle,
            state_root: Some(state_root_rx),
            transactions: execution_rx,
        })
    }

    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
    ///
    /// Returns a [`PayloadHandle`] to communicate with the task.
    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
        let prewarm_handle =
            self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
        PayloadHandle {
            to_multi_proof: None,
            prewarm_handle,
            state_root: None,
            transactions: execution_rx,
        }
    }

    /// Spawns a task that advances the transaction env iterator and streams updates through a
    /// channel.
    #[expect(clippy::type_complexity)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
    ) -> (
        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
        usize,
    ) {
        // Get the transaction count for prewarming task
        // Use upper bound if available (more accurate), otherwise use lower bound
        let (lower, upper) = transactions.size_hint();
        let transaction_count_hint = upper.unwrap_or(lower);

        let (prewarm_tx, prewarm_rx) = mpsc::channel();
        let (execute_tx, execute_rx) = mpsc::channel();
        self.executor.spawn_blocking(move || {
            for tx in transactions {
                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx });
                // only send Ok(_) variants to prewarming task
                if let Ok(tx) = &tx {
                    let _ = prewarm_tx.send(tx.clone());
                }
                let _ = execute_tx.send(tx);
            }
        });

        (prewarm_rx, execute_rx, transaction_count_hint)
    }

    /// Spawns the prewarming task, optionally wired to the multiproof task for target updates.
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        transaction_count_hint: usize,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<Sender<MultiProofMessage>>,
    ) -> CacheTaskHandle
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        if self.disable_transaction_prewarming {
            // if transaction prewarming is disabled, swap in an empty channel so no transactions
            // are executed, but the task is still spawned to handle cache updates
            transactions = mpsc::channel().1;
        }

        let saved_cache = self.cache_for(env.parent_hash);
        let cache = saved_cache.cache().clone();
        let cache_metrics = saved_cache.metrics().clone();
        // configure prewarming
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache,
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
            transaction_count_hint,
            self.prewarm_max_concurrency,
        );

        // spawn pre-warm task
        {
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking(move || {
                prewarm_task.run(transactions, to_prewarm_task);
            });
        }

        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
    }

    /// Takes the stored trie input from the payload processor, if it exists.
    pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
        self.trie_input.take()
    }

    /// Returns the cache for the given parent hash.
    ///
    /// If the given hash differs from the most recently cached one, this will create a new
    /// instance.
    #[instrument(target = "engine::caching", skip(self))]
    fn cache_for(&self, parent_hash: B256) -> SavedCache {
        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
            debug!("reusing execution cache");
            cache
        } else {
            debug!("creating new execution cache on cache miss");
            let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
        }
    }

    /// Spawns the [`SparseTrieTask`] for this payload processor.
    fn spawn_sparse_trie_task<BPF>(
        &self,
        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
        proof_task_handle: BPF,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
    ) where
        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
    {
        // Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
        // there's none to reuse.
        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
                ConfiguredSparseTrie::Serial(Default::default())
            } else {
                ConfiguredSparseTrie::Parallel(Box::new(
                    ParallelSparseTrie::default()
                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
                ))
            });
            ClearedSparseStateTrie::from_state_trie(
                SparseStateTrie::new()
                    .with_accounts_trie(default_trie.clone())
                    .with_default_storage_trie(default_trie)
                    .with_updates(true),
            )
        });

        let task =
            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
                sparse_trie_rx,
                proof_task_handle,
                self.trie_metrics.clone(),
                sparse_state_trie,
            );

        self.executor.spawn_blocking(move || {
            let (result, trie) = task.run();
            // Send state root computation result
            let _ = state_root_tx.send(result);

            // Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
            // to the next step, so that time spent clearing doesn't block the step after this one.
            cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
        });
    }
}

/// Handle to all the spawned tasks.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
    /// Channel for evm state updates
    to_multi_proof: Option<Sender<MultiProofMessage>>,
    // must include the receiver of the state root wired to the sparse trie
    prewarm_handle: CacheTaskHandle,
    /// Receiver for the state root
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Stream of block transactions
    transactions: mpsc::Receiver<Result<Tx, Err>>,
}

impl<Tx, Err> PayloadHandle<Tx, Err> {
    /// Awaits the state root
    ///
    /// # Panics
    ///
    /// If payload processing was started without background tasks.
    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        self.state_root
            .take()
            .expect("state_root is None")
            .recv()
            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
    }

    /// Returns a state hook to be used to send state updates to this task.
    ///
    /// If a multiproof task is spawned the hook will notify it about new states.
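    ///
    /// Illustrative sketch, mirroring the usage in `test_state_root` below:
    ///
    /// ```ignore
    /// let mut hook = handle.state_hook();
    /// hook.on_state(StateChangeSource::Transaction(idx), &evm_state);
    /// drop(hook); // drops the underlying `StateHookSender`, which emits an event on drop
    /// ```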
    pub fn state_hook(&self) -> impl OnStateHook {
        // convert the channel into a `StateHookSender` that emits an event on drop
        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);

        move |source: StateChangeSource, state: &EvmState| {
            if let Some(sender) = &to_multi_proof {
                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
            }
        }
    }

    /// Returns a clone of the caches used by prewarming
    pub(super) fn caches(&self) -> StateExecutionCache {
        self.prewarm_handle.cache.clone()
    }

    /// Returns a clone of the cache metrics used by prewarming
    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
        self.prewarm_handle.cache_metrics.clone()
    }

    /// Terminates the pre-warming transaction processing.
    ///
    /// Note: This does not terminate the task yet.
    pub(super) fn stop_prewarming_execution(&self) {
        self.prewarm_handle.stop_prewarming_execution()
    }

    /// Terminates the entire caching task.
    ///
    /// If the [`BundleState`] is provided it will update the shared cache.
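    ///
    /// A hedged sketch of the intended call pattern (`bundle_state` is illustrative):
    ///
    /// ```ignore
    /// // On successful execution: update the shared cache with the block's bundle state.
    /// handle.terminate_caching(Some(&bundle_state));
    /// // On failure: terminate without updating the cache.
    /// handle.terminate_caching(None);
    /// ```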
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        self.prewarm_handle.terminate_caching(block_output)
    }

    /// Returns an iterator yielding transactions from the stream.
    ///
    /// The iterator ends once the sending task has finished and the channel has been drained.
    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
        core::iter::repeat_with(|| self.transactions.recv())
            .take_while(|res| res.is_ok())
            .map(|res| res.unwrap())
    }
}

/// Access to the spawned [`PrewarmCacheTask`].
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
    /// The shared cache the task operates with.
    cache: StateExecutionCache,
    /// Metrics for the caches
    cache_metrics: CachedStateMetrics,
    /// Channel to the spawned prewarm task if any
    to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
}

impl CacheTaskHandle {
    /// Terminates the pre-warming transaction processing.
    ///
    /// Note: This does not terminate the task yet.
    pub(super) fn stop_prewarming_execution(&self) {
        self.to_prewarm_task
            .as_ref()
            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
    }

    /// Terminates the entire pre-warming task.
    ///
    /// If the [`BundleState`] is provided it will update the shared cache.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        if let Some(tx) = self.to_prewarm_task.take() {
            // Only clone when we have an active task and a state to send
            let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
            let _ = tx.send(event);
        }
    }
}

impl Drop for CacheTaskHandle {
    fn drop(&mut self) {
        // Ensure we always terminate on drop
        self.terminate_caching(None);
    }
}

/// Shared access to the most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
///  - Get the cache if the payload's parent block matches the block the cache was saved for
///  - Update the cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
///
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **`ExecutionCache`**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// Guarded cloneable cache identified by a block hash.
    inner: Arc<RwLock<Option<SavedCache>>>,
}

impl ExecutionCache {
    /// Returns the cache for `parent_hash` if it's available for use.
    ///
    /// A cache is considered available when:
    /// - It exists and matches the requested parent hash
    /// - No other tasks are currently using it (checked via Arc reference count)
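    ///
    /// A usage sketch (see the `execution_cache_*` tests in this module for runnable variants):
    ///
    /// ```ignore
    /// // Returns `None` on a parent-hash mismatch, or while another task still holds
    /// // the checked-out cache.
    /// if let Some(saved) = execution_cache.get_cache_for(parent_hash) {
    ///     // execute against the cached parent state
    /// }
    /// ```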
    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
        let cache = self.inner.read();
        cache
            .as_ref()
            .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
            .cloned()
    }

    /// Clears the tracked cache
    #[expect(unused)]
    pub(crate) fn clear(&self) {
        self.inner.write().take();
    }

    /// Updates the cache with a closure that has exclusive access to the guard.
    /// This ensures that all cache operations happen atomically.
    ///
    /// ## CRITICAL SAFETY REQUIREMENT
    ///
    /// **Before calling this method, you MUST ensure there are no other active cache users.**
    /// This includes:
    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
    /// - No concurrent transactions that might access the cached state
    /// - All prewarming operations must be completed or cancelled
    ///
    /// Violating this requirement can result in cache corruption, incorrect state data,
    /// and potential consensus failures.
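    ///
    /// A minimal sketch, assuming all prewarm tasks have already terminated:
    ///
    /// ```ignore
    /// // Exclusive write access: replace the slot with the cache saved for the executed block.
    /// execution_cache.update_with_guard(|slot| *slot = Some(saved_cache));
    /// ```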
    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut Option<SavedCache>),
    {
        let mut guard = self.inner.write();
        update_fn(&mut guard);
    }
}

/// EVM context required to execute a block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
}

impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
where
    EvmEnvFor<Evm>: Default,
{
    fn default() -> Self {
        Self {
            evm_env: Default::default(),
            hash: Default::default(),
            parent_hash: Default::default(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ExecutionCache;
    use crate::tree::{
        cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::TransactionSigned;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, Recovered, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, ConsistentDbView},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
    }

    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
            // Guard dropped at end of scope
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    #[test]
    fn execution_cache_mismatch_parent_returns_none() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
        assert!(miss.is_none(), "checkout should fail for different parent hash");
    }

    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = ExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }

    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                    },
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );
        let provider = BlockchainProvider::new(factory).unwrap();
        let mut handle =
            payload_processor
                .spawn(
                    Default::default(),
                    core::iter::empty::<
                        Result<Recovered<TransactionSigned>, core::convert::Infallible>,
                    >(),
                    StateProviderBuilder::new(provider.clone(), genesis_hash, None),
                    ConsistentDbView::new_with_latest_tip(provider).unwrap(),
                    TrieInput::from_state(hashed_state),
                    &TreeConfig::default(),
                )
                .map_err(|(err, ..)| err)
                .expect("failed to spawn payload processor");

        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}