1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
6 payload_processor::{
7 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
8 sparse_trie::StateRootComputeOutcome,
9 },
10 sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
11 StateProviderBuilder, TreeConfig,
12};
13use alloy_eip7928::BlockAccessList;
14use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
15use alloy_evm::block::StateChangeSource;
16use alloy_primitives::B256;
17use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
18use metrics::{Counter, Histogram};
19use multiproof::{SparseTrieUpdate, *};
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use rayon::prelude::*;
23use reth_evm::{
24 block::ExecutableTxParts,
25 execute::{ExecutableTxFor, WithTxEnv},
26 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
27 SpecFor, TxEnvFor,
28};
29use reth_metrics::Metrics;
30use reth_primitives_traits::NodePrimitives;
31use reth_provider::{
32 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProvider,
33 StateProviderFactory, StateReader,
34};
35use reth_revm::{db::BundleState, state::EvmState};
36use reth_tasks::Runtime;
37use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
38use reth_trie_parallel::{
39 proof_task::{ProofTaskCtx, ProofWorkerHandle},
40 root::ParallelStateRootError,
41};
42use reth_trie_sparse::{
43 ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
44};
45use std::{
46 collections::BTreeMap,
47 ops::Not,
48 sync::{
49 atomic::AtomicBool,
50 mpsc::{self, channel},
51 Arc,
52 },
53 time::Instant,
54};
55use tracing::{debug, debug_span, instrument, warn, Span};
56
57pub mod bal;
58pub mod multiproof;
59mod preserved_sparse_trie;
60pub mod prewarm;
61pub mod receipt_root_task;
62pub mod sparse_trie;
63
64use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
65
/// Parallelism thresholds applied to the [`ParallelSparseTrie`] used for state-root
/// computation.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Capacity bound passed as the node shrink limit when the sparse trie is cleared or
/// prepared for reuse.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Capacity bound passed as the value shrink limit when the sparse trie is cleared or
/// prepared for reuse.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip transaction prewarming (see
/// `spawn_caching_with`).
///
/// NOTE: distinct from the associated `PayloadProcessor::SMALL_BLOCK_TX_THRESHOLD`
/// (= 30), which gates parallel signature recovery.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;

