1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6 sparse_trie::SparseTrieCacheTask,
7 CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
8 PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
9};
10use alloy_eip7928::BlockAccessList;
11use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
12use alloy_primitives::B256;
13use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
14use multiproof::*;
15use prewarm::PrewarmMetrics;
16use rayon::prelude::*;
17use reth_evm::{
18 block::ExecutableTxParts,
19 execute::{ExecutableTxFor, WithTxEnv},
20 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
21 SpecFor, TxEnvFor,
22};
23use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
24use reth_provider::{
25 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
26};
27use reth_revm::db::BundleState;
28use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
29use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
30use reth_trie_parallel::{
31 proof_task::{ProofTaskCtx, ProofWorkerHandle},
32 root::ParallelStateRootError,
33};
34use reth_trie_sparse::{
35 ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
36};
37use std::{
38 ops::Not,
39 sync::{
40 atomic::{AtomicBool, AtomicUsize},
41 mpsc::{self, channel},
42 Arc,
43 },
44};
45use tracing::{debug, debug_span, instrument, trace, warn, Span};
46
47pub mod multiproof;
48mod preserved_sparse_trie;
49pub mod prewarm;
50pub mod receipt_root_task;
51pub mod sparse_trie;
52
53use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
54
55pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
64
65pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
77
78pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
81
82type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
84 WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
85 <I as ExecutableTxTuple>::Error,
86 <N as NodePrimitives>::Receipt,
87>;
88
89#[derive(Debug)]
91pub struct PayloadProcessor<Evm>
92where
93 Evm: ConfigureEvm,
94{
95 executor: Runtime,
97 execution_cache: PayloadExecutionCache,
99 cache_metrics: Option<CachedStateMetrics>,
101 trie_metrics: MultiProofTaskMetrics,
103 cross_block_cache_size: usize,
105 disable_transaction_prewarming: bool,
107 disable_state_cache: bool,
109 evm_config: Evm,
111 precompile_cache_disabled: bool,
113 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
115 sparse_state_trie: SharedPreservedSparseTrie,
119 sparse_trie_max_hot_slots: usize,
121 sparse_trie_max_hot_accounts: usize,
123 disable_sparse_trie_cache_pruning: bool,
125}
126
127impl<N, Evm> PayloadProcessor<Evm>
128where
129 N: NodePrimitives,
130 Evm: ConfigureEvm<Primitives = N>,
131{
132 pub const fn executor(&self) -> &Runtime {
134 &self.executor
135 }
136
137 pub fn new(
139 executor: Runtime,
140 evm_config: Evm,
141 config: &TreeConfig,
142 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
143 ) -> Self {
144 Self {
145 executor,
146 execution_cache: Default::default(),
147 trie_metrics: Default::default(),
148 cross_block_cache_size: config.cross_block_cache_size(),
149 disable_transaction_prewarming: config.disable_prewarming(),
150 evm_config,
151 disable_state_cache: config.disable_state_cache(),
152 precompile_cache_disabled: config.precompile_cache_disabled(),
153 precompile_cache_map,
154 sparse_state_trie: SharedPreservedSparseTrie::default(),
155 sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
156 sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
157 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
158 cache_metrics: (!config.disable_cache_metrics())
159 .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
160 }
161 }
162}
163
164impl<Evm> WaitForCaches for PayloadProcessor<Evm>
165where
166 Evm: ConfigureEvm,
167{
168 fn wait_for_caches(&self) -> CacheWaitDurations {
169 debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
170
171 let execution_cache = self.execution_cache.clone();
173 let sparse_trie = self.sparse_state_trie.clone();
174
175 let (execution_tx, execution_rx) = std::sync::mpsc::channel();
177 let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
178
179 self.executor.spawn_blocking_named("wait-exec-cache", move || {
180 let _ = execution_tx.send(execution_cache.wait_for_availability());
181 });
182 self.executor.spawn_blocking_named("wait-sparse-tri", move || {
183 let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
184 });
185
186 let execution_cache_duration =
187 execution_rx.recv().expect("execution cache wait task failed to send result");
188 let sparse_trie_duration =
189 sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
190
191 debug!(
192 target: "engine::tree::payload_processor",
193 ?execution_cache_duration,
194 ?sparse_trie_duration,
195 "Execution cache and sparse trie locks acquired"
196 );
197 CacheWaitDurations {
198 execution_cache: execution_cache_duration,
199 sparse_trie: sparse_trie_duration,
200 }
201 }
202}
203
204impl<N, Evm> PayloadProcessor<Evm>
205where
206 N: NodePrimitives,
207 Evm: ConfigureEvm<Primitives = N> + 'static,
208{
209 #[instrument(
232 level = "debug",
233 target = "engine::tree::payload_processor",
234 name = "payload processor",
235 skip_all
236 )]
237 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
238 &mut self,
239 env: ExecutionEnv<Evm>,
240 transactions: I,
241 provider_builder: StateProviderBuilder<N, P>,
242 multiproof_provider_factory: F,
243 config: &TreeConfig,
244 bal: Option<Arc<BlockAccessList>>,
245 ) -> IteratorPayloadHandle<Evm, I, N>
246 where
247 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
248 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
249 + Clone
250 + Send
251 + Sync
252 + 'static,
253 {
254 let (prewarm_rx, execution_rx) =
256 self.spawn_tx_iterator(transactions, env.transaction_count);
257
258 let span = Span::current();
259
260 let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
261 let state_root_handle = self.spawn_state_root(
262 multiproof_provider_factory,
263 env.parent_state_root,
264 halve_workers,
265 config,
266 );
267 let install_state_hook = bal.is_none();
268 let prewarm_handle = self.spawn_caching_with(
269 env,
270 prewarm_rx,
271 provider_builder,
272 Some(state_root_handle.updates_tx().clone()),
273 bal,
274 );
275
276 PayloadHandle {
277 state_root_handle: Some(state_root_handle),
278 install_state_hook,
279 prewarm_handle,
280 transactions: execution_rx,
281 _span: span,
282 }
283 }
284
285 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
289 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
290 &self,
291 env: ExecutionEnv<Evm>,
292 transactions: I,
293 provider_builder: StateProviderBuilder<N, P>,
294 bal: Option<Arc<BlockAccessList>>,
295 ) -> IteratorPayloadHandle<Evm, I, N>
296 where
297 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
298 {
299 let (prewarm_rx, execution_rx) =
300 self.spawn_tx_iterator(transactions, env.transaction_count);
301 let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
302 PayloadHandle {
303 state_root_handle: None,
304 install_state_hook: false,
305 prewarm_handle,
306 transactions: execution_rx,
307 _span: Span::current(),
308 }
309 }
310
311 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
324 pub fn spawn_state_root<F>(
325 &self,
326 multiproof_provider_factory: F,
327 parent_state_root: B256,
328 halve_workers: bool,
329 config: &TreeConfig,
330 ) -> StateRootHandle
331 where
332 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
333 + Clone
334 + Send
335 + Sync
336 + 'static,
337 {
338 let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();
339
340 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
341 #[cfg(feature = "trie-debug")]
342 let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
343 let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
344
345 let (state_root_tx, state_root_rx) = channel();
346
347 self.spawn_sparse_trie_task(
348 proof_handle,
349 state_root_tx,
350 from_multi_proof,
351 parent_state_root,
352 config.multiproof_chunk_size(),
353 );
354
355 StateRootHandle::new(parent_state_root, updates_tx, state_root_rx)
356 }
357
358 const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
361
362 const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
369
370 const PARALLEL_PREFETCH_COUNT: usize = 4;
378
379 #[expect(clippy::type_complexity)]
386 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
387 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
388 &self,
389 transactions: I,
390 transaction_count: usize,
391 ) -> (
392 mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
393 mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
394 ) {
395 let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
396 let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);
397
398 if transaction_count == 0 {
399 } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
401 debug!(
404 target: "engine::tree::payload_processor",
405 transaction_count,
406 "using sequential sig recovery for small block"
407 );
408 self.executor.spawn_blocking_named("tx-iterator", move || {
409 let (transactions, convert) = transactions.into_parts();
410 convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
411 });
412 } else {
413 let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
420 let executor = self.executor.clone();
421 self.executor.spawn_blocking_named("tx-iterator", move || {
422 let (transactions, convert) = transactions.into_parts();
423 let mut all: Vec<_> = transactions.into_iter().collect();
424 let rest = all.split_off(prefetch.min(all.len()));
425
426 convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
429
430 rest.into_par_iter()
432 .enumerate()
433 .map(|(i, tx)| {
434 let idx = i + prefetch;
435 let tx = convert.convert(tx);
436 (idx, tx)
437 })
438 .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
439 let tx = tx.map(|tx| {
440 let (tx_env, tx) = tx.into_parts();
441 let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
442 let _ = prewarm_tx.send((idx, tx.clone()));
443 tx
444 });
445 let _ = execute_tx.send(tx);
446 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
447 });
448 });
449 }
450
451 (prewarm_rx, execute_rx)
452 }
453
454 #[instrument(
456 level = "debug",
457 target = "engine::tree::payload_processor",
458 skip_all,
459 fields(bal=%bal.is_some())
460 )]
461 fn spawn_caching_with<P>(
462 &self,
463 env: ExecutionEnv<Evm>,
464 transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
465 provider_builder: StateProviderBuilder<N, P>,
466 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
467 bal: Option<Arc<BlockAccessList>>,
468 ) -> CacheTaskHandle<N::Receipt>
469 where
470 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
471 {
472 let skip_prewarm =
473 self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
474
475 let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
476
477 let executed_tx_index = Arc::new(AtomicUsize::new(0));
478
479 let prewarm_ctx = PrewarmContext {
481 env,
482 evm_config: self.evm_config.clone(),
483 saved_cache: saved_cache.clone(),
484 provider: provider_builder,
485 metrics: PrewarmMetrics::default(),
486 cache_metrics: self.cache_metrics.clone(),
487 terminate_execution: Arc::new(AtomicBool::new(false)),
488 executed_tx_index: Arc::clone(&executed_tx_index),
489 precompile_cache_disabled: self.precompile_cache_disabled,
490 precompile_cache_map: self.precompile_cache_map.clone(),
491 };
492
493 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
494 self.executor.clone(),
495 self.execution_cache.clone(),
496 prewarm_ctx,
497 to_sparse_trie_task,
498 );
499
500 {
501 let to_prewarm_task = to_prewarm_task.clone();
502 self.executor.spawn_blocking_named("prewarm", move || {
503 let mode = if skip_prewarm {
504 PrewarmMode::Skipped
505 } else if let Some(bal) = bal {
506 PrewarmMode::BlockAccessList(bal)
507 } else {
508 PrewarmMode::Transactions(transactions)
509 };
510 prewarm_task.run(mode, to_prewarm_task);
511 });
512 }
513
514 CacheTaskHandle {
515 saved_cache,
516 to_prewarm_task: Some(to_prewarm_task),
517 executed_tx_index,
518 cache_metrics: self.cache_metrics.clone(),
519 }
520 }
521
522 #[instrument(level = "debug", target = "engine::caching", skip(self))]
527 pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
528 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
529 debug!("reusing execution cache");
530 cache
531 } else {
532 debug!("creating new execution cache on cache miss");
533 let start = Instant::now();
534 let cache = ExecutionCache::new(self.cross_block_cache_size);
535 if let Some(metrics) = &self.cache_metrics {
536 metrics.record_cache_creation(start.elapsed());
537 }
538 SavedCache::new(parent_hash, cache)
539 }
540 }
541
542 fn spawn_sparse_trie_task(
546 &self,
547 proof_worker_handle: ProofWorkerHandle,
548 state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
549 from_multi_proof: CrossbeamReceiver<StateRootMessage>,
550 parent_state_root: B256,
551 chunk_size: usize,
552 ) {
553 let preserved_sparse_trie = self.sparse_state_trie.clone();
554 let trie_metrics = self.trie_metrics.clone();
555 let max_hot_slots = self.sparse_trie_max_hot_slots;
556 let max_hot_accounts = self.sparse_trie_max_hot_accounts;
557 let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
558 let executor = self.executor.clone();
559
560 let parent_span = Span::current();
561 self.executor.spawn_blocking_named("sparse-trie", move || {
562 reth_tasks::once!(increase_thread_priority);
563
564 let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
565 .entered();
566
567 let start = Instant::now();
572 let preserved = preserved_sparse_trie.take();
573 trie_metrics
574 .sparse_trie_cache_wait_duration_histogram
575 .record(start.elapsed().as_secs_f64());
576
577 let mut sparse_state_trie = preserved
578 .map(|preserved| preserved.into_trie_for(parent_state_root))
579 .unwrap_or_else(|| {
580 debug!(
581 target: "engine::tree::payload_processor",
582 "Creating new sparse trie - no preserved trie available"
583 );
584 let default_trie = RevealableSparseTrie::blind_from(
585 ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
586 );
587 SparseStateTrie::default()
588 .with_accounts_trie(default_trie.clone())
589 .with_default_storage_trie(default_trie)
590 .with_updates(true)
591 });
592 sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
593
594 let mut task = SparseTrieCacheTask::new_with_trie(
595 &executor,
596 from_multi_proof,
597 proof_worker_handle,
598 trie_metrics.clone(),
599 sparse_state_trie,
600 chunk_size,
601 );
602
603 let result = task.run();
604
605 let mut guard = preserved_sparse_trie.lock();
611
612 let task_result = result.as_ref().ok().cloned();
613 if state_root_tx.send(result).is_err() {
615 debug!(
618 target: "engine::tree::payload_processor",
619 "State root receiver dropped, clearing trie"
620 );
621 let (trie, deferred) = task.into_cleared_trie(
622 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
623 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
624 );
625 guard.store(PreservedSparseTrie::cleared(trie));
626 drop(guard);
627 executor.spawn_drop(deferred);
628 return;
629 }
630
631 let _enter =
634 debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
635 let deferred = if let Some(result) = task_result {
636 let start = Instant::now();
637 let (trie, deferred) = task.into_trie_for_reuse(
638 max_hot_slots,
639 max_hot_accounts,
640 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
641 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
642 disable_cache_pruning,
643 &result.trie_updates,
644 );
645 trie_metrics
646 .into_trie_for_reuse_duration_histogram
647 .record(start.elapsed().as_secs_f64());
648 trie_metrics
649 .sparse_trie_retained_memory_bytes
650 .set(trie.memory_size() as f64);
651 trie_metrics
652 .sparse_trie_retained_storage_tries
653 .set(trie.retained_storage_tries_count() as f64);
654 guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
655 deferred
656 } else {
657 debug!(
658 target: "engine::tree::payload_processor",
659 "State root computation failed, clearing trie"
660 );
661 let (trie, deferred) = task.into_cleared_trie(
662 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
663 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
664 );
665 guard.store(PreservedSparseTrie::cleared(trie));
666 deferred
667 };
668 drop(guard);
669 executor.spawn_drop(deferred);
670 });
671 }
672
673 pub fn on_inserted_executed_block(
681 &self,
682 block_with_parent: BlockWithParent,
683 bundle_state: &BundleState,
684 ) {
685 let cache_metrics = self.cache_metrics.clone();
686 self.execution_cache.update_with_guard(|cached| {
687 if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
688 debug!(
689 target: "engine::caching",
690 parent_hash = %block_with_parent.parent,
691 "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
692 );
693 return
694 }
695
696 let caches = match cached.take() {
698 Some(existing) => existing.cache().clone(),
699 None => ExecutionCache::new(self.cross_block_cache_size),
700 };
701
702 let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
704 if new_cache.cache().insert_state(bundle_state).is_err() {
705 *cached = None;
706 debug!(target: "engine::caching", "cleared execution cache on update error");
707 return
708 }
709 new_cache.update_metrics(cache_metrics.as_ref());
710
711 *cached = Some(new_cache);
713 debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
714 });
715 }
716}
717
718fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
720 iter: impl Iterator<Item = RawTx>,
721 convert: &C,
722 prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
723 execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
724) where
725 Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
726 TxEnv: Clone,
727 C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
728{
729 for (idx, raw_tx) in iter.enumerate() {
730 let tx = convert.convert(raw_tx);
731 let tx = tx.map(|tx| {
732 let (tx_env, tx) = tx.into_parts();
733 WithTxEnv { tx_env, tx: Arc::new(tx) }
734 });
735 if let Ok(tx) = &tx {
736 let _ = prewarm_tx.send((idx, tx.clone()));
737 }
738 let _ = execute_tx.send(tx);
739 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
740 }
741}
742
743#[derive(Debug)]
748pub struct PayloadHandle<Tx, Err, R> {
749 state_root_handle: Option<StateRootHandle>,
751 install_state_hook: bool,
753 prewarm_handle: CacheTaskHandle<R>,
755 transactions: mpsc::Receiver<Result<Tx, Err>>,
757 _span: Span,
759}
760
761impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
762 #[instrument(
768 level = "debug",
769 target = "engine::tree::payload_processor",
770 name = "await_state_root",
771 skip_all
772 )]
773 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
774 self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
775 }
776
777 pub const fn take_state_root_rx(
784 &mut self,
785 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
786 self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
787 }
788
789 pub fn state_hook(&self) -> Option<impl OnStateHook> {
793 self.install_state_hook
794 .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
795 .flatten()
796 }
797
798 pub fn caches(&self) -> Option<ExecutionCache> {
800 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
801 }
802
803 pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
805 self.prewarm_handle.cache_metrics.clone()
806 }
807
808 pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
813 &self.prewarm_handle.executed_tx_index
814 }
815
816 pub fn stop_prewarming_execution(&self) {
820 self.prewarm_handle.stop_prewarming_execution()
821 }
822
823 pub fn terminate_caching(
831 &mut self,
832 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
833 ) -> Option<mpsc::Sender<()>> {
834 self.prewarm_handle.terminate_caching(execution_outcome)
835 }
836
837 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
839 self.transactions.iter()
840 }
841}
842
843#[derive(Debug)]
848pub struct CacheTaskHandle<R> {
849 saved_cache: Option<SavedCache>,
851 to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
853 executed_tx_index: Arc<AtomicUsize>,
856 cache_metrics: Option<CachedStateMetrics>,
858}
859
860impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
861 pub fn stop_prewarming_execution(&self) {
865 self.to_prewarm_task
866 .as_ref()
867 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
868 }
869
870 #[must_use = "sender must be used and notified on block validation success"]
875 pub fn terminate_caching(
876 &mut self,
877 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
878 ) -> Option<mpsc::Sender<()>> {
879 if let Some(tx) = self.to_prewarm_task.take() {
880 let (valid_block_tx, valid_block_rx) = mpsc::channel();
881 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
882 let _ = tx.send(event);
883
884 Some(valid_block_tx)
885 } else {
886 None
887 }
888 }
889}
890
891impl<R> Drop for CacheTaskHandle<R> {
892 fn drop(&mut self) {
893 if let Some(tx) = self.to_prewarm_task.take() {
895 let _ = tx.send(PrewarmTaskEvent::Terminate {
896 execution_outcome: None,
897 valid_block_rx: mpsc::channel().1,
898 });
899 }
900 }
901}
902
903#[derive(Debug, Clone)]
905pub struct ExecutionEnv<Evm: ConfigureEvm> {
906 pub evm_env: EvmEnvFor<Evm>,
908 pub hash: B256,
910 pub parent_hash: B256,
912 pub parent_state_root: B256,
916 pub transaction_count: usize,
920 pub gas_used: u64,
923 pub withdrawals: Option<Vec<Withdrawal>>,
926}
927
928impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
929where
930 EvmEnvFor<Evm>: Default,
931{
932 #[cfg(any(test, feature = "test-utils"))]
934 pub fn test_default() -> Self {
935 Self {
936 evm_env: Default::default(),
937 hash: Default::default(),
938 parent_hash: Default::default(),
939 parent_state_root: Default::default(),
940 transaction_count: 0,
941 gas_used: 0,
942 withdrawals: None,
943 }
944 }
945}
946
947#[cfg(test)]
948mod tests {
949 use crate::tree::{
950 payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
951 precompile_cache::PrecompileCacheMap,
952 ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
953 };
954 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
955 use alloy_evm::block::StateChangeSource;
956 use rand::Rng;
957 use reth_chainspec::ChainSpec;
958 use reth_db_common::init::init_genesis;
959 use reth_ethereum_primitives::TransactionSigned;
960 use reth_evm::OnStateHook;
961 use reth_evm_ethereum::EthEvmConfig;
962 use reth_primitives_traits::{Account, Recovered, StorageEntry};
963 use reth_provider::{
964 providers::{BlockchainProvider, OverlayStateProviderFactory},
965 test_utils::create_test_provider_factory_with_chain_spec,
966 ChainSpecProvider, HashingWriter,
967 };
968 use reth_revm::db::BundleState;
969 use reth_testing_utils::generators;
970 use reth_trie::{test_utils::state_root, HashedPostState};
971 use reth_trie_db::ChangesetCache;
972 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
973 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
974 use std::sync::Arc;
975
976 fn make_saved_cache(hash: B256) -> SavedCache {
977 let execution_cache = ExecutionCache::new(1_000);
978 SavedCache::new(hash, execution_cache)
979 }
980
981 #[test]
982 fn execution_cache_allows_single_checkout() {
983 let execution_cache = PayloadExecutionCache::default();
984 let hash = B256::from([1u8; 32]);
985
986 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
987
988 let first = execution_cache.get_cache_for(hash);
989 assert!(first.is_some(), "expected initial checkout to succeed");
990
991 let second = execution_cache.get_cache_for(hash);
992 assert!(second.is_none(), "second checkout should be blocked while guard is active");
993
994 drop(first);
995
996 let third = execution_cache.get_cache_for(hash);
997 assert!(third.is_some(), "third checkout should succeed after guard is dropped");
998 }
999
1000 #[test]
1001 fn execution_cache_checkout_releases_on_drop() {
1002 let execution_cache = PayloadExecutionCache::default();
1003 let hash = B256::from([2u8; 32]);
1004
1005 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1006
1007 {
1008 let guard = execution_cache.get_cache_for(hash);
1009 assert!(guard.is_some(), "expected checkout to succeed");
1010 }
1012
1013 let retry = execution_cache.get_cache_for(hash);
1014 assert!(retry.is_some(), "checkout should succeed after guard drop");
1015 }
1016
1017 #[test]
1018 fn execution_cache_mismatch_parent_clears_and_returns() {
1019 let execution_cache = PayloadExecutionCache::default();
1020 let hash = B256::from([3u8; 32]);
1021
1022 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1023
1024 let different_hash = B256::from([4u8; 32]);
1027 let cache = execution_cache.get_cache_for(different_hash);
1028 assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1029
1030 drop(cache);
1031
1032 let original = execution_cache.get_cache_for(hash);
1035 assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1036 }
1037
1038 #[test]
1039 fn execution_cache_update_after_release_succeeds() {
1040 let execution_cache = PayloadExecutionCache::default();
1041 let initial = B256::from([5u8; 32]);
1042
1043 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1044
1045 let guard =
1046 execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1047
1048 drop(guard);
1049
1050 let updated = B256::from([6u8; 32]);
1051 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1052
1053 let new_checkout = execution_cache.get_cache_for(updated);
1054 assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1055 }
1056
1057 #[test]
1058 fn on_inserted_executed_block_populates_cache() {
1059 let payload_processor = PayloadProcessor::new(
1060 reth_tasks::Runtime::test(),
1061 EthEvmConfig::new(Arc::new(ChainSpec::default())),
1062 &TreeConfig::default(),
1063 PrecompileCacheMap::default(),
1064 );
1065
1066 let parent_hash = B256::from([1u8; 32]);
1067 let block_hash = B256::from([10u8; 32]);
1068 let block_with_parent = BlockWithParent {
1069 block: BlockNumHash { hash: block_hash, number: 1 },
1070 parent: parent_hash,
1071 };
1072 let bundle_state = BundleState::default();
1073
1074 assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1076
1077 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1079
1080 let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1082 assert!(cached.is_some());
1083 assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1084 }
1085
1086 #[test]
1087 fn on_inserted_executed_block_skips_on_parent_mismatch() {
1088 let payload_processor = PayloadProcessor::new(
1089 reth_tasks::Runtime::test(),
1090 EthEvmConfig::new(Arc::new(ChainSpec::default())),
1091 &TreeConfig::default(),
1092 PrecompileCacheMap::default(),
1093 );
1094
1095 let block1_hash = B256::from([1u8; 32]);
1097 payload_processor
1098 .execution_cache
1099 .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
1100
1101 let wrong_parent = B256::from([99u8; 32]);
1103 let block3_hash = B256::from([3u8; 32]);
1104 let block_with_parent = BlockWithParent {
1105 block: BlockNumHash { hash: block3_hash, number: 3 },
1106 parent: wrong_parent,
1107 };
1108 let bundle_state = BundleState::default();
1109
1110 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1111
1112 let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1114 assert!(cached.is_some(), "Original cache should be preserved");
1115
1116 let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1118 assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1119 }
1120
1121 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1122 let mut rng = generators::rng();
1123 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1124 let mut updates = Vec::with_capacity(updates_per_account);
1125
1126 for _ in 0..updates_per_account {
1127 let num_accounts_in_update = rng.random_range(1..=num_accounts);
1128 let mut state_update = EvmState::default();
1129
1130 let selected_addresses = &all_addresses[0..num_accounts_in_update];
1131
1132 for &address in selected_addresses {
1133 let mut storage = HashMap::default();
1134 if rng.random_bool(0.7) {
1135 for _ in 0..rng.random_range(1..10) {
1136 let slot = U256::from(rng.random::<u64>());
1137 storage.insert(
1138 slot,
1139 EvmStorageSlot::new_changed(
1140 U256::ZERO,
1141 U256::from(rng.random::<u64>()),
1142 0,
1143 ),
1144 );
1145 }
1146 }
1147
1148 let account = revm_state::Account {
1149 info: AccountInfo {
1150 balance: U256::from(rng.random::<u64>()),
1151 nonce: rng.random::<u64>(),
1152 code_hash: KECCAK_EMPTY,
1153 code: Some(Default::default()),
1154 account_id: None,
1155 },
1156 original_info: Box::new(AccountInfo::default()),
1157 storage,
1158 status: AccountStatus::Touched,
1159 transaction_id: 0,
1160 };
1161
1162 state_update.insert(address, account);
1163 }
1164
1165 updates.push(state_update);
1166 }
1167
1168 updates
1169 }
1170
1171 #[test]
1172 fn test_state_root() {
1173 reth_tracing::init_test_tracing();
1174
1175 let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
1176 let genesis_hash = init_genesis(&factory).unwrap();
1177
1178 let state_updates = create_mock_state_updates(10, 10);
1179 let mut hashed_state = HashedPostState::default();
1180 let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
1181 HashMap::default();
1182
1183 {
1184 let provider_rw = factory.provider_rw().expect("failed to get provider");
1185
1186 for update in &state_updates {
1187 let account_updates = update.iter().map(|(address, account)| {
1188 (*address, Some(Account::from_revm_account(account)))
1189 });
1190 provider_rw
1191 .insert_account_for_hashing(account_updates)
1192 .expect("failed to insert accounts");
1193
1194 let storage_updates = update.iter().map(|(address, account)| {
1195 let storage_entries = account.storage.iter().map(|(slot, value)| {
1196 StorageEntry { key: B256::from(*slot), value: value.present_value }
1197 });
1198 (*address, storage_entries)
1199 });
1200 provider_rw
1201 .insert_storage_for_hashing(storage_updates)
1202 .expect("failed to insert storage");
1203 }
1204 provider_rw.commit().expect("failed to commit changes");
1205 }
1206
1207 for update in &state_updates {
1208 hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
1209
1210 for (address, account) in update {
1211 let storage: HashMap<B256, U256> = account
1212 .storage
1213 .iter()
1214 .map(|(k, v)| (B256::from(*k), v.present_value))
1215 .collect();
1216
1217 let entry = accumulated_state.entry(*address).or_default();
1218 entry.0 = Account::from_revm_account(account);
1219 entry.1.extend(storage);
1220 }
1221 }
1222
1223 let mut payload_processor = PayloadProcessor::new(
1224 reth_tasks::Runtime::test(),
1225 EthEvmConfig::new(factory.chain_spec()),
1226 &TreeConfig::default(),
1227 PrecompileCacheMap::default(),
1228 );
1229
1230 let provider_factory = BlockchainProvider::new(factory).unwrap();
1231
1232 let mut handle = payload_processor.spawn(
1233 ExecutionEnv::test_default(),
1234 (
1235 Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
1236 std::convert::identity,
1237 ),
1238 StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1239 OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
1240 &TreeConfig::default(),
1241 None, );
1243
1244 let mut state_hook = handle.state_hook().expect("state hook is None");
1245
1246 for (i, update) in state_updates.into_iter().enumerate() {
1247 state_hook.on_state(StateChangeSource::Transaction(i), &update);
1248 }
1249 drop(state_hook);
1250
1251 let root_from_task = handle.state_root().expect("task failed").state_root;
1252 let root_from_regular = state_root(accumulated_state);
1253
1254 assert_eq!(
1255 root_from_task, root_from_regular,
1256 "State root mismatch: task={root_from_task}, base={root_from_regular}"
1257 );
1258 }
1259
1260 #[test]
1271 fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1272 let execution_cache = PayloadExecutionCache::default();
1273
1274 let block4_hash = B256::from([4u8; 32]);
1276 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1277
1278 let fork_parent = B256::from([2u8; 32]);
1281 let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1282 assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1283 let prewarm_cache = prewarm_cache.unwrap();
1284 assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1285
1286 let fork_addr = Address::from([0xBB; 20]);
1289 let fork_key = B256::from([0xCC; 32]);
1290 prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1291
1292 let during_prewarm = execution_cache.get_cache_for(block4_hash);
1294 assert!(
1295 during_prewarm.is_none(),
1296 "cache must be unavailable while prewarm holds a reference"
1297 );
1298
1299 drop(prewarm_cache);
1301
1302 let block5_cache = execution_cache.get_cache_for(block4_hash);
1306 assert!(
1307 block5_cache.is_some(),
1308 "canonical chain must get cache after fork prewarm is dropped"
1309 );
1310 assert_eq!(
1311 block5_cache.as_ref().unwrap().executed_block_hash(),
1312 block4_hash,
1313 "cache must carry the canonical parent hash, not the fork parent"
1314 );
1315 }
1316}