1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    cached_state::{
6        CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
7        SavedCache,
8    },
9    payload_processor::{
10        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
11        sparse_trie::StateRootComputeOutcome,
12    },
13    sparse_trie::SparseTrieTask,
14    StateProviderBuilder, TreeConfig,
15};
16use alloy_evm::{block::StateChangeSource, ToTxEnv};
17use alloy_primitives::B256;
18use crossbeam_channel::Sender as CrossbeamSender;
19use executor::WorkloadExecutor;
20use multiproof::{SparseTrieUpdate, *};
21use parking_lot::RwLock;
22use prewarm::PrewarmMetrics;
23use reth_engine_primitives::ExecutableTxIterator;
24use reth_evm::{
25    execute::{ExecutableTxFor, WithTxEnv},
26    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
27};
28use reth_primitives_traits::NodePrimitives;
29use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
30use reth_revm::{db::BundleState, state::EvmState};
31use reth_trie::{
32    hashed_cursor::HashedCursorFactory, prefix_set::TriePrefixSetsMut,
33    trie_cursor::TrieCursorFactory,
34};
35use reth_trie_parallel::{
36    proof_task::{ProofTaskCtx, ProofWorkerHandle},
37    root::ParallelStateRootError,
38};
39use reth_trie_sparse::{
40    provider::{TrieNodeProvider, TrieNodeProviderFactory},
41    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
42};
43use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
44use std::{
45    sync::{
46        atomic::AtomicBool,
47        mpsc::{self, channel},
48        Arc,
49    },
50    time::Instant,
51};
52use tracing::{debug, debug_span, instrument, warn};
53
54mod configured_sparse_trie;
55pub mod executor;
56pub mod multiproof;
57pub mod prewarm;
58pub mod sparse_trie;
59
60use configured_sparse_trie::ConfiguredSparseTrie;
61
/// Parallelism thresholds applied to the [`ParallelSparseTrie`] that the payload processor
/// builds when it has no previously cleared sparse trie to reuse.
///
/// Both `min_revealed_nodes` and `min_updated_nodes` are set to 100.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
69
/// First argument passed to `shrink_to` on the cleared sparse state trie after a state root
/// task finishes and before the trie is stored for reuse.
///
/// NOTE(review): presumably the maximum node capacity the trie retains between blocks —
/// confirm against `ClearedSparseStateTrie::shrink_to`.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
79
/// Second argument passed to `shrink_to` on the cleared sparse state trie after a state root
/// task finishes and before the trie is stored for reuse.
///
/// NOTE(review): presumably the maximum value capacity the trie retains between blocks —
/// confirm against `ClearedSparseStateTrie::shrink_to`.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
92
/// Entry point for spawning the background work that accompanies payload execution:
/// transaction conversion, cache prewarming, multiproof generation and sparse trie state root
/// computation.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Executor on which every background task of this processor is spawned.
    executor: WorkloadExecutor,
    /// Shared slot for the execution cache; consulted by parent hash and handed to the prewarm
    /// task.
    execution_cache: ExecutionCache,
    /// Metrics handed to each sparse trie task.
    trie_metrics: MultiProofTaskMetrics,
    /// Size passed to the cache builder when a fresh execution cache has to be created.
    cross_block_cache_size: u64,
    /// When set, the prewarm task is given an already-disconnected transaction channel.
    disable_transaction_prewarming: bool,
    /// EVM configuration cloned into every prewarm context.
    evm_config: Evm,
    /// Forwarded to the prewarm context as-is.
    precompile_cache_disabled: bool,
    /// Precompile cache map cloned into every prewarm context.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Cleared sparse trie left behind by the previous sparse trie task, if any. It is taken
    /// when a new task is spawned and put back once that task has finished.
    sparse_state_trie: Arc<
        parking_lot::Mutex<
            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
        >,
    >,
    /// Selects the serial instead of the parallel sparse trie when a fresh trie is built.
    disable_parallel_sparse_trie: bool,
    /// Concurrency limit passed to the prewarm task.
    prewarm_max_concurrency: usize,
}
127
128impl<N, Evm> PayloadProcessor<Evm>
129where
130    N: NodePrimitives,
131    Evm: ConfigureEvm<Primitives = N>,
132{
133    pub fn new(
135        executor: WorkloadExecutor,
136        evm_config: Evm,
137        config: &TreeConfig,
138        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
139    ) -> Self {
140        Self {
141            executor,
142            execution_cache: Default::default(),
143            trie_metrics: Default::default(),
144            cross_block_cache_size: config.cross_block_cache_size(),
145            disable_transaction_prewarming: config.disable_prewarming(),
146            evm_config,
147            precompile_cache_disabled: config.precompile_cache_disabled(),
148            precompile_cache_map,
149            sparse_state_trie: Arc::default(),
150            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
151            prewarm_max_concurrency: config.prewarm_max_concurrency(),
152        }
153    }
154}
155
156impl<N, Evm> PayloadProcessor<Evm>
157where
158    N: NodePrimitives,
159    Evm: ConfigureEvm<Primitives = N> + 'static,
160{
161    #[allow(clippy::type_complexity)]
194    #[instrument(
195        level = "debug",
196        target = "engine::tree::payload_processor",
197        name = "payload processor",
198        skip_all
199    )]
200    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
201        &mut self,
202        env: ExecutionEnv<Evm>,
203        transactions: I,
204        provider_builder: StateProviderBuilder<N, P>,
205        multiproof_provider_factory: F,
206        config: &TreeConfig,
207    ) -> Result<
208        PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
209        (ParallelStateRootError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
210    >
211    where
212        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
213        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
214            + Clone
215            + Send
216            + 'static,
217    {
218        let span = tracing::Span::current();
219        let (to_sparse_trie, sparse_trie_rx) = channel();
220
221        let prefix_sets = Arc::new(TriePrefixSetsMut::default());
226
227        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory, prefix_sets);
229        let storage_worker_count = config.storage_worker_count();
230        let account_worker_count = config.account_worker_count();
231        let proof_handle = ProofWorkerHandle::new(
232            self.executor.handle().clone(),
233            task_ctx,
234            storage_worker_count,
235            account_worker_count,
236        );
237
238        let multi_proof_task = MultiProofTask::new(
239            proof_handle.clone(),
240            to_sparse_trie,
241            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
242        );
243
244        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
246
247        let (prewarm_rx, execution_rx, transaction_count_hint) =
248            self.spawn_tx_iterator(transactions);
249
250        let prewarm_handle = self.spawn_caching_with(
251            env,
252            prewarm_rx,
253            transaction_count_hint,
254            provider_builder,
255            to_multi_proof.clone(),
256        );
257
258        self.executor.spawn_blocking(move || {
260            let _enter = span.entered();
261            multi_proof_task.run();
262        });
263
264        let (state_root_tx, state_root_rx) = channel();
266
267        self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
269
270        Ok(PayloadHandle {
271            to_multi_proof,
272            prewarm_handle,
273            state_root: Some(state_root_rx),
274            transactions: execution_rx,
275        })
276    }
277
278    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
282    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
283        &self,
284        env: ExecutionEnv<Evm>,
285        transactions: I,
286        provider_builder: StateProviderBuilder<N, P>,
287    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
288    where
289        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
290    {
291        let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
292        let prewarm_handle =
293            self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
294        PayloadHandle {
295            to_multi_proof: None,
296            prewarm_handle,
297            state_root: None,
298            transactions: execution_rx,
299        }
300    }
301
302    #[expect(clippy::type_complexity)]
304    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
305        &self,
306        transactions: I,
307    ) -> (
308        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
309        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
310        usize,
311    ) {
312        let (lower, upper) = transactions.size_hint();
315        let transaction_count_hint = upper.unwrap_or(lower);
316
317        let (prewarm_tx, prewarm_rx) = mpsc::channel();
318        let (execute_tx, execute_rx) = mpsc::channel();
319        self.executor.spawn_blocking(move || {
320            for tx in transactions {
321                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
322                if let Ok(tx) = &tx {
324                    let _ = prewarm_tx.send(tx.clone());
325                }
326                let _ = execute_tx.send(tx);
327            }
328        });
329
330        (prewarm_rx, execute_rx, transaction_count_hint)
331    }
332
333    fn spawn_caching_with<P>(
335        &self,
336        env: ExecutionEnv<Evm>,
337        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
338        transaction_count_hint: usize,
339        provider_builder: StateProviderBuilder<N, P>,
340        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
341    ) -> CacheTaskHandle
342    where
343        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
344    {
345        if self.disable_transaction_prewarming {
346            transactions = mpsc::channel().1;
349        }
350
351        let saved_cache = self.cache_for(env.parent_hash);
352        let cache = saved_cache.cache().clone();
353        let cache_metrics = saved_cache.metrics().clone();
354        let prewarm_ctx = PrewarmContext {
356            env,
357            evm_config: self.evm_config.clone(),
358            saved_cache,
359            provider: provider_builder,
360            metrics: PrewarmMetrics::default(),
361            terminate_execution: Arc::new(AtomicBool::new(false)),
362            precompile_cache_disabled: self.precompile_cache_disabled,
363            precompile_cache_map: self.precompile_cache_map.clone(),
364        };
365
366        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
367            self.executor.clone(),
368            self.execution_cache.clone(),
369            prewarm_ctx,
370            to_multi_proof,
371            transaction_count_hint,
372            self.prewarm_max_concurrency,
373        );
374
375        {
377            let to_prewarm_task = to_prewarm_task.clone();
378            let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
379            self.executor.spawn_blocking(move || {
380                let _enter = span.entered();
381                prewarm_task.run(transactions, to_prewarm_task);
382            });
383        }
384
385        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
386    }
387
388    #[instrument(level = "debug", target = "engine::caching", skip(self))]
393    fn cache_for(&self, parent_hash: B256) -> SavedCache {
394        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
395            debug!("reusing execution cache");
396            cache
397        } else {
398            debug!("creating new execution cache on cache miss");
399            let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
400            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
401        }
402    }
403
404    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
406    fn spawn_sparse_trie_task<BPF>(
407        &self,
408        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
409        proof_worker_handle: BPF,
410        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
411    ) where
412        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
413        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
414        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
415    {
416        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
419        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
420            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
421                ConfiguredSparseTrie::Serial(Default::default())
422            } else {
423                ConfiguredSparseTrie::Parallel(Box::new(
424                    ParallelSparseTrie::default()
425                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
426                ))
427            });
428            ClearedSparseStateTrie::from_state_trie(
429                SparseStateTrie::new()
430                    .with_accounts_trie(default_trie.clone())
431                    .with_default_storage_trie(default_trie)
432                    .with_updates(true),
433            )
434        });
435
436        let task =
437            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
438                sparse_trie_rx,
439                proof_worker_handle,
440                self.trie_metrics.clone(),
441                sparse_state_trie,
442            );
443
444        let span = tracing::Span::current();
445        self.executor.spawn_blocking(move || {
446            let _enter = span.entered();
447
448            let (result, trie) = task.run();
449            let _ = state_root_tx.send(result);
451
452            let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
456            let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
457
458            cleared_trie.shrink_to(
460                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
461                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
462            );
463
464            cleared_sparse_trie.lock().replace(cleared_trie);
465        });
466    }
467}
468
/// Handle to the background tasks spawned for a single payload.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
    /// Sender to the multiproof task; `None` when the handle was created without one
    /// (cache-exclusive mode).
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Handle to the prewarm/caching task.
    prewarm_handle: CacheTaskHandle,
    /// Receiver for the state root result; `None` in cache-exclusive mode, and taken by the
    /// first call to `state_root`.
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Receiver for the transactions produced by the transaction iterator task.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
}
481
482impl<Tx, Err> PayloadHandle<Tx, Err> {
483    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
489    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
490        self.state_root
491            .take()
492            .expect("state_root is None")
493            .recv()
494            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
495    }
496
497    pub fn state_hook(&self) -> impl OnStateHook {
501        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
503
504        move |source: StateChangeSource, state: &EvmState| {
505            if let Some(sender) = &to_multi_proof {
506                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
507            }
508        }
509    }
510
511    pub(super) fn caches(&self) -> StateExecutionCache {
513        self.prewarm_handle.cache.clone()
514    }
515
516    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
518        self.prewarm_handle.cache_metrics.clone()
519    }
520
521    pub(super) fn stop_prewarming_execution(&self) {
525        self.prewarm_handle.stop_prewarming_execution()
526    }
527
528    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
532        self.prewarm_handle.terminate_caching(block_output)
533    }
534
535    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
537        core::iter::repeat_with(|| self.transactions.recv())
538            .take_while(|res| res.is_ok())
539            .map(|res| res.unwrap())
540    }
541}
542
/// Handle to a spawned prewarm/caching task.
///
/// Dropping the handle terminates the task if that has not been done explicitly.
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
    /// Execution cache the prewarm task works on.
    cache: StateExecutionCache,
    /// Metrics associated with `cache`.
    cache_metrics: CachedStateMetrics,
    /// Event channel to the prewarm task; `None` once termination has been requested.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}