/// [`PayloadHandle`] produced when spawning with an [`ExecutableTxIterator`].
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
106
/// Orchestrates the payload support tasks: transaction recovery, cache prewarming,
/// multiproof generation, and sparse-trie state-root computation.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Executor used to spawn the blocking worker tasks.
    executor: Runtime,
    /// Execution cache shared across payloads.
    execution_cache: PayloadExecutionCache,
    /// Metrics for the multiproof / sparse-trie tasks.
    trie_metrics: MultiProofTaskMetrics,
    /// Size of the cross-block cache (from [`TreeConfig`]).
    cross_block_cache_size: usize,
    /// Whether transaction prewarming is disabled (from [`TreeConfig`]).
    disable_transaction_prewarming: bool,
    /// Whether the cross-block state cache is disabled (from [`TreeConfig`]).
    disable_state_cache: bool,
    /// EVM configuration used for execution.
    evm_config: Evm,
    /// Whether the precompile cache is disabled (from [`TreeConfig`]).
    precompile_cache_disabled: bool,
    /// Shared precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Sparse state trie preserved across payloads for reuse.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Maximum concurrency of the prewarm task (from [`TreeConfig`]).
    prewarm_max_concurrency: usize,
    /// Prune depth used when preserving the sparse trie for reuse.
    sparse_trie_prune_depth: usize,
    /// Maximum number of storage tries kept when preserving the sparse trie.
    sparse_trie_max_storage_tries: usize,
    /// Whether pruning of the preserved sparse trie cache is disabled.
    disable_sparse_trie_cache_pruning: bool,
    /// Whether cache metrics collection is disabled.
    disable_cache_metrics: bool,
}
146
147impl<N, Evm> PayloadProcessor<Evm>
148where
149 N: NodePrimitives,
150 Evm: ConfigureEvm<Primitives = N>,
151{
152 pub const fn executor(&self) -> &Runtime {
154 &self.executor
155 }
156
157 pub fn new(
159 executor: Runtime,
160 evm_config: Evm,
161 config: &TreeConfig,
162 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
163 ) -> Self {
164 Self {
165 executor,
166 execution_cache: Default::default(),
167 trie_metrics: Default::default(),
168 cross_block_cache_size: config.cross_block_cache_size(),
169 disable_transaction_prewarming: config.disable_prewarming(),
170 evm_config,
171 disable_state_cache: config.disable_state_cache(),
172 precompile_cache_disabled: config.precompile_cache_disabled(),
173 precompile_cache_map,
174 sparse_state_trie: SharedPreservedSparseTrie::default(),
175 prewarm_max_concurrency: config.prewarm_max_concurrency(),
176 sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
177 sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
178 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
179 disable_cache_metrics: config.disable_cache_metrics(),
180 }
181 }
182}
183
184impl<N, Evm> PayloadProcessor<Evm>
185where
186 N: NodePrimitives,
187 Evm: ConfigureEvm<Primitives = N> + 'static,
188{
189 #[instrument(
222 level = "debug",
223 target = "engine::tree::payload_processor",
224 name = "payload processor",
225 skip_all
226 )]
227 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
228 &mut self,
229 env: ExecutionEnv<Evm>,
230 transactions: I,
231 provider_builder: StateProviderBuilder<N, P>,
232 multiproof_provider_factory: F,
233 config: &TreeConfig,
234 bal: Option<Arc<BlockAccessList>>,
235 ) -> IteratorPayloadHandle<Evm, I, N>
236 where
237 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
238 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
239 + Clone
240 + Send
241 + 'static,
242 {
243 let (prewarm_rx, execution_rx) =
245 self.spawn_tx_iterator(transactions, env.transaction_count);
246
247 let span = Span::current();
248 let (to_sparse_trie, sparse_trie_rx) = channel();
249 let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
250
251 let v2_proofs_enabled = !config.disable_proof_v2();
253
254 let parent_state_root = env.parent_state_root;
256
257 let prewarm_handle = if let Some(bal) = bal {
259 debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
261
262 self.spawn_caching_with(
265 env,
266 prewarm_rx,
267 provider_builder.clone(),
268 Some(to_multi_proof.clone()),
269 Some(bal),
270 v2_proofs_enabled,
271 )
272 } else {
273 self.spawn_caching_with(
275 env,
276 prewarm_rx,
277 provider_builder.clone(),
278 Some(to_multi_proof.clone()),
279 None,
280 v2_proofs_enabled,
281 )
282 };
283
284 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
286 let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
287
288 if config.disable_trie_cache() {
289 let multi_proof_task = MultiProofTask::new(
290 proof_handle.clone(),
291 to_sparse_trie,
292 config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
293 to_multi_proof.clone(),
294 from_multi_proof.clone(),
295 )
296 .with_v2_proofs_enabled(v2_proofs_enabled);
297
298 let parent_span = span.clone();
300 let saved_cache = prewarm_handle.saved_cache.clone();
301 self.executor.spawn_blocking(move || {
302 let _enter = parent_span.entered();
303 let provider = provider_builder.build().expect("failed to build provider");
305 let provider = if let Some(saved_cache) = saved_cache {
306 let (cache, metrics, _disable_metrics) = saved_cache.split();
307 Box::new(CachedStateProvider::new(provider, cache, metrics))
308 as Box<dyn StateProvider>
309 } else {
310 Box::new(provider)
311 };
312 multi_proof_task.run(provider);
313 });
314 }
315
316 let (state_root_tx, state_root_rx) = channel();
318
319 self.spawn_sparse_trie_task(
321 sparse_trie_rx,
322 proof_handle,
323 state_root_tx,
324 from_multi_proof,
325 config,
326 parent_state_root,
327 );
328
329 PayloadHandle {
330 to_multi_proof: Some(to_multi_proof),
331 prewarm_handle,
332 state_root: Some(state_root_rx),
333 transactions: execution_rx,
334 _span: span,
335 }
336 }
337
338 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
342 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
343 &self,
344 env: ExecutionEnv<Evm>,
345 transactions: I,
346 provider_builder: StateProviderBuilder<N, P>,
347 bal: Option<Arc<BlockAccessList>>,
348 ) -> IteratorPayloadHandle<Evm, I, N>
349 where
350 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
351 {
352 let (prewarm_rx, execution_rx) =
353 self.spawn_tx_iterator(transactions, env.transaction_count);
354 let prewarm_handle =
356 self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
357 PayloadHandle {
358 to_multi_proof: None,
359 prewarm_handle,
360 state_root: None,
361 transactions: execution_rx,
362 _span: Span::current(),
363 }
364 }
365
    /// Threshold below which signature recovery runs sequentially on one blocking task
    /// instead of in parallel on the rayon pool.
    ///
    /// NOTE: shadows nothing, but shares its name with the module-level
    /// [`SMALL_BLOCK_TX_THRESHOLD`] (= 5) used to gate prewarming — unqualified uses
    /// resolve to the module constant, this one is always accessed via `Self::`.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Spawns recovery/conversion of the transactions and fans them out to two
    /// receivers:
    /// - the first yields transactions as soon as they are recovered (out of order),
    ///   for prewarming; only successfully recovered transactions are forwarded;
    /// - the second yields results strictly in block order, for execution.
    #[expect(clippy::type_complexity)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
        transaction_count: usize,
    ) -> (
        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
    ) {
        // `ooo_*` carries `(index, result)` pairs in whatever order recovery finishes.
        let (ooo_tx, ooo_rx) = mpsc::channel();
        let (prewarm_tx, prewarm_rx) = mpsc::channel();
        let (execute_tx, execute_rx) = mpsc::channel();

        if transaction_count == 0 {
            // Nothing to recover: both receivers observe an immediate disconnect.
        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
            // For small blocks the rayon fan-out overhead outweighs the parallelism.
            debug!(
                target: "engine::tree::payload_processor",
                transaction_count,
                "using sequential sig recovery for small block"
            );
            self.executor.spawn_blocking(move || {
                let (transactions, convert) = transactions.into_parts();
                for (idx, tx) in transactions.into_iter().enumerate() {
                    let tx = convert.convert(tx);
                    let tx = tx.map(|tx| {
                        let (tx_env, tx) = tx.into_parts();
                        WithTxEnv { tx_env, tx: Arc::new(tx) }
                    });
                    // Errors are not prewarmed but are still forwarded in order so the
                    // executor can surface them.
                    if let Ok(tx) = &tx {
                        let _ = prewarm_tx.send(tx.clone());
                    }
                    let _ = ooo_tx.send((idx, tx));
                }
            });
        } else {
            rayon::spawn(move || {
                let (transactions, convert) = transactions.into_parts();
                transactions.into_par_iter().enumerate().for_each_with(
                    ooo_tx,
                    |ooo_tx, (idx, tx)| {
                        let tx = convert.convert(tx);
                        let tx = tx.map(|tx| {
                            let (tx_env, tx) = tx.into_parts();
                            WithTxEnv { tx_env, tx: Arc::new(tx) }
                        });
                        if let Ok(tx) = &tx {
                            let _ = prewarm_tx.send(tx.clone());
                        }
                        let _ = ooo_tx.send((idx, tx));
                    },
                );
            });
        }

        // Reordering task: buffers out-of-order results and releases them to the
        // execution channel strictly by index.
        self.executor.spawn_blocking(move || {
            let mut next_for_execution = 0;
            let mut queue = BTreeMap::new();
            while let Ok((idx, tx)) = ooo_rx.recv() {
                if next_for_execution == idx {
                    let _ = execute_tx.send(tx);
                    next_for_execution += 1;

                    // Drain any buffered successors that are now next in line.
                    while let Some(entry) = queue.first_entry() &&
                        *entry.key() == next_for_execution
                    {
                        let _ = execute_tx.send(entry.remove());
                        next_for_execution += 1;
                    }
                } else {
                    queue.insert(idx, tx);
                }
            }
        });

        (prewarm_rx, execute_rx)
    }
