reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    cached_state::{
6        CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
7        SavedCache,
8    },
9    payload_processor::{
10        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
11        sparse_trie::StateRootComputeOutcome,
12    },
13    sparse_trie::SparseTrieTask,
14    StateProviderBuilder, TreeConfig,
15};
16use alloy_evm::{block::StateChangeSource, ToTxEnv};
17use alloy_primitives::B256;
18use executor::WorkloadExecutor;
19use multiproof::{SparseTrieUpdate, *};
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use reth_engine_primitives::ExecutableTxIterator;
23use reth_evm::{
24    execute::{ExecutableTxFor, WithTxEnv},
25    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
26};
27use reth_primitives_traits::NodePrimitives;
28use reth_provider::{
29    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
30    StateReader,
31};
32use reth_revm::{db::BundleState, state::EvmState};
33use reth_trie::TrieInput;
34use reth_trie_parallel::{
35    proof_task::{ProofTaskCtx, ProofTaskManager},
36    root::ParallelStateRootError,
37};
38use reth_trie_sparse::{
39    provider::{TrieNodeProvider, TrieNodeProviderFactory},
40    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
41};
42use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
43use std::sync::{
44    atomic::AtomicBool,
45    mpsc::{self, channel, Sender},
46    Arc,
47};
48use tracing::{debug, instrument};
49
50mod configured_sparse_trie;
51pub mod executor;
52pub mod multiproof;
53pub mod prewarm;
54pub mod sparse_trie;
55
56use configured_sparse_trie::ConfiguredSparseTrie;
57
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
/// the effects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// would deteriorate to the point where the parallel sparse trie (PST) might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
65
/// Entrypoint for executing the payload.
///
/// Owns the configuration and the reusable resources (execution cache, cleared sparse trie, trie
/// input) that the tasks spawned for a payload operate on.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: WorkloadExecutor,
    /// The most recent cache used for execution.
    execution_cache: ExecutionCache,
    /// Metrics for trie operations.
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: u64,
    /// Whether transactions should not be executed on prewarming task.
    disable_transaction_prewarming: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
    /// that allocations can be minimized.
    sparse_state_trie: Arc<
        parking_lot::Mutex<
            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
        >,
    >,
    /// Whether to disable the parallel sparse trie.
    disable_parallel_sparse_trie: bool,
    /// A cleared trie input, kept around to be reused so allocations can be minimized.
    trie_input: Option<TrieInput>,
}
100
101impl<N, Evm> PayloadProcessor<Evm>
102where
103    N: NodePrimitives,
104    Evm: ConfigureEvm<Primitives = N>,
105{
106    /// Creates a new payload processor.
107    pub fn new(
108        executor: WorkloadExecutor,
109        evm_config: Evm,
110        config: &TreeConfig,
111        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
112    ) -> Self {
113        Self {
114            executor,
115            execution_cache: Default::default(),
116            trie_metrics: Default::default(),
117            cross_block_cache_size: config.cross_block_cache_size(),
118            disable_transaction_prewarming: config.disable_caching_and_prewarming(),
119            evm_config,
120            precompile_cache_disabled: config.precompile_cache_disabled(),
121            precompile_cache_map,
122            sparse_state_trie: Arc::default(),
123            trie_input: None,
124            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
125        }
126    }
127}
128
129impl<N, Evm> PayloadProcessor<Evm>
130where
131    N: NodePrimitives,
132    Evm: ConfigureEvm<Primitives = N> + 'static,
133{
134    /// Spawns all background tasks and returns a handle connected to the tasks.
135    ///
136    /// - Transaction prewarming task
137    /// - State root task
138    /// - Sparse trie task
139    ///
140    /// # Transaction prewarming task
141    ///
142    /// Responsible for feeding state updates to the multi proof task.
143    ///
144    /// This task runs until:
145    ///  - externally cancelled (e.g. sequential block execution is complete)
146    ///
147    /// ## Multi proof task
148    ///
149    /// Responsible for preparing sparse trie messages for the sparse trie task.
150    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
151    /// output back to this task.
152    ///
153    /// Receives updates from sequential execution.
154    /// This task runs until it receives a shutdown signal, which should be after the block
155    /// was fully executed.
156    ///
157    /// ## Sparse trie task
158    ///
159    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
160    ///
161    /// This task runs until there are no further updates to process.
162    ///
163    ///
164    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
165    /// canceling)
166    pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
167        &mut self,
168        env: ExecutionEnv<Evm>,
169        transactions: I,
170        provider_builder: StateProviderBuilder<N, P>,
171        consistent_view: ConsistentDbView<P>,
172        trie_input: TrieInput,
173        config: &TreeConfig,
174    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
175    where
176        P: DatabaseProviderFactory<Provider: BlockReader>
177            + BlockReader
178            + StateProviderFactory
179            + StateReader
180            + Clone
181            + 'static,
182    {
183        let (to_sparse_trie, sparse_trie_rx) = channel();
184        // spawn multiproof task, save the trie input
185        let (trie_input, state_root_config) =
186            MultiProofConfig::new_from_input(consistent_view, trie_input);
187        self.trie_input = Some(trie_input);
188
189        // Create and spawn the storage proof task
190        let task_ctx = ProofTaskCtx::new(
191            state_root_config.nodes_sorted.clone(),
192            state_root_config.state_sorted.clone(),
193            state_root_config.prefix_sets.clone(),
194        );
195        let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
196        let proof_task = ProofTaskManager::new(
197            self.executor.handle().clone(),
198            state_root_config.consistent_view.clone(),
199            task_ctx,
200            max_proof_task_concurrency,
201        );
202
203        // We set it to half of the proof task concurrency, because often for each multiproof we
204        // spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
205        let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
206        let multi_proof_task = MultiProofTask::new(
207            state_root_config,
208            self.executor.clone(),
209            proof_task.handle(),
210            to_sparse_trie,
211            max_multi_proof_task_concurrency,
212        );
213
214        // wire the multiproof task to the prewarm task
215        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
216
217        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
218
219        let prewarm_handle =
220            self.spawn_caching_with(env, prewarm_rx, provider_builder, to_multi_proof.clone());
221
222        // spawn multi-proof task
223        self.executor.spawn_blocking(move || {
224            multi_proof_task.run();
225        });
226
227        // wire the sparse trie to the state root response receiver
228        let (state_root_tx, state_root_rx) = channel();
229
230        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
231        self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
232
233        // spawn the proof task
234        self.executor.spawn_blocking(move || {
235            if let Err(err) = proof_task.run() {
236                // At least log if there is an error at any point
237                tracing::error!(
238                    target: "engine::root",
239                    ?err,
240                    "Storage proof task returned an error"
241                );
242            }
243        });
244
245        PayloadHandle {
246            to_multi_proof,
247            prewarm_handle,
248            state_root: Some(state_root_rx),
249            transactions: execution_rx,
250        }
251    }
252
253    /// Spawn cache prewarming exclusively.
254    ///
255    /// Returns a [`PayloadHandle`] to communicate with the task.
256    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
257        &self,
258        env: ExecutionEnv<Evm>,
259        transactions: I,
260        provider_builder: StateProviderBuilder<N, P>,
261    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
262    where
263        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
264    {
265        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
266        let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
267        PayloadHandle {
268            to_multi_proof: None,
269            prewarm_handle,
270            state_root: None,
271            transactions: execution_rx,
272        }
273    }
274
275    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
276    #[expect(clippy::type_complexity)]
277    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
278        &self,
279        transactions: I,
280    ) -> (
281        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
282        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
283    ) {
284        let (prewarm_tx, prewarm_rx) = mpsc::channel();
285        let (execute_tx, execute_rx) = mpsc::channel();
286        self.executor.spawn_blocking(move || {
287            for tx in transactions {
288                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx });
289                // only send Ok(_) variants to prewarming task
290                if let Ok(tx) = &tx {
291                    let _ = prewarm_tx.send(tx.clone());
292                }
293                let _ = execute_tx.send(tx);
294            }
295        });
296
297        (prewarm_rx, execute_rx)
298    }
299
300    /// Spawn prewarming optionally wired to the multiproof task for target updates.
301    fn spawn_caching_with<P>(
302        &self,
303        env: ExecutionEnv<Evm>,
304        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
305        provider_builder: StateProviderBuilder<N, P>,
306        to_multi_proof: Option<Sender<MultiProofMessage>>,
307    ) -> CacheTaskHandle
308    where
309        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
310    {
311        if self.disable_transaction_prewarming {
312            // if no transactions should be executed we clear them but still spawn the task for
313            // caching updates
314            transactions = mpsc::channel().1;
315        }
316
317        let (cache, cache_metrics) = self.cache_for(env.parent_hash).split();
318        // configure prewarming
319        let prewarm_ctx = PrewarmContext {
320            env,
321            evm_config: self.evm_config.clone(),
322            cache: cache.clone(),
323            cache_metrics: cache_metrics.clone(),
324            provider: provider_builder,
325            metrics: PrewarmMetrics::default(),
326            terminate_execution: Arc::new(AtomicBool::new(false)),
327            precompile_cache_disabled: self.precompile_cache_disabled,
328            precompile_cache_map: self.precompile_cache_map.clone(),
329        };
330
331        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
332            self.executor.clone(),
333            self.execution_cache.clone(),
334            prewarm_ctx,
335            to_multi_proof,
336        );
337
338        // spawn pre-warm task
339        {
340            let to_prewarm_task = to_prewarm_task.clone();
341            self.executor.spawn_blocking(move || {
342                prewarm_task.run(transactions, to_prewarm_task);
343            });
344        }
345
346        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
347    }
348
    /// Takes the stored trie input out of this payload processor, if one is present.
    ///
    /// The slot is left as `None` afterwards. A trie input is stored by [`Self::spawn`].
    pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
        self.trie_input.take()
    }
