1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 cached_state::{
6 CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
7 ExecutionCacheBuilder, SavedCache,
8 },
9 payload_processor::{
10 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
11 sparse_trie::StateRootComputeOutcome,
12 },
13 sparse_trie::SparseTrieTask,
14 StateProviderBuilder, TreeConfig,
15};
16use alloy_eip7928::BlockAccessList;
17use alloy_eips::eip1898::BlockWithParent;
18use alloy_evm::{block::StateChangeSource, ToTxEnv};
19use alloy_primitives::B256;
20use crossbeam_channel::Sender as CrossbeamSender;
21use executor::WorkloadExecutor;
22use multiproof::{SparseTrieUpdate, *};
23use parking_lot::RwLock;
24use prewarm::PrewarmMetrics;
25use rayon::prelude::*;
26use reth_evm::{
27 execute::{ExecutableTxFor, WithTxEnv},
28 ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
29 TxEnvFor,
30};
31use reth_execution_types::ExecutionOutcome;
32use reth_primitives_traits::NodePrimitives;
33use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
34use reth_revm::{db::BundleState, state::EvmState};
35use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
36use reth_trie_parallel::{
37 proof_task::{ProofTaskCtx, ProofWorkerHandle},
38 root::ParallelStateRootError,
39};
40use reth_trie_sparse::{
41 provider::{TrieNodeProvider, TrieNodeProviderFactory},
42 ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
43};
44use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
45use std::{
46 collections::BTreeMap,
47 sync::{
48 atomic::AtomicBool,
49 mpsc::{self, channel},
50 Arc,
51 },
52 time::Instant,
53};
54use tracing::{debug, debug_span, instrument, warn, Span};
55
56pub mod bal;
57mod configured_sparse_trie;
58pub mod executor;
59pub mod multiproof;
60pub mod prewarm;
61pub mod sparse_trie;
62
63use configured_sparse_trie::ConfiguredSparseTrie;
64
65pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
71 ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
72
73pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
82
83pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
95
96type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
98 WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxTuple>::Tx>,
99 <I as ExecutableTxTuple>::Error,
100 <N as NodePrimitives>::Receipt,
101>;
102
103#[derive(Debug)]
105pub struct PayloadProcessor<Evm>
106where
107 Evm: ConfigureEvm,
108{
109 executor: WorkloadExecutor,
111 execution_cache: ExecutionCache,
113 trie_metrics: MultiProofTaskMetrics,
115 cross_block_cache_size: u64,
117 disable_transaction_prewarming: bool,
119 disable_state_cache: bool,
121 evm_config: Evm,
123 precompile_cache_disabled: bool,
125 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
127 sparse_state_trie: Arc<
130 parking_lot::Mutex<
131 Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
132 >,
133 >,
134 disable_parallel_sparse_trie: bool,
136 prewarm_max_concurrency: usize,
138}
139
140impl<N, Evm> PayloadProcessor<Evm>
141where
142 N: NodePrimitives,
143 Evm: ConfigureEvm<Primitives = N>,
144{
145 pub(super) const fn executor(&self) -> &WorkloadExecutor {
147 &self.executor
148 }
149
150 pub fn new(
152 executor: WorkloadExecutor,
153 evm_config: Evm,
154 config: &TreeConfig,
155 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
156 ) -> Self {
157 Self {
158 executor,
159 execution_cache: Default::default(),
160 trie_metrics: Default::default(),
161 cross_block_cache_size: config.cross_block_cache_size(),
162 disable_transaction_prewarming: config.disable_prewarming(),
163 evm_config,
164 disable_state_cache: config.disable_state_cache(),
165 precompile_cache_disabled: config.precompile_cache_disabled(),
166 precompile_cache_map,
167 sparse_state_trie: Arc::default(),
168 disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
169 prewarm_max_concurrency: config.prewarm_max_concurrency(),
170 }
171 }
172}
173
174impl<N, Evm> PayloadProcessor<Evm>
175where
176 N: NodePrimitives,
177 Evm: ConfigureEvm<Primitives = N> + 'static,
178{
179 #[instrument(
212 level = "debug",
213 target = "engine::tree::payload_processor",
214 name = "payload processor",
215 skip_all
216 )]
217 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
218 &mut self,
219 env: ExecutionEnv<Evm>,
220 transactions: I,
221 provider_builder: StateProviderBuilder<N, P>,
222 multiproof_provider_factory: F,
223 config: &TreeConfig,
224 bal: Option<Arc<BlockAccessList>>,
225 ) -> IteratorPayloadHandle<Evm, I, N>
226 where
227 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
228 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
229 + Clone
230 + Send
231 + 'static,
232 {
233 let parent_hash = env.parent_hash;
234
235 let (prewarm_rx, execution_rx, transaction_count_hint) =
237 self.spawn_tx_iterator(transactions);
238
239 let span = Span::current();
240 let (to_sparse_trie, sparse_trie_rx) = channel();
241 let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
242
243 let prewarm_handle = if let Some(bal) = bal {
245 debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
247
248 let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal)));
250
251 self.spawn_caching_with(
253 env,
254 prewarm_rx,
255 transaction_count_hint,
256 provider_builder.clone(),
257 None, Some(bal),
259 )
260 } else {
261 self.spawn_caching_with(
263 env,
264 prewarm_rx,
265 transaction_count_hint,
266 provider_builder.clone(),
267 Some(to_multi_proof.clone()),
268 None,
269 )
270 };
271
272 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
274 let storage_worker_count = config.storage_worker_count();
275 let account_worker_count = config.account_worker_count();
276 let proof_handle = ProofWorkerHandle::new(
277 self.executor.handle().clone(),
278 task_ctx,
279 storage_worker_count,
280 account_worker_count,
281 );
282
283 let multi_proof_task = MultiProofTask::new(
284 proof_handle.clone(),
285 to_sparse_trie,
286 config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
287 to_multi_proof,
288 from_multi_proof,
289 );
290
291 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
293
294 let parent_span = span.clone();
296 self.executor.spawn_blocking({
297 let saved_cache = self.cache_for(parent_hash);
298 let cache = saved_cache.cache().clone();
299 let cache_metrics = saved_cache.metrics().clone();
300 move || {
301 let _enter = parent_span.entered();
302 let provider = provider_builder.build().expect("failed to build provider");
304 let provider = CachedStateProvider::new(provider, cache, cache_metrics);
305 multi_proof_task.run(provider);
306 }
307 });
308
309 let (state_root_tx, state_root_rx) = channel();
311
312 self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
314
315 PayloadHandle {
316 to_multi_proof,
317 prewarm_handle,
318 state_root: Some(state_root_rx),
319 transactions: execution_rx,
320 _span: span,
321 }
322 }
323
324 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
328 pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
329 &self,
330 env: ExecutionEnv<Evm>,
331 transactions: I,
332 provider_builder: StateProviderBuilder<N, P>,
333 bal: Option<Arc<BlockAccessList>>,
334 ) -> IteratorPayloadHandle<Evm, I, N>
335 where
336 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
337 {
338 let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
339 let prewarm_handle =
340 self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal);
341 PayloadHandle {
342 to_multi_proof: None,
343 prewarm_handle,
344 state_root: None,
345 transactions: execution_rx,
346 _span: Span::current(),
347 }
348 }
349
350 #[expect(clippy::type_complexity)]
352 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
353 &self,
354 transactions: I,
355 ) -> (
356 mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
357 mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
358 usize,
359 ) {
360 let (transactions, convert) = transactions.into();
361 let transactions = transactions.into_par_iter();
362 let transaction_count_hint = transactions.len();
363
364 let (ooo_tx, ooo_rx) = mpsc::channel();
365 let (prewarm_tx, prewarm_rx) = mpsc::channel();
366 let (execute_tx, execute_rx) = mpsc::channel();
367
368 self.executor.spawn_blocking(move || {
370 transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
371 let tx = convert(tx);
372 let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
373 if let Ok(tx) = &tx {
375 let _ = prewarm_tx.send(tx.clone());
376 }
377 let _ = ooo_tx.send((idx, tx));
378 });
379 });
380
381 self.executor.spawn_blocking(move || {
384 let mut next_for_execution = 0;
385 let mut queue = BTreeMap::new();
386 while let Ok((idx, tx)) = ooo_rx.recv() {
387 if next_for_execution == idx {
388 let _ = execute_tx.send(tx);
389 next_for_execution += 1;
390
391 while let Some(entry) = queue.first_entry() &&
392 *entry.key() == next_for_execution
393 {
394 let _ = execute_tx.send(entry.remove());
395 next_for_execution += 1;
396 }
397 } else {
398 queue.insert(idx, tx);
399 }
400 }
401 });
402
403 (prewarm_rx, execute_rx, transaction_count_hint)
404 }
405
406 fn spawn_caching_with<P>(
408 &self,
409 env: ExecutionEnv<Evm>,
410 mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
411 transaction_count_hint: usize,
412 provider_builder: StateProviderBuilder<N, P>,
413 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
414 bal: Option<Arc<BlockAccessList>>,
415 ) -> CacheTaskHandle<N::Receipt>
416 where
417 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
418 {
419 if self.disable_transaction_prewarming {
420 transactions = mpsc::channel().1;
423 }
424
425 let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
426 (None, None, None)
427 } else {
428 let saved_cache = self.cache_for(env.parent_hash);
429 let cache = saved_cache.cache().clone();
430 let cache_metrics = saved_cache.metrics().clone();
431 (Some(saved_cache), Some(cache), Some(cache_metrics))
432 };
433
434 let prewarm_ctx = PrewarmContext {
436 env,
437 evm_config: self.evm_config.clone(),
438 saved_cache,
439 provider: provider_builder,
440 metrics: PrewarmMetrics::default(),
441 terminate_execution: Arc::new(AtomicBool::new(false)),
442 precompile_cache_disabled: self.precompile_cache_disabled,
443 precompile_cache_map: self.precompile_cache_map.clone(),
444 };
445
446 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
447 self.executor.clone(),
448 self.execution_cache.clone(),
449 prewarm_ctx,
450 to_multi_proof,
451 transaction_count_hint,
452 self.prewarm_max_concurrency,
453 );
454
455 {
457 let to_prewarm_task = to_prewarm_task.clone();
458 self.executor.spawn_blocking(move || {
459 let mode = if let Some(bal) = bal {
460 PrewarmMode::BlockAccessList(bal)
461 } else {
462 PrewarmMode::Transactions(transactions)
463 };
464 prewarm_task.run(mode, to_prewarm_task);
465 });
466 }
467
468 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
469 }
470
471 #[instrument(level = "debug", target = "engine::caching", skip(self))]
476 fn cache_for(&self, parent_hash: B256) -> SavedCache {
477 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
478 debug!("reusing execution cache");
479 cache
480 } else {
481 debug!("creating new execution cache on cache miss");
482 let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
483 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
484 }
485 }
486
487 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
489 fn spawn_sparse_trie_task<BPF>(
490 &self,
491 sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
492 proof_worker_handle: BPF,
493 state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
494 ) where
495 BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
496 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
497 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
498 {
499 let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
502 let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
503 let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
504 ConfiguredSparseTrie::Serial(Default::default())
505 } else {
506 ConfiguredSparseTrie::Parallel(Box::new(
507 ParallelSparseTrie::default()
508 .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
509 ))
510 });
511 ClearedSparseStateTrie::from_state_trie(
512 SparseStateTrie::new()
513 .with_accounts_trie(default_trie.clone())
514 .with_default_storage_trie(default_trie)
515 .with_updates(true),
516 )
517 });
518
519 let task =
520 SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
521 sparse_trie_rx,
522 proof_worker_handle,
523 self.trie_metrics.clone(),
524 sparse_state_trie,
525 );
526
527 let span = Span::current();
528 self.executor.spawn_blocking(move || {
529 let _enter = span.entered();
530
531 let (result, trie) = task.run();
532 let _ = state_root_tx.send(result);
534
535 let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
539 let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
540
541 cleared_trie.shrink_to(
543 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
544 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
545 );
546
547 cleared_sparse_trie.lock().replace(cleared_trie);
548 });
549 }
550
551 pub(crate) fn on_inserted_executed_block(
559 &self,
560 block_with_parent: BlockWithParent,
561 bundle_state: &BundleState,
562 ) {
563 self.execution_cache.update_with_guard(|cached| {
564 if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
565 debug!(
566 target: "engine::caching",
567 parent_hash = %block_with_parent.parent,
568 "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
569 );
570 return;
571 }
572
573 let (caches, cache_metrics) = match cached.take() {
575 Some(existing) => {
576 existing.split()
577 }
578 None => (
579 ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
580 CachedStateMetrics::zeroed(),
581 ),
582 };
583
584 let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics);
586 if new_cache.cache().insert_state(bundle_state).is_err() {
587 *cached = None;
588 debug!(target: "engine::caching", "cleared execution cache on update error");
589 return;
590 }
591 new_cache.update_metrics();
592
593 *cached = Some(new_cache);
595 debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
596 });
597 }
598}
599
600#[derive(Debug)]
605pub struct PayloadHandle<Tx, Err, R> {
606 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
608 prewarm_handle: CacheTaskHandle<R>,
610 transactions: mpsc::Receiver<Result<Tx, Err>>,
612 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
614 _span: Span,
616}
617
618impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
619 #[instrument(
625 level = "debug",
626 target = "engine::tree::payload_processor",
627 name = "await_state_root",
628 skip_all
629 )]
630 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
631 self.state_root
632 .take()
633 .expect("state_root is None")
634 .recv()
635 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
636 }
637
638 pub fn state_hook(&self) -> impl OnStateHook {
642 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
644
645 move |source: StateChangeSource, state: &EvmState| {
646 if let Some(sender) = &to_multi_proof {
647 let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
648 }
649 }
650 }
651
652 pub(super) fn caches(&self) -> Option<StateExecutionCache> {
654 self.prewarm_handle.cache.clone()
655 }
656
657 pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
659 self.prewarm_handle.cache_metrics.clone()
660 }
661
662 pub(super) fn stop_prewarming_execution(&self) {
666 self.prewarm_handle.stop_prewarming_execution()
667 }
668
669 pub(super) fn terminate_caching(
675 &mut self,
676 execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
677 ) {
678 self.prewarm_handle.terminate_caching(execution_outcome)
679 }
680
681 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
683 core::iter::repeat_with(|| self.transactions.recv())
684 .take_while(|res| res.is_ok())
685 .map(|res| res.unwrap())
686 }
687}
688
689#[derive(Debug)]
694pub(crate) struct CacheTaskHandle<R> {
695 cache: Option<StateExecutionCache>,
697 cache_metrics: Option<CachedStateMetrics>,
699 to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
701}
702
703impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
704 pub(super) fn stop_prewarming_execution(&self) {
708 self.to_prewarm_task
709 .as_ref()
710 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
711 }
712
713 pub(super) fn terminate_caching(
718 &mut self,
719 execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
720 ) {
721 if let Some(tx) = self.to_prewarm_task.take() {
722 let event = PrewarmTaskEvent::Terminate { execution_outcome };
723 let _ = tx.send(event);
724 }
725 }
726}
727
728impl<R> Drop for CacheTaskHandle<R> {
729 fn drop(&mut self) {
730 if let Some(tx) = self.to_prewarm_task.take() {
732 let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
733 }
734 }
735}
736
737#[derive(Clone, Debug, Default)]
763struct ExecutionCache {
764 inner: Arc<RwLock<Option<SavedCache>>>,
766}
767
768impl ExecutionCache {
769 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
775 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
776 let start = Instant::now();
777 let cache = self.inner.read();
778
779 let elapsed = start.elapsed();
780 if elapsed.as_millis() > 5 {
781 warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
782 }
783
784 cache
785 .as_ref()
786 .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
789 .cloned()
790 }
791
792 #[expect(unused)]
794 pub(crate) fn clear(&self) {
795 self.inner.write().take();
796 }
797
798 pub(crate) fn update_with_guard<F>(&self, update_fn: F)
812 where
813 F: FnOnce(&mut Option<SavedCache>),
814 {
815 let mut guard = self.inner.write();
816 update_fn(&mut guard);
817 }
818}
819
820#[derive(Debug, Clone)]
822pub struct ExecutionEnv<Evm: ConfigureEvm> {
823 pub evm_env: EvmEnvFor<Evm>,
825 pub hash: B256,
827 pub parent_hash: B256,
829}
830
831impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
832where
833 EvmEnvFor<Evm>: Default,
834{
835 fn default() -> Self {
836 Self {
837 evm_env: Default::default(),
838 hash: Default::default(),
839 parent_hash: Default::default(),
840 }
841 }
842}
843
844#[cfg(test)]
845mod tests {
846 use super::ExecutionCache;
847 use crate::tree::{
848 cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
849 payload_processor::{
850 evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
851 },
852 precompile_cache::PrecompileCacheMap,
853 StateProviderBuilder, TreeConfig,
854 };
855 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
856 use alloy_evm::block::StateChangeSource;
857 use rand::Rng;
858 use reth_chainspec::ChainSpec;
859 use reth_db_common::init::init_genesis;
860 use reth_ethereum_primitives::TransactionSigned;
861 use reth_evm::OnStateHook;
862 use reth_evm_ethereum::EthEvmConfig;
863 use reth_primitives_traits::{Account, Recovered, StorageEntry};
864 use reth_provider::{
865 providers::{BlockchainProvider, OverlayStateProviderFactory},
866 test_utils::create_test_provider_factory_with_chain_spec,
867 ChainSpecProvider, HashingWriter,
868 };
869 use reth_revm::db::BundleState;
870 use reth_testing_utils::generators;
871 use reth_trie::{test_utils::state_root, HashedPostState};
872 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
873 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
874 use std::sync::Arc;
875
876 fn make_saved_cache(hash: B256) -> SavedCache {
877 let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
878 SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
879 }
880
881 #[test]
882 fn execution_cache_allows_single_checkout() {
883 let execution_cache = ExecutionCache::default();
884 let hash = B256::from([1u8; 32]);
885
886 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
887
888 let first = execution_cache.get_cache_for(hash);
889 assert!(first.is_some(), "expected initial checkout to succeed");
890
891 let second = execution_cache.get_cache_for(hash);
892 assert!(second.is_none(), "second checkout should be blocked while guard is active");
893
894 drop(first);
895
896 let third = execution_cache.get_cache_for(hash);
897 assert!(third.is_some(), "third checkout should succeed after guard is dropped");
898 }
899
900 #[test]
901 fn execution_cache_checkout_releases_on_drop() {
902 let execution_cache = ExecutionCache::default();
903 let hash = B256::from([2u8; 32]);
904
905 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
906
907 {
908 let guard = execution_cache.get_cache_for(hash);
909 assert!(guard.is_some(), "expected checkout to succeed");
910 }
912
913 let retry = execution_cache.get_cache_for(hash);
914 assert!(retry.is_some(), "checkout should succeed after guard drop");
915 }
916
917 #[test]
918 fn execution_cache_mismatch_parent_returns_none() {
919 let execution_cache = ExecutionCache::default();
920 let hash = B256::from([3u8; 32]);
921
922 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
923
924 let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
925 assert!(miss.is_none(), "checkout should fail for different parent hash");
926 }
927
928 #[test]
929 fn execution_cache_update_after_release_succeeds() {
930 let execution_cache = ExecutionCache::default();
931 let initial = B256::from([5u8; 32]);
932
933 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
934
935 let guard =
936 execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
937
938 drop(guard);
939
940 let updated = B256::from([6u8; 32]);
941 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
942
943 let new_checkout = execution_cache.get_cache_for(updated);
944 assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
945 }
946
947 #[test]
948 fn on_inserted_executed_block_populates_cache() {
949 let payload_processor = PayloadProcessor::new(
950 WorkloadExecutor::default(),
951 EthEvmConfig::new(Arc::new(ChainSpec::default())),
952 &TreeConfig::default(),
953 PrecompileCacheMap::default(),
954 );
955
956 let parent_hash = B256::from([1u8; 32]);
957 let block_hash = B256::from([10u8; 32]);
958 let block_with_parent = BlockWithParent {
959 block: BlockNumHash { hash: block_hash, number: 1 },
960 parent: parent_hash,
961 };
962 let bundle_state = BundleState::default();
963
964 assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
966
967 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
969
970 let cached = payload_processor.execution_cache.get_cache_for(block_hash);
972 assert!(cached.is_some());
973 assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
974 }
975
976 #[test]
977 fn on_inserted_executed_block_skips_on_parent_mismatch() {
978 let payload_processor = PayloadProcessor::new(
979 WorkloadExecutor::default(),
980 EthEvmConfig::new(Arc::new(ChainSpec::default())),
981 &TreeConfig::default(),
982 PrecompileCacheMap::default(),
983 );
984
985 let block1_hash = B256::from([1u8; 32]);
987 payload_processor
988 .execution_cache
989 .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
990
991 let wrong_parent = B256::from([99u8; 32]);
993 let block3_hash = B256::from([3u8; 32]);
994 let block_with_parent = BlockWithParent {
995 block: BlockNumHash { hash: block3_hash, number: 3 },
996 parent: wrong_parent,
997 };
998 let bundle_state = BundleState::default();
999
1000 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1001
1002 let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1004 assert!(cached.is_some(), "Original cache should be preserved");
1005
1006 let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1008 assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1009 }
1010
1011 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1012 let mut rng = generators::rng();
1013 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1014 let mut updates = Vec::with_capacity(updates_per_account);
1015
1016 for _ in 0..updates_per_account {
1017 let num_accounts_in_update = rng.random_range(1..=num_accounts);
1018 let mut state_update = EvmState::default();
1019
1020 let selected_addresses = &all_addresses[0..num_accounts_in_update];
1021
1022 for &address in selected_addresses {
1023 let mut storage = HashMap::default();
1024 if rng.random_bool(0.7) {
1025 for _ in 0..rng.random_range(1..10) {
1026 let slot = U256::from(rng.random::<u64>());
1027 storage.insert(
1028 slot,
1029 EvmStorageSlot::new_changed(
1030 U256::ZERO,
1031 U256::from(rng.random::<u64>()),
1032 0,
1033 ),
1034 );
1035 }
1036 }
1037
1038 let account = revm_state::Account {
1039 info: AccountInfo {
1040 balance: U256::from(rng.random::<u64>()),
1041 nonce: rng.random::<u64>(),
1042 code_hash: KECCAK_EMPTY,
1043 code: Some(Default::default()),
1044 },
1045 storage,
1046 status: AccountStatus::Touched,
1047 transaction_id: 0,
1048 };
1049
1050 state_update.insert(address, account);
1051 }
1052
1053 updates.push(state_update);
1054 }
1055
1056 updates
1057 }
1058
1059 #[test]
1060 fn test_state_root() {
1061 reth_tracing::init_test_tracing();
1062
1063 let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
1064 let genesis_hash = init_genesis(&factory).unwrap();
1065
1066 let state_updates = create_mock_state_updates(10, 10);
1067 let mut hashed_state = HashedPostState::default();
1068 let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
1069 HashMap::default();
1070
1071 {
1072 let provider_rw = factory.provider_rw().expect("failed to get provider");
1073
1074 for update in &state_updates {
1075 let account_updates = update.iter().map(|(address, account)| {
1076 (*address, Some(Account::from_revm_account(account)))
1077 });
1078 provider_rw
1079 .insert_account_for_hashing(account_updates)
1080 .expect("failed to insert accounts");
1081
1082 let storage_updates = update.iter().map(|(address, account)| {
1083 let storage_entries = account.storage.iter().map(|(slot, value)| {
1084 StorageEntry { key: B256::from(*slot), value: value.present_value }
1085 });
1086 (*address, storage_entries)
1087 });
1088 provider_rw
1089 .insert_storage_for_hashing(storage_updates)
1090 .expect("failed to insert storage");
1091 }
1092 provider_rw.commit().expect("failed to commit changes");
1093 }
1094
1095 for update in &state_updates {
1096 hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
1097
1098 for (address, account) in update {
1099 let storage: HashMap<B256, U256> = account
1100 .storage
1101 .iter()
1102 .map(|(k, v)| (B256::from(*k), v.present_value))
1103 .collect();
1104
1105 let entry = accumulated_state.entry(*address).or_default();
1106 entry.0 = Account::from_revm_account(account);
1107 entry.1.extend(storage);
1108 }
1109 }
1110
1111 let mut payload_processor = PayloadProcessor::new(
1112 WorkloadExecutor::default(),
1113 EthEvmConfig::new(factory.chain_spec()),
1114 &TreeConfig::default(),
1115 PrecompileCacheMap::default(),
1116 );
1117
1118 let provider_factory = BlockchainProvider::new(factory).unwrap();
1119
1120 let mut handle = payload_processor.spawn(
1121 Default::default(),
1122 (
1123 Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
1124 std::convert::identity,
1125 ),
1126 StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1127 OverlayStateProviderFactory::new(provider_factory),
1128 &TreeConfig::default(),
1129 None, );
1131
1132 let mut state_hook = handle.state_hook();
1133
1134 for (i, update) in state_updates.into_iter().enumerate() {
1135 state_hook.on_state(StateChangeSource::Transaction(i), &update);
1136 }
1137 drop(state_hook);
1138
1139 let root_from_task = handle.state_root().expect("task failed").state_root;
1140 let root_from_regular = state_root(accumulated_state);
1141
1142 assert_eq!(
1143 root_from_task, root_from_regular,
1144 "State root mismatch: task={root_from_task}, base={root_from_regular}"
1145 );
1146 }
1147}