461
    /// Spawns the prewarming/caching task for the payload and returns a handle to it.
    ///
    /// Prewarming is skipped when disabled via config or when the block has fewer
    /// transactions than the module-level [`SMALL_BLOCK_TX_THRESHOLD`]; the caching
    /// task still runs in `Skipped` mode so cache bookkeeping happens either way.
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        bal: Option<Arc<BlockAccessList>>,
        v2_proofs_enabled: bool,
    ) -> CacheTaskHandle<N::Receipt>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        // NOTE: this is the module-level threshold (5), not the associated constant
        // used for signature recovery.
        let skip_prewarm =
            self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;

        // Check out (or create) a cache for the parent block unless state caching is
        // disabled.
        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));

        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache: saved_cache.clone(),
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
            v2_proofs_enabled,
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
            self.prewarm_max_concurrency,
        );

        {
            // Clone the event sender for the task itself; the original is kept in the
            // returned handle for termination control.
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking(move || {
                // Mode selection: skipped, BAL-driven, or transaction-driven.
                let mode = if skip_prewarm {
                    PrewarmMode::Skipped
                } else if let Some(bal) = bal {
                    PrewarmMode::BlockAccessList(bal)
                } else {
                    PrewarmMode::Transactions(transactions)
                };
                prewarm_task.run(mode, to_prewarm_task);
            });
        }

        CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task) }
    }
