reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5    payload_processor::{
6        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7        sparse_trie::StateRootComputeOutcome,
8    },
9    sparse_trie::SparseTrieTask,
10    StateProviderBuilder, TreeConfig,
11};
12use alloy_evm::{block::StateChangeSource, ToTxEnv};
13use alloy_primitives::B256;
14use executor::WorkloadExecutor;
15use multiproof::{SparseTrieUpdate, *};
16use parking_lot::RwLock;
17use prewarm::PrewarmMetrics;
18use reth_engine_primitives::ExecutableTxIterator;
19use reth_evm::{
20    execute::{ExecutableTxFor, WithTxEnv},
21    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
22};
23use reth_primitives_traits::NodePrimitives;
24use reth_provider::{
25    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
26    StateReader,
27};
28use reth_revm::{db::BundleState, state::EvmState};
29use reth_trie::TrieInput;
30use reth_trie_parallel::{
31    proof_task::{ProofTaskCtx, ProofTaskManager},
32    root::ParallelStateRootError,
33};
34use reth_trie_sparse::{
35    provider::{TrieNodeProvider, TrieNodeProviderFactory},
36    ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
37};
38use std::sync::{
39    atomic::AtomicBool,
40    mpsc::{self, channel, Sender},
41    Arc,
42};
43
44use super::precompile_cache::PrecompileCacheMap;
45
46mod configured_sparse_trie;
47pub mod executor;
48pub mod multiproof;
49pub mod prewarm;
50pub mod sparse_trie;
51
52use configured_sparse_trie::ConfiguredSparseTrie;
53
54/// Entrypoint for executing the payload.
55#[derive(Debug)]
56pub struct PayloadProcessor<Evm>
57where
58    Evm: ConfigureEvm,
59{
60    /// The executor used by to spawn tasks.
61    executor: WorkloadExecutor,
62    /// The most recent cache used for execution.
63    execution_cache: ExecutionCache,
64    /// Metrics for trie operations
65    trie_metrics: MultiProofTaskMetrics,
66    /// Cross-block cache size in bytes.
67    cross_block_cache_size: u64,
68    /// Whether transactions should not be executed on prewarming task.
69    disable_transaction_prewarming: bool,
70    /// Determines how to configure the evm for execution.
71    evm_config: Evm,
72    /// Whether precompile cache should be disabled.
73    precompile_cache_disabled: bool,
74    /// Precompile cache map.
75    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
76    /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
77    /// that allocations can be minimized.
78    sparse_state_trie: Arc<
79        parking_lot::Mutex<Option<ClearedSparseStateTrie<ConfiguredSparseTrie, SerialSparseTrie>>>,
80    >,
81    /// Whether to use the parallel sparse trie.
82    disable_parallel_sparse_trie: bool,
83    /// A cleared trie input, kept around to be reused so allocations can be minimized.
84    trie_input: Option<TrieInput>,
85}
86
87impl<N, Evm> PayloadProcessor<Evm>
88where
89    N: NodePrimitives,
90    Evm: ConfigureEvm<Primitives = N>,
91{
92    /// Creates a new payload processor.
93    pub fn new(
94        executor: WorkloadExecutor,
95        evm_config: Evm,
96        config: &TreeConfig,
97        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
98    ) -> Self {
99        Self {
100            executor,
101            execution_cache: Default::default(),
102            trie_metrics: Default::default(),
103            cross_block_cache_size: config.cross_block_cache_size(),
104            disable_transaction_prewarming: config.disable_caching_and_prewarming(),
105            evm_config,
106            precompile_cache_disabled: config.precompile_cache_disabled(),
107            precompile_cache_map,
108            sparse_state_trie: Arc::default(),
109            trie_input: None,
110            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
111        }
112    }
113}
114
115impl<N, Evm> PayloadProcessor<Evm>
116where
117    N: NodePrimitives,
118    Evm: ConfigureEvm<Primitives = N> + 'static,
119{
120    /// Spawns all background tasks and returns a handle connected to the tasks.
121    ///
122    /// - Transaction prewarming task
123    /// - State root task
124    /// - Sparse trie task
125    ///
126    /// # Transaction prewarming task
127    ///
128    /// Responsible for feeding state updates to the multi proof task.
129    ///
130    /// This task runs until:
131    ///  - externally cancelled (e.g. sequential block execution is complete)
132    ///
133    /// ## Multi proof task
134    ///
135    /// Responsible for preparing sparse trie messages for the sparse trie task.
136    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
137    /// output back to this task.
138    ///
139    /// Receives updates from sequential execution.
140    /// This task runs until it receives a shutdown signal, which should be after the block
141    /// was fully executed.
142    ///
143    /// ## Sparse trie task
144    ///
145    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
146    ///
147    /// This task runs until there are no further updates to process.
148    ///
149    ///
150    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
151    /// canceling)
152    pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
153        &mut self,
154        env: ExecutionEnv<Evm>,
155        transactions: I,
156        provider_builder: StateProviderBuilder<N, P>,
157        consistent_view: ConsistentDbView<P>,
158        trie_input: TrieInput,
159        config: &TreeConfig,
160    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
161    where
162        P: DatabaseProviderFactory<Provider: BlockReader>
163            + BlockReader
164            + StateProviderFactory
165            + StateReader
166            + Clone
167            + 'static,
168    {
169        let (to_sparse_trie, sparse_trie_rx) = channel();
170        // spawn multiproof task, save the trie input
171        let (trie_input, state_root_config) =
172            MultiProofConfig::new_from_input(consistent_view, trie_input);
173        self.trie_input = Some(trie_input);
174
175        // Create and spawn the storage proof task
176        let task_ctx = ProofTaskCtx::new(
177            state_root_config.nodes_sorted.clone(),
178            state_root_config.state_sorted.clone(),
179            state_root_config.prefix_sets.clone(),
180        );
181        let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
182        let proof_task = ProofTaskManager::new(
183            self.executor.handle().clone(),
184            state_root_config.consistent_view.clone(),
185            task_ctx,
186            max_proof_task_concurrency,
187        );
188
189        // We set it to half of the proof task concurrency, because often for each multiproof we
190        // spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
191        let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
192        let multi_proof_task = MultiProofTask::new(
193            state_root_config,
194            self.executor.clone(),
195            proof_task.handle(),
196            to_sparse_trie,
197            max_multi_proof_task_concurrency,
198        );
199
200        // wire the multiproof task to the prewarm task
201        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
202
203        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
204
205        let prewarm_handle =
206            self.spawn_caching_with(env, prewarm_rx, provider_builder, to_multi_proof.clone());
207
208        // spawn multi-proof task
209        self.executor.spawn_blocking(move || {
210            multi_proof_task.run();
211        });
212
213        // wire the sparse trie to the state root response receiver
214        let (state_root_tx, state_root_rx) = channel();
215
216        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
217        self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
218
219        // spawn the proof task
220        self.executor.spawn_blocking(move || {
221            if let Err(err) = proof_task.run() {
222                // At least log if there is an error at any point
223                tracing::error!(
224                    target: "engine::root",
225                    ?err,
226                    "Storage proof task returned an error"
227                );
228            }
229        });
230
231        PayloadHandle {
232            to_multi_proof,
233            prewarm_handle,
234            state_root: Some(state_root_rx),
235            transactions: execution_rx,
236        }
237    }
238
239    /// Spawn cache prewarming exclusively.
240    ///
241    /// Returns a [`PayloadHandle`] to communicate with the task.
242    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
243        &self,
244        env: ExecutionEnv<Evm>,
245        transactions: I,
246        provider_builder: StateProviderBuilder<N, P>,
247    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
248    where
249        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
250    {
251        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
252        let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
253        PayloadHandle {
254            to_multi_proof: None,
255            prewarm_handle,
256            state_root: None,
257            transactions: execution_rx,
258        }
259    }
260
261    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
262    #[expect(clippy::type_complexity)]
263    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
264        &self,
265        transactions: I,
266    ) -> (
267        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
268        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
269    ) {
270        let (prewarm_tx, prewarm_rx) = mpsc::channel();
271        let (execute_tx, execute_rx) = mpsc::channel();
272        self.executor.spawn_blocking(move || {
273            for tx in transactions {
274                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx });
275                // only send Ok(_) variants to prewarming task
276                if let Ok(tx) = &tx {
277                    let _ = prewarm_tx.send(tx.clone());
278                }
279                let _ = execute_tx.send(tx);
280            }
281        });
282
283        (prewarm_rx, execute_rx)
284    }
285
286    /// Spawn prewarming optionally wired to the multiproof task for target updates.
287    fn spawn_caching_with<P>(
288        &self,
289        env: ExecutionEnv<Evm>,
290        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
291        provider_builder: StateProviderBuilder<N, P>,
292        to_multi_proof: Option<Sender<MultiProofMessage>>,
293    ) -> CacheTaskHandle
294    where
295        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
296    {
297        if self.disable_transaction_prewarming {
298            // if no transactions should be executed we clear them but still spawn the task for
299            // caching updates
300            transactions = mpsc::channel().1;
301        }
302
303        let (cache, cache_metrics) = self.cache_for(env.parent_hash).split();
304        // configure prewarming
305        let prewarm_ctx = PrewarmContext {
306            env,
307            evm_config: self.evm_config.clone(),
308            cache: cache.clone(),
309            cache_metrics: cache_metrics.clone(),
310            provider: provider_builder,
311            metrics: PrewarmMetrics::default(),
312            terminate_execution: Arc::new(AtomicBool::new(false)),
313            precompile_cache_disabled: self.precompile_cache_disabled,
314            precompile_cache_map: self.precompile_cache_map.clone(),
315        };
316
317        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
318            self.executor.clone(),
319            self.execution_cache.clone(),
320            prewarm_ctx,
321            to_multi_proof,
322        );
323
324        // spawn pre-warm task
325        {
326            let to_prewarm_task = to_prewarm_task.clone();
327            self.executor.spawn_blocking(move || {
328                prewarm_task.run(transactions, to_prewarm_task);
329            });
330        }
331
332        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
333    }
334
335    /// Takes the trie input from the inner payload processor, if it exists.
336    pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
337        self.trie_input.take()
338    }
339
340    /// Returns the cache for the given parent hash.
341    ///
342    /// If the given hash is different then what is recently cached, then this will create a new
343    /// instance.
344    fn cache_for(&self, parent_hash: B256) -> SavedCache {
345        self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
346            let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
347            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
348        })
349    }
350
351    /// Spawns the [`SparseTrieTask`] for this payload processor.
352    fn spawn_sparse_trie_task<BPF>(
353        &self,
354        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
355        proof_task_handle: BPF,
356        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
357    ) where
358        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
359        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
360        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
361    {
362        // Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
363        // there's none to reuse.
364        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
365        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
366            let accounts_trie = if self.disable_parallel_sparse_trie {
367                ConfiguredSparseTrie::Serial(Default::default())
368            } else {
369                ConfiguredSparseTrie::Parallel(Default::default())
370            };
371            ClearedSparseStateTrie::from_state_trie(
372                SparseStateTrie::new()
373                    .with_accounts_trie(SparseTrie::Blind(Some(Box::new(accounts_trie))))
374                    .with_updates(true),
375            )
376        });
377
378        let task =
379            SparseTrieTask::<_, ConfiguredSparseTrie, SerialSparseTrie>::new_with_cleared_trie(
380                sparse_trie_rx,
381                proof_task_handle,
382                self.trie_metrics.clone(),
383                sparse_state_trie,
384            );
385
386        self.executor.spawn_blocking(move || {
387            let (result, trie) = task.run();
388            // Send state root computation result
389            let _ = state_root_tx.send(result);
390
391            // Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
392            // to the next step, so that time spent clearing doesn't block the step after this one.
393            cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
394        });
395    }
396}
397
398/// Handle to all the spawned tasks.
399#[derive(Debug)]
400pub struct PayloadHandle<Tx, Err> {
401    /// Channel for evm state updates
402    to_multi_proof: Option<Sender<MultiProofMessage>>,
403    // must include the receiver of the state root wired to the sparse trie
404    prewarm_handle: CacheTaskHandle,
405    /// Receiver for the state root
406    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
407    /// Stream of block transactions
408    transactions: mpsc::Receiver<Result<Tx, Err>>,
409}
410
411impl<Tx, Err> PayloadHandle<Tx, Err> {
412    /// Awaits the state root
413    ///
414    /// # Panics
415    ///
416    /// If payload processing was started without background tasks.
417    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
418        self.state_root
419            .take()
420            .expect("state_root is None")
421            .recv()
422            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
423    }
424
425    /// Returns a state hook to be used to send state updates to this task.
426    ///
427    /// If a multiproof task is spawned the hook will notify it about new states.
428    pub fn state_hook(&self) -> impl OnStateHook {
429        // convert the channel into a `StateHookSender` that emits an event on drop
430        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
431
432        move |source: StateChangeSource, state: &EvmState| {
433            if let Some(sender) = &to_multi_proof {
434                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
435            }
436        }
437    }
438
439    /// Returns a clone of the caches used by prewarming
440    pub(super) fn caches(&self) -> ProviderCaches {
441        self.prewarm_handle.cache.clone()
442    }
443
444    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
445        self.prewarm_handle.cache_metrics.clone()
446    }
447
448    /// Terminates the pre-warming transaction processing.
449    ///
450    /// Note: This does not terminate the task yet.
451    pub(super) fn stop_prewarming_execution(&self) {
452        self.prewarm_handle.stop_prewarming_execution()
453    }
454
455    /// Terminates the entire caching task.
456    ///
457    /// If the [`BundleState`] is provided it will update the shared cache.
458    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
459        self.prewarm_handle.terminate_caching(block_output)
460    }
461
462    /// Returns iterator yielding transactions from the stream.
463    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
464        core::iter::repeat_with(|| self.transactions.recv())
465            .take_while(|res| res.is_ok())
466            .map(|res| res.unwrap())
467    }
468}
469
470/// Access to the spawned [`PrewarmCacheTask`].
471#[derive(Debug)]
472pub(crate) struct CacheTaskHandle {
473    /// The shared cache the task operates with.
474    cache: ProviderCaches,
475    /// Metrics for the caches
476    cache_metrics: CachedStateMetrics,
477    /// Channel to the spawned prewarm task if any
478    to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
479}
480
481impl CacheTaskHandle {
482    /// Terminates the pre-warming transaction processing.
483    ///
484    /// Note: This does not terminate the task yet.
485    pub(super) fn stop_prewarming_execution(&self) {
486        self.to_prewarm_task
487            .as_ref()
488            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
489    }
490
491    /// Terminates the entire pre-warming task.
492    ///
493    /// If the [`BundleState`] is provided it will update the shared cache.
494    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
495        self.to_prewarm_task
496            .take()
497            .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
498    }
499}
500
501impl Drop for CacheTaskHandle {
502    fn drop(&mut self) {
503        // Ensure we always terminate on drop
504        self.terminate_caching(None);
505    }
506}
507
508/// Shared access to most recently used cache.
509///
510/// This cache is intended to used for processing the payload in the following manner:
511///  - Get Cache if the payload's parent block matches the parent block
512///  - Update cache upon successful payload execution
513///
514/// This process assumes that payloads are received sequentially.
515#[derive(Clone, Debug, Default)]
516struct ExecutionCache {
517    /// Guarded cloneable cache identified by a block hash.
518    inner: Arc<RwLock<Option<SavedCache>>>,
519}
520
521impl ExecutionCache {
522    /// Returns the cache if the currently store cache is for the given `parent_hash`
523    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
524        let cache = self.inner.read();
525        cache
526            .as_ref()
527            .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
528    }
529
530    /// Clears the tracked cache
531    #[expect(unused)]
532    pub(crate) fn clear(&self) {
533        self.inner.write().take();
534    }
535
536    /// Stores the provider cache
537    pub(crate) fn save_cache(&self, cache: SavedCache) {
538        self.inner.write().replace(cache);
539    }
540}
541
542/// EVM context required to execute a block.
543#[derive(Debug, Clone)]
544pub struct ExecutionEnv<Evm: ConfigureEvm> {
545    /// Evm environment.
546    pub evm_env: EvmEnvFor<Evm>,
547    /// Hash of the block being executed.
548    pub hash: B256,
549    /// Hash of the parent block.
550    pub parent_hash: B256,
551}
552
553impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
554where
555    EvmEnvFor<Evm>: Default,
556{
557    fn default() -> Self {
558        Self {
559            evm_env: Default::default(),
560            hash: Default::default(),
561            parent_hash: Default::default(),
562        }
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use crate::tree::{
569        payload_processor::{
570            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
571        },
572        precompile_cache::PrecompileCacheMap,
573        StateProviderBuilder, TreeConfig,
574    };
575    use alloy_evm::block::StateChangeSource;
576    use rand::Rng;
577    use reth_chainspec::ChainSpec;
578    use reth_db_common::init::init_genesis;
579    use reth_ethereum_primitives::TransactionSigned;
580    use reth_evm::OnStateHook;
581    use reth_evm_ethereum::EthEvmConfig;
582    use reth_primitives_traits::{Account, Recovered, StorageEntry};
583    use reth_provider::{
584        providers::{BlockchainProvider, ConsistentDbView},
585        test_utils::create_test_provider_factory_with_chain_spec,
586        ChainSpecProvider, HashingWriter,
587    };
588    use reth_testing_utils::generators;
589    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
590    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
591    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
592    use std::sync::Arc;
593
594    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
595        let mut rng = generators::rng();
596        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
597        let mut updates = Vec::with_capacity(updates_per_account);
598
599        for _ in 0..updates_per_account {
600            let num_accounts_in_update = rng.random_range(1..=num_accounts);
601            let mut state_update = EvmState::default();
602
603            let selected_addresses = &all_addresses[0..num_accounts_in_update];
604
605            for &address in selected_addresses {
606                let mut storage = HashMap::default();
607                if rng.random_bool(0.7) {
608                    for _ in 0..rng.random_range(1..10) {
609                        let slot = U256::from(rng.random::<u64>());
610                        storage.insert(
611                            slot,
612                            EvmStorageSlot::new_changed(
613                                U256::ZERO,
614                                U256::from(rng.random::<u64>()),
615                                0,
616                            ),
617                        );
618                    }
619                }
620
621                let account = revm_state::Account {
622                    info: AccountInfo {
623                        balance: U256::from(rng.random::<u64>()),
624                        nonce: rng.random::<u64>(),
625                        code_hash: KECCAK_EMPTY,
626                        code: Some(Default::default()),
627                    },
628                    storage,
629                    status: AccountStatus::Touched,
630                    transaction_id: 0,
631                };
632
633                state_update.insert(address, account);
634            }
635
636            updates.push(state_update);
637        }
638
639        updates
640    }
641
642    #[test]
643    fn test_state_root() {
644        reth_tracing::init_test_tracing();
645
646        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
647        let genesis_hash = init_genesis(&factory).unwrap();
648
649        let state_updates = create_mock_state_updates(10, 10);
650        let mut hashed_state = HashedPostState::default();
651        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
652            HashMap::default();
653
654        {
655            let provider_rw = factory.provider_rw().expect("failed to get provider");
656
657            for update in &state_updates {
658                let account_updates = update.iter().map(|(address, account)| {
659                    (*address, Some(Account::from_revm_account(account)))
660                });
661                provider_rw
662                    .insert_account_for_hashing(account_updates)
663                    .expect("failed to insert accounts");
664
665                let storage_updates = update.iter().map(|(address, account)| {
666                    let storage_entries = account.storage.iter().map(|(slot, value)| {
667                        StorageEntry { key: B256::from(*slot), value: value.present_value }
668                    });
669                    (*address, storage_entries)
670                });
671                provider_rw
672                    .insert_storage_for_hashing(storage_updates)
673                    .expect("failed to insert storage");
674            }
675            provider_rw.commit().expect("failed to commit changes");
676        }
677
678        for update in &state_updates {
679            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
680
681            for (address, account) in update {
682                let storage: HashMap<B256, U256> = account
683                    .storage
684                    .iter()
685                    .map(|(k, v)| (B256::from(*k), v.present_value))
686                    .collect();
687
688                let entry = accumulated_state.entry(*address).or_default();
689                entry.0 = Account::from_revm_account(account);
690                entry.1.extend(storage);
691            }
692        }
693
694        let mut payload_processor = PayloadProcessor::new(
695            WorkloadExecutor::default(),
696            EthEvmConfig::new(factory.chain_spec()),
697            &TreeConfig::default(),
698            PrecompileCacheMap::default(),
699        );
700        let provider = BlockchainProvider::new(factory).unwrap();
701        let mut handle = payload_processor.spawn(
702            Default::default(),
703            core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
704            StateProviderBuilder::new(provider.clone(), genesis_hash, None),
705            ConsistentDbView::new_with_latest_tip(provider).unwrap(),
706            TrieInput::from_state(hashed_state),
707            &TreeConfig::default(),
708        );
709
710        let mut state_hook = handle.state_hook();
711
712        for (i, update) in state_updates.into_iter().enumerate() {
713            state_hook.on_state(StateChangeSource::Transaction(i), &update);
714        }
715        drop(state_hook);
716
717        let root_from_task = handle.state_root().expect("task failed").state_root;
718        let root_from_regular = state_root(accumulated_state);
719
720        assert_eq!(
721            root_from_task, root_from_regular,
722            "State root mismatch: task={root_from_task}, base={root_from_regular}"
723        );
724    }
725}