use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
    cached_state::{
        CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
        SavedCache,
    },
    payload_processor::{
        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
        sparse_trie::StateRootComputeOutcome,
    },
    sparse_trie::SparseTrieTask,
    StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::BlockWithParent;
use alloy_evm::{block::StateChangeSource, ToTxEnv};
use alloy_primitives::B256;
use crossbeam_channel::Sender as CrossbeamSender;
use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
    execute::{ExecutableTxFor, WithTxEnv},
    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
    proof_task::{ProofTaskCtx, ProofWorkerHandle},
    root::ParallelStateRootError,
};
use reth_trie_sparse::{
    provider::{TrieNodeProvider, TrieNodeProviderFactory},
    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
    collections::BTreeMap,
    sync::{
        atomic::AtomicBool,
        mpsc::{self, channel},
        Arc,
    },
    time::Instant,
};
use tracing::{debug, debug_span, instrument, warn, Span};

mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod sparse_trie;

use configured_sparse_trie::ConfiguredSparseTrie;

/// Parallelism thresholds applied to the [`ParallelSparseTrie`] used for state root
/// computation.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Maximum number of trie nodes the reusable sparse trie retains when it is cleared and
/// shrunk after a payload has been processed.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Maximum number of leaf values the reusable sparse trie retains when it is cleared and
/// shrunk after a payload has been processed.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Entrypoint for payload processing: orchestrates transaction prewarming, the
/// multiproof task, and the sparse trie task that computes the state root.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Executor used to spawn blocking workloads.
    executor: WorkloadExecutor,
    /// Cross-block execution cache, reused across payloads.
    execution_cache: ExecutionCache,
    /// Metrics for the multiproof and trie tasks.
    trie_metrics: MultiProofTaskMetrics,
    /// Maximum size of the cross-block execution cache.
    cross_block_cache_size: u64,
    /// Whether transaction prewarming is disabled.
    disable_transaction_prewarming: bool,
    /// Whether the cross-block state cache is disabled.
    disable_state_cache: bool,
    /// The EVM configuration.
    evm_config: Evm,
    /// Whether precompile caching is disabled.
    precompile_cache_disabled: bool,
    /// Precompile caches, keyed by EVM spec.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A cleared sparse state trie kept around so its allocations can be reused for the
    /// next payload.
    sparse_state_trie: Arc<
        parking_lot::Mutex<
            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
        >,
    >,
    /// Whether to use the serial sparse trie instead of the parallel one.
    disable_parallel_sparse_trie: bool,
    /// Maximum concurrency for the prewarming task.
    prewarm_max_concurrency: usize,
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Returns the workload executor.
    pub(super) const fn executor(&self) -> &WorkloadExecutor {
        &self.executor
    }

    /// Creates a new [`PayloadProcessor`] from the given [`TreeConfig`].
    pub fn new(
        executor: WorkloadExecutor,
        evm_config: Evm,
        config: &TreeConfig,
        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    ) -> Self {
        Self {
            executor,
            execution_cache: Default::default(),
            trie_metrics: Default::default(),
            cross_block_cache_size: config.cross_block_cache_size(),
            disable_transaction_prewarming: config.disable_prewarming(),
            evm_config,
            disable_state_cache: config.disable_state_cache(),
            precompile_cache_disabled: config.precompile_cache_disabled(),
            precompile_cache_map,
            sparse_state_trie: Arc::default(),
            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
            prewarm_max_concurrency: config.prewarm_max_concurrency(),
        }
    }
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
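    /// Spawns all background tasks needed to process this payload and returns a
    /// [`PayloadHandle`] for communicating with them:
    ///
    /// - the transaction prewarming task, which speculatively executes transactions to
    ///   warm the caches and forwards the touched state to the multiproof task,
    /// - the multiproof task, which computes proofs for the touched state and emits
    ///   [`SparseTrieUpdate`]s,
    /// - the sparse trie task, which applies those updates and computes the state root.
    ///
    /// A minimal usage sketch, mirroring the `test_state_root` test at the bottom of
    /// this file (the execution environment, transactions, and providers are assumed to
    /// be prepared by the caller):
    ///
    /// ```ignore
    /// let mut handle =
    ///     processor.spawn(env, transactions, provider_builder, trie_factory, &config);
    /// let mut hook = handle.state_hook();
    /// for (i, state) in executed_states.iter().enumerate() {
    ///     hook.on_state(StateChangeSource::Transaction(i), state);
    /// }
    /// drop(hook);
    /// let state_root = handle.state_root()?.state_root;
    /// ```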
    #[allow(clippy::type_complexity)]
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        // Split the transactions into a prewarming stream and an in-order execution
        // stream.
        let (prewarm_rx, execution_rx, transaction_count_hint) =
            self.spawn_tx_iterator(transactions);

        let span = Span::current();
        let (to_sparse_trie, sparse_trie_rx) = channel();

        // Spawn the proof workers that serve account and storage proof requests.
        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
        let storage_worker_count = config.storage_worker_count();
        let account_worker_count = config.account_worker_count();
        let proof_handle = ProofWorkerHandle::new(
            self.executor.handle().clone(),
            task_ctx,
            storage_worker_count,
            account_worker_count,
        );

        let multi_proof_task = MultiProofTask::new(
            proof_handle.clone(),
            to_sparse_trie,
            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
        );

        // Channel used to feed state updates from execution into the multiproof task.
        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());

        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            transaction_count_hint,
            provider_builder,
            to_multi_proof.clone(),
        );

        // Spawn the multiproof task in the background.
        let parent_span = span.clone();
        self.executor.spawn_blocking(move || {
            let _enter = parent_span.entered();
            multi_proof_task.run();
        });

        let (state_root_tx, state_root_rx) = channel();

        // Spawn the sparse trie task that will deliver the state root.
        self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);

        PayloadHandle {
            to_multi_proof,
            prewarm_handle,
            state_root: Some(state_root_rx),
            transactions: execution_rx,
            _span: span,
        }
    }

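    /// Spawns a task that executes the payload's transactions only to warm the caches;
    /// no multiproof or state root tasks are spawned.
    ///
    /// A minimal sketch of the intended call pattern (the inputs are assumed to be
    /// prepared by the caller):
    ///
    /// ```ignore
    /// let handle = processor.spawn_cache_exclusive(env, transactions, provider_builder);
    /// // Only caching runs in the background; `handle.state_root()` would panic here
    /// // since no state root task exists.
    /// let caches = handle.caches();
    /// ```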
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
        let prewarm_handle =
            self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
        PayloadHandle {
            to_multi_proof: None,
            prewarm_handle,
            state_root: None,
            transactions: execution_rx,
            _span: Span::current(),
        }
    }

    /// Spawns the given transaction iterator, converting the transactions into
    /// executable ones in parallel, and returns:
    ///
    /// - a receiver for prewarming, which yields successfully converted transactions in
    ///   whatever order the parallel conversion finishes them,
    /// - a receiver for execution, which yields all conversion results in the original
    ///   transaction order,
    /// - a hint for the total number of transactions.
    #[expect(clippy::type_complexity)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
    ) -> (
        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
        usize,
    ) {
        let (transactions, convert) = transactions.into();
        let transactions = transactions.into_iter();
        // Prefer the upper bound of the size hint, falling back to the lower bound.
        let (lower, upper) = transactions.size_hint();
        let transaction_count_hint = upper.unwrap_or(lower);

        // Convert the transactions in parallel, tagging each with its original index.
        let (tx, rx) = mpsc::channel();
        self.executor.spawn_blocking(move || {
            transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
                let tx = convert(tx);
                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
                let _ = sender.send((idx, tx));
            });
        });

        // Restore the original ordering for execution, while forwarding transactions to
        // prewarming as soon as they are converted.
        let (prewarm_tx, prewarm_rx) = mpsc::channel();
        let (execute_tx, execute_rx) = mpsc::channel();
        self.executor.spawn_blocking(move || {
            let mut next_for_execution = 0;
            let mut queue = BTreeMap::new();
            while let Ok((idx, tx)) = rx.recv() {
                // Prewarming does not require ordering, so forward immediately.
                if let Ok(tx) = &tx {
                    let _ = prewarm_tx.send(tx.clone());
                }

                if next_for_execution == idx {
                    let _ = execute_tx.send(tx);
                    next_for_execution += 1;

                    // Drain any buffered transactions that are now next in line.
                    while let Some(entry) = queue.first_entry() &&
                        *entry.key() == next_for_execution
                    {
                        let _ = execute_tx.send(entry.remove());
                        next_for_execution += 1;
                    }
                } else {
                    // Out-of-order arrival: buffer until its predecessors were sent.
                    queue.insert(idx, tx);
                }
            }
        });

        (prewarm_rx, execute_rx, transaction_count_hint)
    }

    /// Spawns the [`PrewarmCacheTask`] for the given transactions and returns a handle
    /// to its caches and event channel.
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        transaction_count_hint: usize,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    ) -> CacheTaskHandle
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        if self.disable_transaction_prewarming {
            // Replace the receiver with one whose sender is dropped immediately, so no
            // transactions are executed for prewarming.
            transactions = mpsc::channel().1;
        }

        let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
            (None, None, None)
        } else {
            let saved_cache = self.cache_for(env.parent_hash);
            let cache = saved_cache.cache().clone();
            let cache_metrics = saved_cache.metrics().clone();
            (Some(saved_cache), Some(cache), Some(cache_metrics))
        };

        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache,
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
            transaction_count_hint,
            self.prewarm_max_concurrency,
        );

        // Spawn the prewarming task in the background.
        {
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking(move || {
                prewarm_task.run(transactions, to_prewarm_task);
            });
        }

        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
    }

    /// Returns the cache to use when executing a payload on top of the given parent
    /// hash, either reusing the saved cross-block cache or building a fresh one on a
    /// miss.
    #[instrument(level = "debug", target = "engine::caching", skip(self))]
    fn cache_for(&self, parent_hash: B256) -> SavedCache {
        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
            debug!("reusing execution cache");
            cache
        } else {
            debug!("creating new execution cache on cache miss");
            let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
        }
    }

    /// Spawns the [`SparseTrieTask`] that computes the state root, reusing the cached
    /// cleared sparse trie if one is available.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    fn spawn_sparse_trie_task<BPF>(
        &self,
        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
        proof_worker_handle: BPF,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
    ) where
        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
    {
        // Take the cached cleared trie, or build a fresh one on first use.
        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
                ConfiguredSparseTrie::Serial(Default::default())
            } else {
                ConfiguredSparseTrie::Parallel(Box::new(
                    ParallelSparseTrie::default()
                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
                ))
            });
            ClearedSparseStateTrie::from_state_trie(
                SparseStateTrie::new()
                    .with_accounts_trie(default_trie.clone())
                    .with_default_storage_trie(default_trie)
                    .with_updates(true),
            )
        });

        let task =
            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
                sparse_trie_rx,
                proof_worker_handle,
                self.trie_metrics.clone(),
                sparse_state_trie,
            );

        let span = Span::current();
        self.executor.spawn_blocking(move || {
            let _enter = span.entered();

            let (result, trie) = task.run();
            let _ = state_root_tx.send(result);

            // Clear and shrink the trie so its allocations can be reused for the next
            // payload, capping the retained capacity.
            let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
            let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);

            cleared_trie.shrink_to(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );

            cleared_sparse_trie.lock().replace(cleared_trie);
        });
    }

    /// Updates the execution cache with the state of a newly inserted executed block.
    ///
    /// If an existing cache was built on top of a block other than the inserted block's
    /// parent, it is left untouched and no update is performed.
    pub(crate) fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        self.execution_cache.update_with_guard(|cached| {
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Existing cache does not match the inserted block's parent, skipping cache update",
                );
                return;
            }

            // Reuse the existing caches if present, otherwise build fresh ones.
            let (caches, cache_metrics) = match cached.take() {
                Some(existing) => existing.split(),
                None => (
                    ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
                    CachedStateMetrics::zeroed(),
                ),
            };

            let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics);
            if new_cache.cache().insert_state(bundle_state).is_err() {
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return;
            }
            new_cache.update_metrics();

            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
}

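/// Handle for interacting with the background tasks spawned for a payload.
///
/// A minimal sketch of the intended lifecycle (a `handle` returned by
/// [`PayloadProcessor::spawn`], an `execute` routine, and the block's `block_output`
/// are assumed):
///
/// ```ignore
/// let txs: Vec<_> = handle.iter_transactions().collect();
/// // ... execute `txs`, reporting updates through the hook from `handle.state_hook()` ...
/// let outcome = handle.state_root()?;
/// handle.terminate_caching(Some(&block_output));
/// ```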
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
    /// Sender for state updates to the multiproof task, if one was spawned.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Handle to the prewarming/caching task.
    prewarm_handle: CacheTaskHandle,
    /// Receiver for the payload's transactions, in their original order.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Receiver for the state root outcome; `None` if no state root task was spawned or
    /// once the result has been awaited.
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Span covering the lifetime of the payload's tasks.
    _span: Span,
}

impl<Tx, Err> PayloadHandle<Tx, Err> {
    /// Awaits the state root computed by the sparse trie task.
    ///
    /// # Panics
    ///
    /// Panics if called more than once, or if no state root task was spawned.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "await_state_root",
        skip_all
    )]
    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        self.state_root
            .take()
            .expect("state root receiver already taken or task not spawned")
            .recv()
            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
    }

    /// Returns a state hook that forwards state updates to the multiproof task, if one
    /// is running.
    pub fn state_hook(&self) -> impl OnStateHook {
        // If no multiproof task was spawned, the hook silently drops the updates.
        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);

        move |source: StateChangeSource, state: &EvmState| {
            if let Some(sender) = &to_multi_proof {
                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
            }
        }
    }

    /// Returns a clone of the caches used by prewarming, if the state cache is enabled.
    pub(super) fn caches(&self) -> Option<StateExecutionCache> {
        self.prewarm_handle.cache.clone()
    }

    /// Returns a clone of the cache metrics, if the state cache is enabled.
    pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
        self.prewarm_handle.cache_metrics.clone()
    }

    /// Terminates the prewarming transaction execution without shutting down the
    /// caching task itself.
    pub(super) fn stop_prewarming_execution(&self) {
        self.prewarm_handle.stop_prewarming_execution()
    }

    /// Terminates the caching task, optionally saving the block's output so the cache
    /// can be reused for the next block.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        self.prewarm_handle.terminate_caching(block_output)
    }

    /// Returns an iterator that yields the payload's transactions in order until the
    /// channel is closed.
    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
        core::iter::repeat_with(|| self.transactions.recv())
            .take_while(|res| res.is_ok())
            .map(|res| res.unwrap())
    }
}

/// Handle to the spawned prewarming/caching task.
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
    /// The shared execution cache, if the state cache is enabled.
    cache: Option<StateExecutionCache>,
    /// Metrics for the execution cache, if the state cache is enabled.
    cache_metrics: Option<CachedStateMetrics>,
    /// Sender for events to the prewarming task; `None` once terminated.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}

impl CacheTaskHandle {
    /// Terminates the prewarming transaction execution.
    pub(super) fn stop_prewarming_execution(&self) {
        self.to_prewarm_task
            .as_ref()
            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
    }

    /// Terminates the prewarming task, optionally forwarding the block's output for
    /// caching.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        if let Some(tx) = self.to_prewarm_task.take() {
            let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
            let _ = tx.send(event);
        }
    }
}

impl Drop for CacheTaskHandle {
    fn drop(&mut self) {
        // Ensure the prewarming task is terminated when the handle goes out of scope.
        self.terminate_caching(None);
    }
}

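/// Shared cache for a single executed block's state, reused when the next payload
/// builds on top of that block.
///
/// Access is guarded by an [`RwLock`]: `update_with_guard` runs with exclusive access
/// to the slot, while `get_cache_for` hands out the saved cache only if it matches the
/// requested parent hash and is not already checked out.
///
/// A minimal sketch of the checkout semantics, following the tests below (`saved_cache`
/// and `parent_hash` are assumed):
///
/// ```ignore
/// let cache = ExecutionCache::default();
/// cache.update_with_guard(|slot| *slot = Some(saved_cache));
/// let checkout = cache.get_cache_for(parent_hash); // Some(..) on a match
/// let second = cache.get_cache_for(parent_hash);   // None while checked out
/// drop(checkout);
/// assert!(cache.get_cache_for(parent_hash).is_some());
/// ```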
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// Guarded cloneable cache, identified by the hash of the executed block.
    inner: Arc<RwLock<Option<SavedCache>>>,
}

impl ExecutionCache {
    /// Returns the saved cache if it was built on top of the given parent hash and is
    /// available, i.e. not already checked out.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
        let start = Instant::now();
        let cache = self.inner.read();

        // Acquiring the read lock should be fast; log if we were blocked for long.
        let elapsed = start.elapsed();
        if elapsed.as_millis() > 5 {
            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache lock");
        }

        cache
            .as_ref()
            .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
            .cloned()
    }

    /// Clears the tracked cache.
    #[expect(unused)]
    pub(crate) fn clear(&self) {
        self.inner.write().take();
    }

    /// Runs the given closure with exclusive access to the cache slot.
    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut Option<SavedCache>),
    {
        let mut guard = self.inner.write();
        update_fn(&mut guard);
    }
}

/// The environment a payload is executed in, relative to its parent block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// The EVM environment for the block.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
}

impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
where
    EvmEnvFor<Evm>: Default,
{
    fn default() -> Self {
        Self {
            evm_env: Default::default(),
            hash: Default::default(),
            parent_hash: Default::default(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ExecutionCache;
    use crate::tree::{
        cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::TransactionSigned;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, Recovered, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_revm::db::BundleState;
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
    }

    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    #[test]
    fn execution_cache_mismatch_parent_returns_none() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
        assert!(miss.is_none(), "checkout should fail for different parent hash");
    }

    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = ExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }

    #[test]
    fn on_inserted_executed_block_populates_cache() {
        let payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        let bundle_state = BundleState::default();

        // No cache exists yet for the new block.
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // The cache should now be keyed by the inserted block's hash.
        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }

    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        let payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        // Seed the cache with an entry for block 1.
        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // Insert a block whose parent does not match the cached block.
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }

    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                    },
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        let mut handle = payload_processor.spawn(
            Default::default(),
            (
                core::iter::empty::<
                    Result<Recovered<TransactionSigned>, core::convert::Infallible>,
                >(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(provider_factory),
            &TreeConfig::default(),
        );

        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}