518
519 #[instrument(level = "debug", target = "engine::caching", skip(self))]
524 fn cache_for(&self, parent_hash: B256) -> SavedCache {
525 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
526 debug!("reusing execution cache");
527 cache
528 } else {
529 debug!("creating new execution cache on cache miss");
530 let start = Instant::now();
531 let cache = ExecutionCache::new(self.cross_block_cache_size);
532 let metrics = CachedStateMetrics::zeroed();
533 metrics.record_cache_creation(start.elapsed());
534 SavedCache::new(parent_hash, cache, metrics)
535 .with_disable_cache_metrics(self.disable_cache_metrics)
536 }
537 }
538
    /// Spawns the sparse-trie state-root task on a blocking executor thread.
    ///
    /// The task reuses a previously preserved sparse trie when available (converted
    /// via `into_trie_for(parent_state_root)`), runs either the cleared task (trie
    /// cache disabled) or the cached task, sends the result on `state_root_tx`, and
    /// stores the trie back into the shared slot for the next payload.
    fn spawn_sparse_trie_task(
        &self,
        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
        config: &TreeConfig,
        parent_state_root: B256,
    ) {
        // Capture everything the spawned closure needs by value up front.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let disable_trie_cache = config.disable_trie_cache();
        let prune_depth = self.sparse_trie_prune_depth;
        let max_storage_tries = self.sparse_trie_max_storage_tries;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let chunk_size =
            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking(move || {
            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Taking the preserved trie can block while the previous payload's task is
            // still finishing; record the wait time.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            let sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ParallelSparseTrie::default().with_parallelism_thresholds(
                            PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
                        ),
                    );
                    SparseStateTrie::new()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });

            // Trie cache disabled -> consume `SparseTrieUpdate`s from the multiproof
            // task; enabled -> consume `MultiProofMessage`s directly.
            let mut task = if disable_trie_cache {
                SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
                    sparse_trie_rx,
                    proof_worker_handle,
                    trie_metrics.clone(),
                    sparse_state_trie,
                ))
            } else {
                SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
                    &executor,
                    from_multi_proof,
                    proof_worker_handle,
                    trie_metrics.clone(),
                    sparse_state_trie.with_skip_proof_node_filtering(true),
                    chunk_size,
                ))
            };

            let result = task.run();
            let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);

            // Acquire the preservation slot before sending the result; the trie is
            // stored back under this guard below.
            let mut guard = preserved_sparse_trie.lock();

            if state_root_tx.send(result).is_err() {
                // Receiver gone: nobody will use this root, preserve a cleared trie.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                drop(guard);
                // Deferred cleanup work is dropped only after releasing the lock.
                drop(deferred);
                return;
            }

            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(state_root) = computed_state_root {
                // Success: prune/shrink the trie and anchor it to the computed root.
                let start = std::time::Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    prune_depth,
                    max_storage_tries,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                guard.store(PreservedSparseTrie::anchored(trie, state_root));
                deferred
            } else {
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            drop(guard);
            drop(deferred);
        });
    }
677
    /// Updates the shared execution cache with the state of a newly inserted executed
    /// block.
    ///
    /// If an existing cache is keyed to a block other than the inserted block's
    /// parent, the update is skipped and the existing cache is left untouched.
    /// Otherwise the cache (existing or freshly created) is re-keyed to the inserted
    /// block's hash and extended with `bundle_state`; on insertion failure the cache
    /// is cleared entirely.
    pub fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        let disable_cache_metrics = self.disable_cache_metrics;
        self.execution_cache.update_with_guard(|cached| {
            // Only continue the cache chain if the current cache is for this block's
            // parent (or there is no cache at all).
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
                );
                return
            }

            // Reuse the existing cache and metrics when present, otherwise start fresh.
            let (caches, cache_metrics, _) = match cached.take() {
                Some(existing) => existing.split(),
                None => (
                    ExecutionCache::new(self.cross_block_cache_size),
                    CachedStateMetrics::zeroed(),
                    false,
                ),
            };

            let new_cache =
                SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
                    .with_disable_cache_metrics(disable_cache_metrics);
            // A failed state insert leaves the cache contents unreliable; drop it.
            if new_cache.cache().insert_state(bundle_state).is_err() {
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return
            }
            new_cache.update_metrics();

            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