353
354    /// Returns the cache for the given parent hash.
355    ///
356    /// If the given hash is different then what is recently cached, then this will create a new
357    /// instance.
358    #[instrument(target = "engine::caching", skip(self))]
359    fn cache_for(&self, parent_hash: B256) -> SavedCache {
360        self.execution_cache
361            .get_cache_for(parent_hash)
362            .inspect(|_| debug!("reusing execution cache"))
363            .unwrap_or_else(|| {
364                debug!("creating new execution cache on cache miss");
365                let cache =
366                    ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
367                SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
368            })
369    }
370
371    /// Spawns the [`SparseTrieTask`] for this payload processor.
372    fn spawn_sparse_trie_task<BPF>(
373        &self,
374        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
375        proof_task_handle: BPF,
376        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
377    ) where
378        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
379        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
380        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
381    {
382        // Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
383        // there's none to reuse.
384        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
385        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
386            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
387                ConfiguredSparseTrie::Serial(Default::default())
388            } else {
389                ConfiguredSparseTrie::Parallel(Box::new(
390                    ParallelSparseTrie::default()
391                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
392                ))
393            });
394            ClearedSparseStateTrie::from_state_trie(
395                SparseStateTrie::new()
396                    .with_accounts_trie(default_trie.clone())
397                    .with_default_storage_trie(default_trie)
398                    .with_updates(true),
399            )
400        });
401
402        let task =
403            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
404                sparse_trie_rx,
405                proof_task_handle,
406                self.trie_metrics.clone(),
407                sparse_state_trie,
408            );
409
410        self.executor.spawn_blocking(move || {
411            let (result, trie) = task.run();
412            // Send state root computation result
413            let _ = state_root_tx.send(result);
414
415            // Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
416            // to the next step, so that time spent clearing doesn't block the step after this one.
417            cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
418        });
419    }
420}
421
/// Handle to all the spawned tasks.
///
/// `Tx` is the transaction type yielded by the transaction stream and `Err` the error type that
/// the stream can yield instead.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
    /// Channel for evm state updates; `None` when no multiproof task was spawned.
    to_multi_proof: Option<Sender<MultiProofMessage>>,
    // must include the receiver of the state root wired to the sparse trie
    /// Handle to the spawned prewarm (caching) task.
    prewarm_handle: CacheTaskHandle,
    /// Receiver for the state root; `None` when no state root tasks were spawned or after the
    /// receiver has been taken.
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Stream of block transactions
    transactions: mpsc::Receiver<Result<Tx, Err>>,
}
434
435impl<Tx, Err> PayloadHandle<Tx, Err> {
436    /// Awaits the state root
437    ///
438    /// # Panics
439    ///
440    /// If payload processing was started without background tasks.
441    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
442        self.state_root
443            .take()
444            .expect("state_root is None")
445            .recv()
446            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
447    }
448
449    /// Returns a state hook to be used to send state updates to this task.
450    ///
451    /// If a multiproof task is spawned the hook will notify it about new states.
452    pub fn state_hook(&self) -> impl OnStateHook {
453        // convert the channel into a `StateHookSender` that emits an event on drop
454        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
455
456        move |source: StateChangeSource, state: &EvmState| {
457            if let Some(sender) = &to_multi_proof {
458                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
459            }
460        }
461    }
462
463    /// Returns a clone of the caches used by prewarming
464    pub(super) fn caches(&self) -> StateExecutionCache {
465        self.prewarm_handle.cache.clone()
466    }
467
468    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
469        self.prewarm_handle.cache_metrics.clone()
470    }
471
472    /// Terminates the pre-warming transaction processing.
473    ///
474    /// Note: This does not terminate the task yet.
475    pub(super) fn stop_prewarming_execution(&self) {
476        self.prewarm_handle.stop_prewarming_execution()
477    }
478
479    /// Terminates the entire caching task.
480    ///
481    /// If the [`BundleState`] is provided it will update the shared cache.
482    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
483        self.prewarm_handle.terminate_caching(block_output)
484    }
485
486    /// Returns iterator yielding transactions from the stream.
487    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
488        core::iter::repeat_with(|| self.transactions.recv())
489            .take_while(|res| res.is_ok())
490            .map(|res| res.unwrap())
491    }
492}
493
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Dropping the handle sends a terminate event to the task if one has not been sent already.
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
    /// The shared cache the task operates with.
    cache: StateExecutionCache,
    /// Metrics for the caches
    cache_metrics: CachedStateMetrics,
    /// Channel to the spawned prewarm task if any; set to `None` once the task was told to
    /// terminate.
    to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
}
504
505impl CacheTaskHandle {
506    /// Terminates the pre-warming transaction processing.
507    ///
508    /// Note: This does not terminate the task yet.
509    pub(super) fn stop_prewarming_execution(&self) {
510        self.to_prewarm_task
511            .as_ref()
512            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
513    }
514
515    /// Terminates the entire pre-warming task.
516    ///
517    /// If the [`BundleState`] is provided it will update the shared cache.
518    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
519        self.to_prewarm_task
520            .take()
521            .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
522    }
523}
524
impl Drop for CacheTaskHandle {
    /// Sends a terminate event without block output if the task has not been terminated yet.
    fn drop(&mut self) {
        // Ensure we always terminate on drop
        self.terminate_caching(None);
    }
}
531
/// Shared access to most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
///  - Get Cache if the payload's parent block matches the parent block
///  - Update cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
///
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **`ExecutionCache`**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// Guarded cloneable cache identified by a block hash.
    ///
    /// `None` until a cache has been stored; clones of this struct share the same slot.
    inner: Arc<RwLock<Option<SavedCache>>>,
}
562
563impl ExecutionCache {
564    /// Returns the cache if the currently store cache is for the given `parent_hash`
565    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
566        let cache = self.inner.read();
567        cache
568            .as_ref()
569            .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
570    }
571
572    /// Clears the tracked cache
573    #[expect(unused)]
574    pub(crate) fn clear(&self) {
575        self.inner.write().take();
576    }
577
578    /// Updates the cache with a closure that has exclusive access to the guard.
579    /// This ensures that all cache operations happen atomically.
580    ///
581    /// ## CRITICAL SAFETY REQUIREMENT
582    ///
583    /// **Before calling this method, you MUST ensure there are no other active cache users.**
584    /// This includes:
585    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
586    /// - No concurrent transactions that might access the cached state
587    /// - All prewarming operations must be completed or cancelled
588    ///
589    /// Violating this requirement can result in cache corruption, incorrect state data,
590    /// and potential consensus failures.
591    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
592    where
593        F: FnOnce(&mut Option<SavedCache>),
594    {
595        let mut guard = self.inner.write();
596        update_fn(&mut guard);
597    }
598}
599
/// EVM context required to execute a block.
///
/// `parent_hash` is also what the payload processor uses to look up a matching execution cache.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
}
610
611impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
612where
613    EvmEnvFor<Evm>: Default,
614{
615    fn default() -> Self {
616        Self {
617            evm_env: Default::default(),
618            hash: Default::default(),
619            parent_hash: Default::default(),
620        }
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use crate::tree::{
627        payload_processor::{
628            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
629        },
630        precompile_cache::PrecompileCacheMap,
631        StateProviderBuilder, TreeConfig,
632    };
633    use alloy_evm::block::StateChangeSource;
634    use rand::Rng;
635    use reth_chainspec::ChainSpec;
636    use reth_db_common::init::init_genesis;
637    use reth_ethereum_primitives::TransactionSigned;
638    use reth_evm::OnStateHook;
639    use reth_evm_ethereum::EthEvmConfig;
640    use reth_primitives_traits::{Account, Recovered, StorageEntry};
641    use reth_provider::{
642        providers::{BlockchainProvider, ConsistentDbView},
643        test_utils::create_test_provider_factory_with_chain_spec,
644        ChainSpecProvider, HashingWriter,
645    };
646    use reth_testing_utils::generators;
647    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
648    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
649    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
650    use std::sync::Arc;
651
652    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
653        let mut rng = generators::rng();
654        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
655        let mut updates = Vec::with_capacity(updates_per_account);
656
657        for _ in 0..updates_per_account {
658            let num_accounts_in_update = rng.random_range(1..=num_accounts);
659            let mut state_update = EvmState::default();
660
661            let selected_addresses = &all_addresses[0..num_accounts_in_update];
662
663            for &address in selected_addresses {
664                let mut storage = HashMap::default();
665                if rng.random_bool(0.7) {
666                    for _ in 0..rng.random_range(1..10) {
667                        let slot = U256::from(rng.random::<u64>());
668                        storage.insert(
669                            slot,
670                            EvmStorageSlot::new_changed(
671                                U256::ZERO,
672                                U256::from(rng.random::<u64>()),
673                                0,
674                            ),
675                        );
676                    }
677                }
678
679                let account = revm_state::Account {
680                    info: AccountInfo {
681                        balance: U256::from(rng.random::<u64>()),
682                        nonce: rng.random::<u64>(),
683                        code_hash: KECCAK_EMPTY,
684                        code: Some(Default::default()),
685                    },
686                    storage,
687                    status: AccountStatus::Touched,
688                    transaction_id: 0,
689                };
690
691                state_update.insert(address, account);
692            }
693
694            updates.push(state_update);
695        }
696
697        updates
698    }
699
    /// Feeds random state updates through the spawned payload tasks and checks that the state
    /// root they produce matches the root computed directly from the accumulated state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Write all accounts and storage entries of the updates into the database.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Build both the hashed post state (input for the task) and the plain accumulated state
        // (input for the reference root). Later updates overwrite the account and extend storage.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );
        let provider = BlockchainProvider::new(factory).unwrap();
        // Spawn the tasks with an empty transaction iterator; state updates are fed manually
        // through the state hook below.
        let mut handle = payload_processor.spawn(
            Default::default(),
            core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
            StateProviderBuilder::new(provider.clone(), genesis_hash, None),
            ConsistentDbView::new_with_latest_tip(provider).unwrap(),
            TrieInput::from_state(hashed_state),
            &TreeConfig::default(),
        );

        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        // Drop the hook before waiting for the state root.
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
783}