reth_engine_tree/tree/payload_processor/
mod.rs1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 cached_state::{
6 CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
7 SavedCache,
8 },
9 payload_processor::{
10 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
11 sparse_trie::StateRootComputeOutcome,
12 },
13 sparse_trie::SparseTrieTask,
14 StateProviderBuilder, TreeConfig,
15};
16use alloy_evm::{block::StateChangeSource, ToTxEnv};
17use alloy_primitives::B256;
18use executor::WorkloadExecutor;
19use multiproof::{SparseTrieUpdate, *};
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use reth_engine_primitives::ExecutableTxIterator;
23use reth_evm::{
24 execute::{ExecutableTxFor, WithTxEnv},
25 ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
26};
27use reth_primitives_traits::NodePrimitives;
28use reth_provider::{
29 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
30 StateReader,
31};
32use reth_revm::{db::BundleState, state::EvmState};
33use reth_trie::TrieInput;
34use reth_trie_parallel::{
35 proof_task::{ProofTaskCtx, ProofTaskManager},
36 root::ParallelStateRootError,
37};
38use reth_trie_sparse::{
39 provider::{TrieNodeProvider, TrieNodeProviderFactory},
40 ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
41};
42use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
43use std::sync::{
44 atomic::AtomicBool,
45 mpsc::{self, channel, Sender},
46 Arc,
47};
48use tracing::{debug, instrument, warn};
49
50mod configured_sparse_trie;
51pub mod executor;
52pub mod multiproof;
53pub mod prewarm;
54pub mod sparse_trie;
55
56use configured_sparse_trie::ConfiguredSparseTrie;
57
58pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
64 ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
65
66#[derive(Debug)]
68pub struct PayloadProcessor<Evm>
69where
70 Evm: ConfigureEvm,
71{
72 executor: WorkloadExecutor,
74 execution_cache: ExecutionCache,
76 trie_metrics: MultiProofTaskMetrics,
78 cross_block_cache_size: u64,
80 disable_transaction_prewarming: bool,
82 evm_config: Evm,
84 precompile_cache_disabled: bool,
86 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
88 sparse_state_trie: Arc<
91 parking_lot::Mutex<
92 Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
93 >,
94 >,
95 disable_parallel_sparse_trie: bool,
97 trie_input: Option<TrieInput>,
99 prewarm_max_concurrency: usize,
101}
102
103impl<N, Evm> PayloadProcessor<Evm>
104where
105 N: NodePrimitives,
106 Evm: ConfigureEvm<Primitives = N>,
107{
108 pub fn new(
110 executor: WorkloadExecutor,
111 evm_config: Evm,
112 config: &TreeConfig,
113 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
114 ) -> Self {
115 Self {
116 executor,
117 execution_cache: Default::default(),
118 trie_metrics: Default::default(),
119 cross_block_cache_size: config.cross_block_cache_size(),
120 disable_transaction_prewarming: config.disable_caching_and_prewarming(),
121 evm_config,
122 precompile_cache_disabled: config.precompile_cache_disabled(),
123 precompile_cache_map,
124 sparse_state_trie: Arc::default(),
125 trie_input: None,
126 disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
127 prewarm_max_concurrency: config.prewarm_max_concurrency(),
128 }
129 }
130}
131
132impl<N, Evm> PayloadProcessor<Evm>
133where
134 N: NodePrimitives,
135 Evm: ConfigureEvm<Primitives = N> + 'static,
136{
137 #[allow(clippy::type_complexity)]
173 pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
174 &mut self,
175 env: ExecutionEnv<Evm>,
176 transactions: I,
177 provider_builder: StateProviderBuilder<N, P>,
178 consistent_view: ConsistentDbView<P>,
179 trie_input: TrieInput,
180 config: &TreeConfig,
181 ) -> Result<
182 PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
183 (reth_provider::ProviderError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
184 >
185 where
186 P: DatabaseProviderFactory<Provider: BlockReader>
187 + BlockReader
188 + StateProviderFactory
189 + StateReader
190 + Clone
191 + 'static,
192 {
193 let (to_sparse_trie, sparse_trie_rx) = channel();
194 let (trie_input, state_root_config) =
196 MultiProofConfig::new_from_input(consistent_view, trie_input);
197 self.trie_input = Some(trie_input);
198
199 let task_ctx = ProofTaskCtx::new(
201 state_root_config.nodes_sorted.clone(),
202 state_root_config.state_sorted.clone(),
203 state_root_config.prefix_sets.clone(),
204 );
205 let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
206 let storage_worker_count = config.storage_worker_count();
207 let proof_task = match ProofTaskManager::new(
208 self.executor.handle().clone(),
209 state_root_config.consistent_view.clone(),
210 task_ctx,
211 max_proof_task_concurrency,
212 storage_worker_count,
213 ) {
214 Ok(task) => task,
215 Err(error) => {
216 return Err((error, transactions, env, provider_builder));
217 }
218 };
219
220 let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
223 let multi_proof_task = MultiProofTask::new(
224 state_root_config,
225 self.executor.clone(),
226 proof_task.handle(),
227 to_sparse_trie,
228 max_multi_proof_task_concurrency,
229 config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
230 );
231
232 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
234
235 let (prewarm_rx, execution_rx, transaction_count_hint) =
236 self.spawn_tx_iterator(transactions);
237
238 let prewarm_handle = self.spawn_caching_with(
239 env,
240 prewarm_rx,
241 transaction_count_hint,
242 provider_builder,
243 to_multi_proof.clone(),
244 );
245
246 self.executor.spawn_blocking(move || {
248 multi_proof_task.run();
249 });
250
251 let (state_root_tx, state_root_rx) = channel();
253
254 self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
256
257 self.executor.spawn_blocking(move || {
259 if let Err(err) = proof_task.run() {
260 tracing::error!(
262 target: "engine::root",
263 ?err,
264 "Storage proof task returned an error"
265 );
266 }
267 });
268
269 Ok(PayloadHandle {
270 to_multi_proof,
271 prewarm_handle,
272 state_root: Some(state_root_rx),
273 transactions: execution_rx,
274 })
275 }
276
277 pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
281 &self,
282 env: ExecutionEnv<Evm>,
283 transactions: I,
284 provider_builder: StateProviderBuilder<N, P>,
285 ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
286 where
287 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
288 {
289 let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
290 let prewarm_handle =
291 self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
292 PayloadHandle {
293 to_multi_proof: None,
294 prewarm_handle,
295 state_root: None,
296 transactions: execution_rx,
297 }
298 }
299
300 #[expect(clippy::type_complexity)]
302 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
303 &self,
304 transactions: I,
305 ) -> (
306 mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
307 mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
308 usize,
309 ) {
310 let (lower, upper) = transactions.size_hint();
313 let transaction_count_hint = upper.unwrap_or(lower);
314
315 let (prewarm_tx, prewarm_rx) = mpsc::channel();
316 let (execute_tx, execute_rx) = mpsc::channel();
317 self.executor.spawn_blocking(move || {
318 for tx in transactions {
319 let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx });
320 if let Ok(tx) = &tx {
322 let _ = prewarm_tx.send(tx.clone());
323 }
324 let _ = execute_tx.send(tx);
325 }
326 });
327
328 (prewarm_rx, execute_rx, transaction_count_hint)
329 }
330
331 fn spawn_caching_with<P>(
333 &self,
334 env: ExecutionEnv<Evm>,
335 mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
336 transaction_count_hint: usize,
337 provider_builder: StateProviderBuilder<N, P>,
338 to_multi_proof: Option<Sender<MultiProofMessage>>,
339 ) -> CacheTaskHandle
340 where
341 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
342 {
343 if self.disable_transaction_prewarming {
344 transactions = mpsc::channel().1;
347 }
348
349 let saved_cache = self.cache_for(env.parent_hash);
350 let cache = saved_cache.cache().clone();
351 let cache_metrics = saved_cache.metrics().clone();
352 let prewarm_ctx = PrewarmContext {
354 env,
355 evm_config: self.evm_config.clone(),
356 saved_cache,
357 provider: provider_builder,
358 metrics: PrewarmMetrics::default(),
359 terminate_execution: Arc::new(AtomicBool::new(false)),
360 precompile_cache_disabled: self.precompile_cache_disabled,
361 precompile_cache_map: self.precompile_cache_map.clone(),
362 };
363
364 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
365 self.executor.clone(),
366 self.execution_cache.clone(),
367 prewarm_ctx,
368 to_multi_proof,
369 transaction_count_hint,
370 self.prewarm_max_concurrency,
371 );
372
373 {
375 let to_prewarm_task = to_prewarm_task.clone();
376 self.executor.spawn_blocking(move || {
377 prewarm_task.run(transactions, to_prewarm_task);
378 });
379 }
380
381 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
382 }
383
384 pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
386 self.trie_input.take()
387 }
388
389 #[instrument(target = "engine::caching", skip(self))]
394 fn cache_for(&self, parent_hash: B256) -> SavedCache {
395 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
396 debug!("reusing execution cache");
397 cache
398 } else {
399 debug!("creating new execution cache on cache miss");
400 let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
401 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
402 }
403 }
404
405 fn spawn_sparse_trie_task<BPF>(
407 &self,
408 sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
409 proof_task_handle: BPF,
410 state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
411 ) where
412 BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
413 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
414 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
415 {
416 let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
419 let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
420 let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
421 ConfiguredSparseTrie::Serial(Default::default())
422 } else {
423 ConfiguredSparseTrie::Parallel(Box::new(
424 ParallelSparseTrie::default()
425 .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
426 ))
427 });
428 ClearedSparseStateTrie::from_state_trie(
429 SparseStateTrie::new()
430 .with_accounts_trie(default_trie.clone())
431 .with_default_storage_trie(default_trie)
432 .with_updates(true),
433 )
434 });
435
436 let task =
437 SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
438 sparse_trie_rx,
439 proof_task_handle,
440 self.trie_metrics.clone(),
441 sparse_state_trie,
442 );
443
444 self.executor.spawn_blocking(move || {
445 let (result, trie) = task.run();
446 let _ = state_root_tx.send(result);
448
449 cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
452 });
453 }
454}
455
456#[derive(Debug)]
458pub struct PayloadHandle<Tx, Err> {
459 to_multi_proof: Option<Sender<MultiProofMessage>>,
461 prewarm_handle: CacheTaskHandle,
463 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
465 transactions: mpsc::Receiver<Result<Tx, Err>>,
467}
468
469impl<Tx, Err> PayloadHandle<Tx, Err> {
470 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
476 self.state_root
477 .take()
478 .expect("state_root is None")
479 .recv()
480 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
481 }
482
483 pub fn state_hook(&self) -> impl OnStateHook {
487 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
489
490 move |source: StateChangeSource, state: &EvmState| {
491 if let Some(sender) = &to_multi_proof {
492 let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
493 }
494 }
495 }
496
497 pub(super) fn caches(&self) -> StateExecutionCache {
499 self.prewarm_handle.cache.clone()
500 }
501
502 pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
504 self.prewarm_handle.cache_metrics.clone()
505 }
506
507 pub(super) fn stop_prewarming_execution(&self) {
511 self.prewarm_handle.stop_prewarming_execution()
512 }
513
514 pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
518 self.prewarm_handle.terminate_caching(block_output)
519 }
520
521 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
523 core::iter::repeat_with(|| self.transactions.recv())
524 .take_while(|res| res.is_ok())
525 .map(|res| res.unwrap())
526 }
527}
528
529#[derive(Debug)]
531pub(crate) struct CacheTaskHandle {
532 cache: StateExecutionCache,
534 cache_metrics: CachedStateMetrics,
536 to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
538}
539
540impl CacheTaskHandle {
541 pub(super) fn stop_prewarming_execution(&self) {
545 self.to_prewarm_task
546 .as_ref()
547 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
548 }
549
550 pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
554 if let Some(tx) = self.to_prewarm_task.take() {
555 let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
557 let _ = tx.send(event);
558 }
559 }
560}
561
562impl Drop for CacheTaskHandle {
563 fn drop(&mut self) {
564 self.terminate_caching(None);
566 }
567}
568
569#[derive(Clone, Debug, Default)]
595struct ExecutionCache {
596 inner: Arc<RwLock<Option<SavedCache>>>,
598}
599
600impl ExecutionCache {
601 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
607 let cache = self.inner.read();
608 cache
609 .as_ref()
610 .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
611 .cloned()
612 }
613
614 #[expect(unused)]
616 pub(crate) fn clear(&self) {
617 self.inner.write().take();
618 }
619
620 pub(crate) fn update_with_guard<F>(&self, update_fn: F)
634 where
635 F: FnOnce(&mut Option<SavedCache>),
636 {
637 let mut guard = self.inner.write();
638 update_fn(&mut guard);
639 }
640}
641
642#[derive(Debug, Clone)]
644pub struct ExecutionEnv<Evm: ConfigureEvm> {
645 pub evm_env: EvmEnvFor<Evm>,
647 pub hash: B256,
649 pub parent_hash: B256,
651}
652
653impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
654where
655 EvmEnvFor<Evm>: Default,
656{
657 fn default() -> Self {
658 Self {
659 evm_env: Default::default(),
660 hash: Default::default(),
661 parent_hash: Default::default(),
662 }
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::ExecutionCache;
669 use crate::tree::{
670 cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
671 payload_processor::{
672 evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
673 },
674 precompile_cache::PrecompileCacheMap,
675 StateProviderBuilder, TreeConfig,
676 };
677 use alloy_evm::block::StateChangeSource;
678 use rand::Rng;
679 use reth_chainspec::ChainSpec;
680 use reth_db_common::init::init_genesis;
681 use reth_ethereum_primitives::TransactionSigned;
682 use reth_evm::OnStateHook;
683 use reth_evm_ethereum::EthEvmConfig;
684 use reth_primitives_traits::{Account, Recovered, StorageEntry};
685 use reth_provider::{
686 providers::{BlockchainProvider, ConsistentDbView},
687 test_utils::create_test_provider_factory_with_chain_spec,
688 ChainSpecProvider, HashingWriter,
689 };
690 use reth_testing_utils::generators;
691 use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
692 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
693 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
694 use std::sync::Arc;
695
696 fn make_saved_cache(hash: B256) -> SavedCache {
697 let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
698 SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
699 }
700
/// Only one checkout of the cache may be live at a time; dropping it frees the cache again.
#[test]
fn execution_cache_allows_single_checkout() {
    let hash = B256::from([1u8; 32]);
    let execution_cache = ExecutionCache::default();
    execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

    let first = execution_cache.get_cache_for(hash);
    assert!(first.is_some(), "expected initial checkout to succeed");

    // `first` is still alive here, so another checkout must be refused.
    assert!(
        execution_cache.get_cache_for(hash).is_none(),
        "second checkout should be blocked while guard is active"
    );

    drop(first);

    assert!(
        execution_cache.get_cache_for(hash).is_some(),
        "third checkout should succeed after guard is dropped"
    );
}
719
/// A checkout that has been dropped no longer blocks the next one.
#[test]
fn execution_cache_checkout_releases_on_drop() {
    let hash = B256::from([2u8; 32]);
    let execution_cache = ExecutionCache::default();
    execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

    let guard = execution_cache.get_cache_for(hash);
    assert!(guard.is_some(), "expected checkout to succeed");
    drop(guard);

    let retry = execution_cache.get_cache_for(hash);
    assert!(retry.is_some(), "checkout should succeed after guard drop");
}
736
/// A cache saved for one hash must not be handed out for a different parent hash.
#[test]
fn execution_cache_mismatch_parent_returns_none() {
    let stored_hash = B256::from([3u8; 32]);
    let other_hash = B256::from([4u8; 32]);

    let execution_cache = ExecutionCache::default();
    execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(stored_hash)));

    let miss = execution_cache.get_cache_for(other_hash);
    assert!(miss.is_none(), "checkout should fail for different parent hash");
}
747
/// After a checkout is released, the slot can be replaced and the new cache checked out.
#[test]
fn execution_cache_update_after_release_succeeds() {
    let initial = B256::from([5u8; 32]);
    let updated = B256::from([6u8; 32]);

    let execution_cache = ExecutionCache::default();
    execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

    // Check out the initial cache and release it again straight away.
    let guard =
        execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
    drop(guard);

    execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

    let new_checkout = execution_cache.get_cache_for(updated);
    assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
}
766
767 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
768 let mut rng = generators::rng();
769 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
770 let mut updates = Vec::with_capacity(updates_per_account);
771
772 for _ in 0..updates_per_account {
773 let num_accounts_in_update = rng.random_range(1..=num_accounts);
774 let mut state_update = EvmState::default();
775
776 let selected_addresses = &all_addresses[0..num_accounts_in_update];
777
778 for &address in selected_addresses {
779 let mut storage = HashMap::default();
780 if rng.random_bool(0.7) {
781 for _ in 0..rng.random_range(1..10) {
782 let slot = U256::from(rng.random::<u64>());
783 storage.insert(
784 slot,
785 EvmStorageSlot::new_changed(
786 U256::ZERO,
787 U256::from(rng.random::<u64>()),
788 0,
789 ),
790 );
791 }
792 }
793
794 let account = revm_state::Account {
795 info: AccountInfo {
796 balance: U256::from(rng.random::<u64>()),
797 nonce: rng.random::<u64>(),
798 code_hash: KECCAK_EMPTY,
799 code: Some(Default::default()),
800 },
801 storage,
802 status: AccountStatus::Touched,
803 transaction_id: 0,
804 };
805
806 state_update.insert(address, account);
807 }
808
809 updates.push(state_update);
810 }
811
812 updates
813 }
814
/// End-to-end check: the state root produced by the spawned tasks for a series of random
/// state updates must equal the root computed directly from the accumulated state.
#[test]
fn test_state_root() {
    reth_tracing::init_test_tracing();

    let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
    let genesis_hash = init_genesis(&factory).unwrap();

    let state_updates = create_mock_state_updates(10, 10);

    // Write every update into the hashing tables and commit.
    {
        let provider_rw = factory.provider_rw().expect("failed to get provider");

        for update in &state_updates {
            let accounts = update
                .iter()
                .map(|(address, account)| (*address, Some(Account::from_revm_account(account))));
            provider_rw.insert_account_for_hashing(accounts).expect("failed to insert accounts");

            let storages = update.iter().map(|(address, account)| {
                let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                    key: B256::from(*slot),
                    value: value.present_value,
                });
                (*address, entries)
            });
            provider_rw.insert_storage_for_hashing(storages).expect("failed to insert storage");
        }

        provider_rw.commit().expect("failed to commit changes");
    }

    // Build the hashed post state for the task and the plain accumulated state used for the
    // reference root.
    let mut hashed_state = HashedPostState::default();
    let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
        HashMap::default();
    for update in &state_updates {
        hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

        for (address, account) in update {
            let (latest_account, slots) = accumulated_state.entry(*address).or_default();
            // Later updates overwrite the account and add to (or overwrite) its slots.
            *latest_account = Account::from_revm_account(account);
            slots.extend(
                account
                    .storage
                    .iter()
                    .map(|(slot, value)| (B256::from(*slot), value.present_value)),
            );
        }
    }

    let tree_config = TreeConfig::default();
    let mut payload_processor = PayloadProcessor::new(
        WorkloadExecutor::default(),
        EthEvmConfig::new(factory.chain_spec()),
        &tree_config,
        PrecompileCacheMap::default(),
    );
    let provider = BlockchainProvider::new(factory).unwrap();

    // No transactions are executed; state changes are fed in through the state hook below.
    let no_transactions =
        core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>();
    let mut handle = payload_processor
        .spawn(
            Default::default(),
            no_transactions,
            StateProviderBuilder::new(provider.clone(), genesis_hash, None),
            ConsistentDbView::new_with_latest_tip(provider).unwrap(),
            TrieInput::from_state(hashed_state),
            &tree_config,
        )
        .map_err(|(err, ..)| err)
        .expect("failed to spawn payload processor");

    let mut state_hook = handle.state_hook();
    for (i, update) in state_updates.into_iter().enumerate() {
        state_hook.on_state(StateChangeSource::Transaction(i), &update);
    }
    // The hook is dropped before the result is awaited.
    drop(state_hook);

    let root_from_task = handle.state_root().expect("task failed").state_root;
    let root_from_regular = state_root(accumulated_state);

    assert_eq!(
        root_from_task, root_from_regular,
        "State root mismatch: task={root_from_task}, base={root_from_regular}"
    );
}
904}