728
/// Handle to a spawned payload-processing pipeline (see [`PayloadProcessor::spawn`]).
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Sender for state updates to the multiproof task; `None` when state-root
    /// computation was not spawned.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Handle controlling the prewarming/caching task.
    prewarm_handle: CacheTaskHandle<R>,
    /// Receiver of recovered transactions in block order.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Receiver for the state-root outcome; `None` once taken or when not spawned.
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Tracing span associated with this payload run.
    _span: Span,
}
746
747impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
748 #[instrument(
754 level = "debug",
755 target = "engine::tree::payload_processor",
756 name = "await_state_root",
757 skip_all
758 )]
759 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
760 self.state_root
761 .take()
762 .expect("state_root is None")
763 .recv()
764 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
765 }
766
    /// Takes ownership of the state-root result receiver.
    ///
    /// # Panics
    /// Panics if the receiver was never set or has already been taken.
    pub const fn take_state_root_rx(
        &mut self,
    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
        self.state_root.take().expect("state_root is None")
    }
778
779 pub fn state_hook(&self) -> impl OnStateHook {
783 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
785
786 move |source: StateChangeSource, state: &EvmState| {
787 if let Some(sender) = &to_multi_proof {
788 let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
789 }
790 }
791 }
792
793 pub fn caches(&self) -> Option<ExecutionCache> {
795 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
796 }
797
798 pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
800 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
801 }
802
    /// Tells the prewarming task to stop executing transactions (best effort).
    pub fn stop_prewarming_execution(&self) {
        self.prewarm_handle.stop_prewarming_execution()
    }
809
    /// Terminates the caching task, optionally handing it the block's execution
    /// outcome.
    ///
    /// Returns the block-validation notification sender, or `None` if the task was
    /// already terminated. See [`CacheTaskHandle::terminate_caching`].
    pub fn terminate_caching(
        &mut self,
        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
    ) -> Option<mpsc::Sender<()>> {
        self.prewarm_handle.terminate_caching(execution_outcome)
    }
823
824 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
826 core::iter::repeat_with(|| self.transactions.recv())
827 .take_while(|res| res.is_ok())
828 .map(|res| res.unwrap())
829 }
830}
831
/// Handle to the prewarming/caching task spawned for a single payload.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The cache checked out for this payload, if state caching is enabled.
    saved_cache: Option<SavedCache>,
    /// Event channel to the prewarm task; `None` once termination has been requested.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
}
843
844impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
845 pub fn stop_prewarming_execution(&self) {
849 self.to_prewarm_task
850 .as_ref()
851 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
852 }
853
854 #[must_use = "sender must be used and notified on block validation success"]
859 pub fn terminate_caching(
860 &mut self,
861 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
862 ) -> Option<mpsc::Sender<()>> {
863 if let Some(tx) = self.to_prewarm_task.take() {
864 let (valid_block_tx, valid_block_rx) = mpsc::channel();
865 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
866 let _ = tx.send(event);
867
868 Some(valid_block_tx)
869 } else {
870 None
871 }
872 }
873}
874
875impl<R> Drop for CacheTaskHandle<R> {
876 fn drop(&mut self) {
877 if let Some(tx) = self.to_prewarm_task.take() {
879 let _ = tx.send(PrewarmTaskEvent::Terminate {
880 execution_outcome: None,
881 valid_block_rx: mpsc::channel().1,
882 });
883 }
884 }
885}
886
/// Shared, lock-protected slot holding the cross-payload [`SavedCache`].
#[derive(Clone, Debug, Default)]
pub struct PayloadExecutionCache {
    /// The most recently saved cache, if any.
    inner: Arc<RwLock<Option<SavedCache>>>,
    /// Metrics for cache usage and lock-wait times.
    metrics: ExecutionCacheMetrics,
}
919
920impl PayloadExecutionCache {
    /// Attempts to check out the saved execution cache for `parent_hash`.
    ///
    /// Returns `Some` when a cache exists and is available: if the available cache was
    /// saved for a different block, it is cleared before being handed out. Returns
    /// `None` when no cache exists or when the matching cache is still in use.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
        // Record how long we block on the read lock; long waits indicate contention.
        let start = Instant::now();
        let cache = self.inner.read();

        let elapsed = start.elapsed();
        self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
        if elapsed.as_millis() > 5 {
            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
        }

        if let Some(c) = cache.as_ref() {
            let cached_hash = c.executed_block_hash();
            let hash_matches = cached_hash == parent_hash;
            let available = c.is_available();
            let usage_count = c.usage_count();

            debug!(
                target: "engine::caching",
                %cached_hash,
                %parent_hash,
                hash_matches,
                available,
                usage_count,
                "Existing cache found"
            );

            if available {
                if !hash_matches {
                    // Stale contents: wipe them so the cache can serve this parent.
                    c.clear();
                }
                return Some(c.clone())
            } else if hash_matches {
                // The right cache exists but is currently checked out elsewhere.
                self.metrics.execution_cache_in_use.increment(1);
            }
        } else {
            debug!(target: "engine::caching", %parent_hash, "No cache found");
        }

        None
    }