553
554impl CacheTaskHandle {
555    pub(super) fn stop_prewarming_execution(&self) {
559        self.to_prewarm_task
560            .as_ref()
561            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
562    }
563
564    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
568        if let Some(tx) = self.to_prewarm_task.take() {
569            let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
571            let _ = tx.send(event);
572        }
573    }
574}
575
impl Drop for CacheTaskHandle {
    /// Terminates the prewarm task without block output when the handle goes away.
    fn drop(&mut self) {
        // No-op if `terminate_caching` was already called: the sender has been taken by then.
        self.terminate_caching(None);
    }
}
582
/// Cheaply cloneable, shared slot holding at most one [`SavedCache`].
///
/// Clones share the same slot; all access goes through the inner read-write lock.
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// The saved cache, if any, guarded by a read-write lock.
    inner: Arc<RwLock<Option<SavedCache>>>,
}
613
614impl ExecutionCache {
615    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
621    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
622        let start = Instant::now();
623        let cache = self.inner.read();
624
625        let elapsed = start.elapsed();
626        if elapsed.as_millis() > 5 {
627            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
628        }
629
630        cache
631            .as_ref()
632            .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
633            .cloned()
634    }
635
636    #[expect(unused)]
638    pub(crate) fn clear(&self) {
639        self.inner.write().take();
640    }
641
642    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
656    where
657        F: FnOnce(&mut Option<SavedCache>),
658    {
659        let mut guard = self.inner.write();
660        update_fn(&mut guard);
661    }
662}
663
/// Environment handed to the payload processor for the block being executed.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// EVM environment for the block.
    pub evm_env: EvmEnvFor<Evm>,
    /// NOTE(review): presumably the hash of the block being executed — it is not read in this
    /// file; confirm against callers.
    pub hash: B256,
    /// Hash of the parent block; used to look up a reusable execution cache.
    pub parent_hash: B256,
}
674
675impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
676where
677    EvmEnvFor<Evm>: Default,
678{
679    fn default() -> Self {
680        Self {
681            evm_env: Default::default(),
682            hash: Default::default(),
683            parent_hash: Default::default(),
684        }
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use super::ExecutionCache;
691    use crate::tree::{
692        cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
693        payload_processor::{
694            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
695        },
696        precompile_cache::PrecompileCacheMap,
697        StateProviderBuilder, TreeConfig,
698    };
699    use alloy_evm::block::StateChangeSource;
700    use rand::Rng;
701    use reth_chainspec::ChainSpec;
702    use reth_db_common::init::init_genesis;
703    use reth_ethereum_primitives::TransactionSigned;
704    use reth_evm::OnStateHook;
705    use reth_evm_ethereum::EthEvmConfig;
706    use reth_primitives_traits::{Account, Recovered, StorageEntry};
707    use reth_provider::{
708        providers::{BlockchainProvider, OverlayStateProviderFactory},
709        test_utils::create_test_provider_factory_with_chain_spec,
710        ChainSpecProvider, HashingWriter,
711    };
712    use reth_testing_utils::generators;
713    use reth_trie::{test_utils::state_root, HashedPostState};
714    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
715    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
716    use std::sync::Arc;
717
718    fn make_saved_cache(hash: B256) -> SavedCache {
719        let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
720        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
721    }
722
723    #[test]
724    fn execution_cache_allows_single_checkout() {
725        let execution_cache = ExecutionCache::default();
726        let hash = B256::from([1u8; 32]);
727
728        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
729
730        let first = execution_cache.get_cache_for(hash);
731        assert!(first.is_some(), "expected initial checkout to succeed");
732
733        let second = execution_cache.get_cache_for(hash);
734        assert!(second.is_none(), "second checkout should be blocked while guard is active");
735
736        drop(first);
737
738        let third = execution_cache.get_cache_for(hash);
739        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
740    }
741
742    #[test]
743    fn execution_cache_checkout_releases_on_drop() {
744        let execution_cache = ExecutionCache::default();
745        let hash = B256::from([2u8; 32]);
746
747        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
748
749        {
750            let guard = execution_cache.get_cache_for(hash);
751            assert!(guard.is_some(), "expected checkout to succeed");
752            }
754
755        let retry = execution_cache.get_cache_for(hash);
756        assert!(retry.is_some(), "checkout should succeed after guard drop");
757    }
758
759    #[test]
760    fn execution_cache_mismatch_parent_returns_none() {
761        let execution_cache = ExecutionCache::default();
762        let hash = B256::from([3u8; 32]);
763
764        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
765
766        let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
767        assert!(miss.is_none(), "checkout should fail for different parent hash");
768    }
769
770    #[test]
771    fn execution_cache_update_after_release_succeeds() {
772        let execution_cache = ExecutionCache::default();
773        let initial = B256::from([5u8; 32]);
774
775        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
776
777        let guard =
778            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
779
780        drop(guard);
781
782        let updated = B256::from([6u8; 32]);
783        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
784
785        let new_checkout = execution_cache.get_cache_for(updated);
786        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
787    }
788
    /// Generates `updates_per_account` random state updates over a pool of `num_accounts`
    /// random addresses.
    ///
    /// Each update touches a prefix of the address pool whose length is drawn from
    /// `1..=num_accounts`. Every touched account gets a random balance and nonce and, with
    /// probability 0.7, between 1 and 9 randomly keyed storage slots.
    ///
    /// NOTE(review): `num_accounts` is expected to be at least 1; the range `1..=num_accounts`
    /// would otherwise be empty — confirm how `random_range` treats an empty range.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            // Always a prefix of the pool, so low-index addresses appear in more updates.
            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                    },
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }
836
    /// End-to-end check that the state root produced by the payload processor's background
    /// tasks matches the root computed directly from the accumulated account and storage state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        // NOTE(review): `hashed_state` is extended below but never read in this test.
        let mut hashed_state = HashedPostState::default();
        // Latest account info plus the union of all storage writes per address; this is the
        // input for the reference root.
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Write every update into the database's hashing tables and commit.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Fold the updates in order: later updates overwrite account info and storage slots.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        // No transactions are supplied: state changes reach the tasks only through the state
        // hook below.
        let mut handle =
            payload_processor
                .spawn(
                    Default::default(),
                    core::iter::empty::<
                        Result<Recovered<TransactionSigned>, core::convert::Infallible>,
                    >(),
                    StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
                    OverlayStateProviderFactory::new(provider_factory),
                    &TreeConfig::default(),
                )
                .map_err(|(err, ..)| err)
                .expect("failed to spawn payload processor");

        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        // NOTE(review): presumably dropping the hook tells the multiproof task that no further
        // updates will arrive — confirm against `StateHookSender`.
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
927}