974
975 #[expect(unused)]
977 pub(crate) fn clear(&self) {
978 self.inner.write().take();
979 }
980
981 pub fn update_with_guard<F>(&self, update_fn: F)
995 where
996 F: FnOnce(&mut Option<SavedCache>),
997 {
998 let mut guard = self.inner.write();
999 update_fn(&mut guard);
1000 }
1001}
1002
/// Metrics for the shared payload execution cache.
#[derive(Metrics, Clone)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ExecutionCacheMetrics {
    /// How often the matching cache was found but still checked out elsewhere.
    pub(crate) execution_cache_in_use: Counter,
    /// Time spent waiting to acquire the execution cache lock.
    pub(crate) execution_cache_wait_duration: Histogram,
}
1013
/// Execution environment for a payload: the EVM environment plus the block metadata
/// needed by the processing tasks.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// EVM environment for the block being processed.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being processed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
    /// State root of the parent block.
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    pub transaction_count: usize,
    /// Withdrawals for the block, if any.
    pub withdrawals: Option<Vec<Withdrawal>>,
}
1035
1036impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
1037where
1038 EvmEnvFor<Evm>: Default,
1039{
1040 fn default() -> Self {
1041 Self {
1042 evm_env: Default::default(),
1043 hash: Default::default(),
1044 parent_hash: Default::default(),
1045 parent_state_root: Default::default(),
1046 transaction_count: 0,
1047 withdrawals: None,
1048 }
1049 }
1050}
1051
#[cfg(test)]
mod tests {
    use super::PayloadExecutionCache;
    use crate::tree::{
        cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
        payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::TransactionSigned;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, Recovered, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_revm::db::BundleState;
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState};
    use reth_trie_db::ChangesetCache;
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    /// Builds a [`SavedCache`] keyed to `hash` with a small cache and zeroed metrics.
    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCache::new(1_000);
        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
    }

    /// Only one checkout of the saved cache may be live at a time; a second checkout
    /// is refused until the first is dropped.
    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    /// Dropping a checked-out cache (here by scope exit) releases it for reuse.
    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    /// An available cache saved for a different block is cleared and handed out
    /// rather than refused.
    #[test]
    fn execution_cache_mismatch_parent_clears_and_returns() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let different_hash = B256::from([4u8; 32]);
        let cache = execution_cache.get_cache_for(different_hash);
        assert!(cache.is_some(), "cache should be returned for reuse after clearing")
    }

    /// Replacing the saved cache after the previous checkout was released works.
    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = PayloadExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }

    /// Inserting an executed block into an empty processor populates the execution
    /// cache keyed to the new block's hash.
    #[test]
    fn on_inserted_executed_block_populates_cache() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        let bundle_state = BundleState::default();

        // No cache exists before the insert.
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }

    /// An insert whose parent does not match the existing cache leaves the existing
    /// cache untouched and creates no new one.
    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // The inserted block claims a parent the cache does not hold.
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }

    /// Generates `updates_per_account` random [`EvmState`] updates over a fixed pool
    /// of `num_accounts` addresses (each update touches a random subset).
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                // ~70% of accounts also get random storage writes.
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    /// End-to-end check: the state root produced by the spawned pipeline matches the
    /// reference root computed from the accumulated account/storage state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Persist the generated accounts/storage so the provider can serve proofs.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Accumulate the final per-account state for the reference root computation.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        let mut handle = payload_processor.spawn(
            Default::default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
            &TreeConfig::default(),
            None,
        );

        // Replay the updates through the state hook as if execution produced them.
        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}
1358}