1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
6 payload_processor::{
7 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
8 sparse_trie::StateRootComputeOutcome,
9 },
10 sparse_trie::SparseTrieCacheTask,
11 CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches,
12};
13use alloy_eip7928::BlockAccessList;
14use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
15use alloy_evm::block::StateChangeSource;
16use alloy_primitives::B256;
17use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
18use metrics::{Counter, Histogram};
19use multiproof::*;
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use rayon::prelude::*;
23use reth_evm::{
24 block::ExecutableTxParts,
25 execute::{ExecutableTxFor, WithTxEnv},
26 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
27 SpecFor, TxEnvFor,
28};
29use reth_metrics::Metrics;
30use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
31use reth_provider::{
32 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
33};
34use reth_revm::{db::BundleState, state::EvmState};
35use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
36use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
37use reth_trie_parallel::{
38 proof_task::{ProofTaskCtx, ProofWorkerHandle},
39 root::ParallelStateRootError,
40};
41use reth_trie_sparse::{
42 ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
43};
44use std::{
45 ops::Not,
46 sync::{
47 atomic::{AtomicBool, AtomicUsize},
48 mpsc::{self, channel},
49 Arc,
50 },
51 time::Duration,
52};
53use tracing::{debug, debug_span, instrument, warn, Span};
54
55pub mod bal;
56pub mod multiproof;
57mod preserved_sparse_trie;
58pub mod prewarm;
59pub mod receipt_root_task;
60pub mod sparse_trie;
61
62use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
63
/// Parallelism thresholds applied to the default [`ParallelSparseTrie`] that is created when no
/// preserved sparse trie is available.
///
/// Both `min_revealed_nodes` and `min_updated_nodes` are set to 100.
/// NOTE(review): presumably parallel processing only kicks in once these counts are reached —
/// confirm against [`ParallelismThresholds`].
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
71
/// Node-capacity limit handed to the sparse trie task when its trie is cleared or prepared for
/// reuse at the end of a state root computation.
///
/// NOTE(review): presumably the upper bound on node capacity retained after shrinking — confirm
/// against `into_cleared_trie` / `into_trie_for_reuse`.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
81
/// Value-capacity limit handed to the sparse trie task when its trie is cleared or prepared for
/// reuse at the end of a state root computation.
///
/// NOTE(review): presumably the upper bound on value capacity retained after shrinking — confirm
/// against `into_cleared_trie` / `into_trie_for_reuse`.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
94
/// Blocks with fewer transactions than this skip prewarming entirely.
///
/// Not to be confused with the private associated constant of the same name on
/// [`PayloadProcessor`], which has a different value and governs transaction conversion.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
98
/// The [`PayloadHandle`] produced when transactions come from an [`ExecutableTxIterator`] `I`:
///
/// - transactions are the iterator's recovered transactions wrapped in [`WithTxEnv`],
/// - errors are the iterator's own error type,
/// - receipts are the receipt type of the node primitives `N`.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
105
/// Entry point for spawning the background tasks that accompany payload execution: transaction
/// conversion, cache prewarming and state root computation.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Runtime on which all blocking tasks are spawned.
    executor: Runtime,
    /// Shared slot holding the execution cache that is reused across payloads.
    execution_cache: PayloadExecutionCache,
    /// Metrics handed to the sparse trie task.
    trie_metrics: MultiProofTaskMetrics,
    /// Size passed to `ExecutionCache::new` whenever a fresh cache has to be created.
    cross_block_cache_size: usize,
    /// When `true`, prewarming is skipped regardless of block size.
    disable_transaction_prewarming: bool,
    /// When `true`, spawned tasks do not look up or create a saved execution cache.
    disable_state_cache: bool,
    /// EVM configuration cloned into the prewarm context.
    evm_config: Evm,
    /// Forwarded to the prewarm context.
    precompile_cache_disabled: bool,
    /// Precompile cache map cloned into the prewarm context.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Sparse trie that is taken at the start of a state root task and stored back when the task
    /// finishes.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Prune depth forwarded to the sparse trie task when the trie is prepared for reuse.
    sparse_trie_prune_depth: usize,
    /// Storage trie limit forwarded to the sparse trie task when the trie is prepared for reuse.
    sparse_trie_max_storage_tries: usize,
    /// Forwarded to the sparse trie task when the trie is prepared for reuse.
    disable_sparse_trie_cache_pruning: bool,
    /// Forwarded to every [`SavedCache`] created by this processor.
    disable_cache_metrics: bool,
}
143
144impl<N, Evm> PayloadProcessor<Evm>
145where
146 N: NodePrimitives,
147 Evm: ConfigureEvm<Primitives = N>,
148{
149 pub const fn executor(&self) -> &Runtime {
151 &self.executor
152 }
153
154 pub fn new(
156 executor: Runtime,
157 evm_config: Evm,
158 config: &TreeConfig,
159 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
160 ) -> Self {
161 Self {
162 executor,
163 execution_cache: Default::default(),
164 trie_metrics: Default::default(),
165 cross_block_cache_size: config.cross_block_cache_size(),
166 disable_transaction_prewarming: config.disable_prewarming(),
167 evm_config,
168 disable_state_cache: config.disable_state_cache(),
169 precompile_cache_disabled: config.precompile_cache_disabled(),
170 precompile_cache_map,
171 sparse_state_trie: SharedPreservedSparseTrie::default(),
172 sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
173 sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
174 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
175 disable_cache_metrics: config.disable_cache_metrics(),
176 }
177 }
178}
179
180impl<Evm> WaitForCaches for PayloadProcessor<Evm>
181where
182 Evm: ConfigureEvm,
183{
184 fn wait_for_caches(&self) -> CacheWaitDurations {
185 debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
186
187 let execution_cache = self.execution_cache.clone();
189 let sparse_trie = self.sparse_state_trie.clone();
190
191 let (execution_tx, execution_rx) = std::sync::mpsc::channel();
193 let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
194
195 self.executor.spawn_blocking_named("wait-exec-cache", move || {
196 let _ = execution_tx.send(execution_cache.wait_for_availability());
197 });
198 self.executor.spawn_blocking_named("wait-sparse-tri", move || {
199 let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
200 });
201
202 let execution_cache_duration =
203 execution_rx.recv().expect("execution cache wait task failed to send result");
204 let sparse_trie_duration =
205 sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
206
207 debug!(
208 target: "engine::tree::payload_processor",
209 ?execution_cache_duration,
210 ?sparse_trie_duration,
211 "Execution cache and sparse trie locks acquired"
212 );
213 CacheWaitDurations {
214 execution_cache: execution_cache_duration,
215 sparse_trie: sparse_trie_duration,
216 }
217 }
218}
219
220impl<N, Evm> PayloadProcessor<Evm>
221where
222 N: NodePrimitives,
223 Evm: ConfigureEvm<Primitives = N> + 'static,
224{
225 #[instrument(
258 level = "debug",
259 target = "engine::tree::payload_processor",
260 name = "payload processor",
261 skip_all
262 )]
263 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
264 &mut self,
265 env: ExecutionEnv<Evm>,
266 transactions: I,
267 provider_builder: StateProviderBuilder<N, P>,
268 multiproof_provider_factory: F,
269 config: &TreeConfig,
270 bal: Option<Arc<BlockAccessList>>,
271 ) -> IteratorPayloadHandle<Evm, I, N>
272 where
273 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
274 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
275 + Clone
276 + Send
277 + Sync
278 + 'static,
279 {
280 let (prewarm_rx, execution_rx) =
282 self.spawn_tx_iterator(transactions, env.transaction_count);
283
284 let span = Span::current();
285
286 let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config);
287 let prewarm_handle = self.spawn_caching_with(
288 env,
289 prewarm_rx,
290 provider_builder,
291 Some(state_root_handle.to_multi_proof.clone()),
292 bal,
293 );
294
295 PayloadHandle {
296 state_root_handle: Some(state_root_handle),
297 prewarm_handle,
298 transactions: execution_rx,
299 _span: span,
300 }
301 }
302
303 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
307 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
308 &self,
309 env: ExecutionEnv<Evm>,
310 transactions: I,
311 provider_builder: StateProviderBuilder<N, P>,
312 bal: Option<Arc<BlockAccessList>>,
313 ) -> IteratorPayloadHandle<Evm, I, N>
314 where
315 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
316 {
317 let (prewarm_rx, execution_rx) =
318 self.spawn_tx_iterator(transactions, env.transaction_count);
319 let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
320 PayloadHandle {
321 state_root_handle: None,
322 prewarm_handle,
323 transactions: execution_rx,
324 _span: Span::current(),
325 }
326 }
327
328 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
338 pub fn spawn_state_root<F>(
339 &mut self,
340 multiproof_provider_factory: F,
341 env: &ExecutionEnv<Evm>,
342 config: &TreeConfig,
343 ) -> StateRootHandle
344 where
345 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
346 + Clone
347 + Send
348 + Sync
349 + 'static,
350 {
351 let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
352
353 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
354 let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
355 let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
356
357 let (state_root_tx, state_root_rx) = channel();
358
359 self.spawn_sparse_trie_task(
360 proof_handle,
361 state_root_tx,
362 from_multi_proof,
363 env.parent_state_root,
364 config.multiproof_chunk_size(),
365 );
366
367 StateRootHandle::new(to_multi_proof, state_root_rx)
368 }
369
/// Blocks with at most this many transactions request a halved set of proof workers when the
/// state root task is spawned.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

/// Blocks with fewer transactions than this have their transactions converted sequentially
/// instead of going through the parallel conversion path.
///
/// This is a different constant from the module-level `SMALL_BLOCK_TX_THRESHOLD`, which
/// decides whether prewarming is skipped.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

/// Number of leading transactions that are converted sequentially before the remainder is
/// handed to the parallel conversion path.
const PARALLEL_PREFETCH_COUNT: usize = 4;
390
/// Spawns a blocking `tx-iterator` task that converts `transactions` into executable form and
/// publishes them on two bounded channels.
///
/// Returns `(prewarm_rx, execute_rx)`:
/// - `prewarm_rx` yields `(index, tx)` for every transaction that converted successfully;
/// - `execute_rx` yields every conversion result, including errors.
///
/// Both channels have capacity `transaction_count`. The conversion strategy depends on the
/// count:
/// - `0`: no task is spawned; both senders are dropped when this function returns.
/// - below [`Self::SMALL_BLOCK_TX_THRESHOLD`]: all transactions are converted sequentially.
/// - otherwise: the first [`Self::PARALLEL_PREFETCH_COUNT`] transactions are converted
///   sequentially and the rest in parallel on the executor's CPU pool.
///
/// Send failures (receiver dropped) are ignored on both channels.
#[expect(clippy::type_complexity)]
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
    &self,
    transactions: I,
    transaction_count: usize,
) -> (
    mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
    mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
) {
    // NOTE(review): sized so that sends should not block as long as the iterator yields no more
    // than `transaction_count` items — confirm callers uphold this.
    let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
    let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);

    if transaction_count == 0 {
        // Empty block: nothing to convert, so no task is spawned.
    } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
        debug!(
            target: "engine::tree::payload_processor",
            transaction_count,
            "using sequential sig recovery for small block"
        );
        self.executor.spawn_blocking_named("tx-iterator", move || {
            let (transactions, convert) = transactions.into_parts();
            convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
        });
    } else {
        let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
        let executor = self.executor.clone();
        self.executor.spawn_blocking_named("tx-iterator", move || {
            let (transactions, convert) = transactions.into_parts();
            let mut all: Vec<_> = transactions.into_iter().collect();
            // `all` keeps the leading `prefetch` transactions, `rest` gets everything after.
            let rest = all.split_off(prefetch.min(all.len()));

            // Convert the head sequentially so the first transactions are published before the
            // parallel stage starts.
            convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);

            rest.into_par_iter()
                .enumerate()
                .map(|(i, tx)| {
                    // Offset by the head length so indices continue where the head left off.
                    let idx = i + prefetch;
                    let tx = convert.convert(tx);
                    (idx, tx)
                })
                // NOTE(review): presumably invokes the closure in original item order — confirm
                // against `ForEachOrdered`.
                .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
                    let tx = tx.map(|tx| {
                        let (tx_env, tx) = tx.into_parts();
                        let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
                        // Only successful conversions are offered to the prewarm side.
                        let _ = prewarm_tx.send((idx, tx.clone()));
                        tx
                    });
                    let _ = execute_tx.send(tx);
                    debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
                });
        });
    }

    (prewarm_rx, execute_rx)
}
465
466 #[instrument(
468 level = "debug",
469 target = "engine::tree::payload_processor",
470 skip_all,
471 fields(bal=%bal.is_some())
472 )]
473 fn spawn_caching_with<P>(
474 &self,
475 env: ExecutionEnv<Evm>,
476 transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
477 provider_builder: StateProviderBuilder<N, P>,
478 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
479 bal: Option<Arc<BlockAccessList>>,
480 ) -> CacheTaskHandle<N::Receipt>
481 where
482 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
483 {
484 let skip_prewarm =
485 self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
486
487 let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
488
489 let executed_tx_index = Arc::new(AtomicUsize::new(0));
490
491 let prewarm_ctx = PrewarmContext {
493 env,
494 evm_config: self.evm_config.clone(),
495 saved_cache: saved_cache.clone(),
496 provider: provider_builder,
497 metrics: PrewarmMetrics::default(),
498 terminate_execution: Arc::new(AtomicBool::new(false)),
499 executed_tx_index: Arc::clone(&executed_tx_index),
500 precompile_cache_disabled: self.precompile_cache_disabled,
501 precompile_cache_map: self.precompile_cache_map.clone(),
502 };
503
504 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
505 self.executor.clone(),
506 self.execution_cache.clone(),
507 prewarm_ctx,
508 to_multi_proof,
509 );
510
511 {
512 let to_prewarm_task = to_prewarm_task.clone();
513 self.executor.spawn_blocking_named("prewarm", move || {
514 let mode = if skip_prewarm {
515 PrewarmMode::Skipped
516 } else if let Some(bal) = bal {
517 PrewarmMode::BlockAccessList(bal)
518 } else {
519 PrewarmMode::Transactions(transactions)
520 };
521 prewarm_task.run(mode, to_prewarm_task);
522 });
523 }
524
525 CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
526 }
527
528 #[instrument(level = "debug", target = "engine::caching", skip(self))]
533 fn cache_for(&self, parent_hash: B256) -> SavedCache {
534 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
535 debug!("reusing execution cache");
536 cache
537 } else {
538 debug!("creating new execution cache on cache miss");
539 let start = Instant::now();
540 let cache = ExecutionCache::new(self.cross_block_cache_size);
541 let metrics = CachedStateMetrics::zeroed();
542 metrics.record_cache_creation(start.elapsed());
543 SavedCache::new(parent_hash, cache, metrics)
544 .with_disable_cache_metrics(self.disable_cache_metrics)
545 }
546 }
547
/// Spawns the blocking `sparse-trie` task that computes the state root and afterwards stores
/// the sparse trie back into the shared slot for the next block.
///
/// The task:
/// 1. takes the preserved sparse trie (recording how long that took) or builds a blind default
///    one when none is available,
/// 2. runs a [`SparseTrieCacheTask`] fed by `from_multi_proof`,
/// 3. sends the outcome through `state_root_tx`,
/// 4. stores the trie back: anchored at the new state root on success, cleared on failure or
///    when the receiver of the result is gone.
fn spawn_sparse_trie_task(
    &self,
    proof_worker_handle: ProofWorkerHandle,
    state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
    from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
    parent_state_root: B256,
    chunk_size: usize,
) {
    // Copy or clone everything the task needs so the closure does not borrow `self`.
    let preserved_sparse_trie = self.sparse_state_trie.clone();
    let trie_metrics = self.trie_metrics.clone();
    let prune_depth = self.sparse_trie_prune_depth;
    let max_storage_tries = self.sparse_trie_max_storage_tries;
    let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
    let executor = self.executor.clone();

    let parent_span = Span::current();
    self.executor.spawn_blocking_named("sparse-trie", move || {
        // NOTE(review): presumably raises the priority of this thread, at most once — confirm
        // against `reth_tasks::once!`.
        reth_tasks::once!(increase_thread_priority);

        let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
            .entered();

        // Measure how long it takes to obtain the preserved trie.
        let start = Instant::now();
        let preserved = preserved_sparse_trie.take();
        trie_metrics
            .sparse_trie_cache_wait_duration_histogram
            .record(start.elapsed().as_secs_f64());

        let sparse_state_trie = preserved
            .map(|preserved| preserved.into_trie_for(parent_state_root))
            .unwrap_or_else(|| {
                debug!(
                    target: "engine::tree::payload_processor",
                    "Creating new sparse trie - no preserved trie available"
                );
                // The same blind trie serves as the accounts trie and as the template for
                // storage tries.
                let default_trie = RevealableSparseTrie::blind_from(
                    ParallelSparseTrie::default().with_parallelism_thresholds(
                        PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
                    ),
                );
                SparseStateTrie::new()
                    .with_accounts_trie(default_trie.clone())
                    .with_default_storage_trie(default_trie)
                    .with_updates(true)
            });

        let mut task = SparseTrieCacheTask::new_with_trie(
            &executor,
            from_multi_proof,
            proof_worker_handle,
            trie_metrics.clone(),
            sparse_state_trie,
            chunk_size,
        );

        let result = task.run();

        // The lock is taken BEFORE the result is sent and held until the trie has been stored.
        // NOTE(review): presumably this keeps the next block's `take()` from observing an empty
        // slot between the send and the store — confirm `take()` waits on this lock.
        let mut guard = preserved_sparse_trie.lock();

        // Keep a copy of the successful outcome; `result` itself is moved into the channel.
        let task_result = result.as_ref().ok().cloned();
        if state_root_tx.send(result).is_err() {
            debug!(
                target: "engine::tree::payload_processor",
                "State root receiver dropped, clearing trie"
            );
            let (trie, deferred) = task.into_cleared_trie(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );
            guard.store(PreservedSparseTrie::cleared(trie));
            // Release the lock first, then drop the deferred values outside of it.
            drop(guard);
            drop(deferred);
            return;
        }

        let _enter =
            debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
        let deferred = if let Some(result) = task_result {
            let start = Instant::now();
            let (trie, deferred) = task.into_trie_for_reuse(
                prune_depth,
                max_storage_tries,
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                disable_cache_pruning,
                &result.trie_updates,
            );
            trie_metrics
                .into_trie_for_reuse_duration_histogram
                .record(start.elapsed().as_secs_f64());
            trie_metrics
                .sparse_trie_retained_memory_bytes
                .set(trie.memory_size() as f64);
            trie_metrics
                .sparse_trie_retained_storage_tries
                .set(trie.retained_storage_tries_count() as f64);
            // Anchor the preserved trie at the state root that was just computed.
            guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
            deferred
        } else {
            debug!(
                target: "engine::tree::payload_processor",
                "State root computation failed, clearing trie"
            );
            let (trie, deferred) = task.into_cleared_trie(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );
            guard.store(PreservedSparseTrie::cleared(trie));
            deferred
        };
        // Release the lock first, then drop the deferred values outside of it.
        drop(guard);
        drop(deferred);
    });
}
681
682 pub fn on_inserted_executed_block(
690 &self,
691 block_with_parent: BlockWithParent,
692 bundle_state: &BundleState,
693 ) {
694 let disable_cache_metrics = self.disable_cache_metrics;
695 self.execution_cache.update_with_guard(|cached| {
696 if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
697 debug!(
698 target: "engine::caching",
699 parent_hash = %block_with_parent.parent,
700 "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
701 );
702 return
703 }
704
705 let (caches, cache_metrics, _) = match cached.take() {
707 Some(existing) => existing.split(),
708 None => (
709 ExecutionCache::new(self.cross_block_cache_size),
710 CachedStateMetrics::zeroed(),
711 false,
712 ),
713 };
714
715 let new_cache =
717 SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
718 .with_disable_cache_metrics(disable_cache_metrics);
719 if new_cache.cache().insert_state(bundle_state).is_err() {
720 *cached = None;
721 debug!(target: "engine::caching", "cleared execution cache on update error");
722 return
723 }
724 new_cache.update_metrics();
725
726 *cached = Some(new_cache);
728 debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
729 });
730 }
731}
732
733fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
735 iter: impl Iterator<Item = RawTx>,
736 convert: &C,
737 prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
738 execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
739) where
740 Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
741 TxEnv: Clone,
742 C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
743{
744 for (idx, raw_tx) in iter.enumerate() {
745 let tx = convert.convert(raw_tx);
746 let tx = tx.map(|tx| {
747 let (tx_env, tx) = tx.into_parts();
748 WithTxEnv { tx_env, tx: Arc::new(tx) }
749 });
750 if let Ok(tx) = &tx {
751 let _ = prewarm_tx.send((idx, tx.clone()));
752 }
753 let _ = execute_tx.send(tx);
754 debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
755 }
756}
757
/// Handle to a spawned state root computation.
///
/// It can create state hooks that forward state updates to the computation, and it yields the
/// computation's result exactly once.
#[derive(Debug)]
pub struct StateRootHandle {
    /// Sender for messages to the state root computation.
    to_multi_proof: CrossbeamSender<MultiProofMessage>,
    /// Receiver for the final result; `None` once it has been taken.
    state_root_rx: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
}
772
773impl StateRootHandle {
774 pub const fn new(
776 to_multi_proof: CrossbeamSender<MultiProofMessage>,
777 state_root_rx: mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>,
778 ) -> Self {
779 Self { to_multi_proof, state_root_rx: Some(state_root_rx) }
780 }
781
782 pub fn state_hook(&self) -> impl OnStateHook {
786 let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone());
787
788 move |source: StateChangeSource, state: &EvmState| {
789 let _ =
790 to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
791 }
792 }
793
794 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
800 self.state_root_rx
801 .take()
802 .expect("state_root already taken")
803 .recv()
804 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
805 }
806
807 pub const fn take_state_root_rx(
813 &mut self,
814 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
815 self.state_root_rx.take().expect("state_root already taken")
816 }
817}
818
/// Handle to all background tasks spawned for one payload.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the state root computation; `None` when no such computation was spawned.
    state_root_handle: Option<StateRootHandle>,
    /// Handle to the prewarm/caching task.
    prewarm_handle: CacheTaskHandle<R>,
    /// Receiver of converted transactions (or conversion errors) for execution.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Tracing span captured when the handle was created; stored but never read.
    _span: Span,
}
834
835impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
836 #[instrument(
842 level = "debug",
843 target = "engine::tree::payload_processor",
844 name = "await_state_root",
845 skip_all
846 )]
847 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
848 self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
849 }
850
851 pub const fn take_state_root_rx(
858 &mut self,
859 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
860 self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
861 }
862
863 pub fn state_hook(&self) -> Option<impl OnStateHook> {
867 self.state_root_handle.as_ref().map(|handle| handle.state_hook())
868 }
869
870 pub fn caches(&self) -> Option<ExecutionCache> {
872 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
873 }
874
875 pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
877 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
878 }
879
880 pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
885 &self.prewarm_handle.executed_tx_index
886 }
887
888 pub fn stop_prewarming_execution(&self) {
892 self.prewarm_handle.stop_prewarming_execution()
893 }
894
895 pub fn terminate_caching(
903 &mut self,
904 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
905 ) -> Option<mpsc::Sender<()>> {
906 self.prewarm_handle.terminate_caching(execution_outcome)
907 }
908
909 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
911 self.transactions.iter()
912 }
913}
914
/// Handle to the prewarm/caching task.
///
/// Dropping the handle sends the task a terminate event if none has been sent yet.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// The saved cache handed to the task, if the state cache is enabled.
    saved_cache: Option<SavedCache>,
    /// Sender for events to the task; `None` once a terminate event has been sent.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Counter shared with the prewarm task's context.
    /// NOTE(review): presumably tracks the index of the transaction currently being executed —
    /// confirm against the prewarm task.
    executed_tx_index: Arc<AtomicUsize>,
}
929
930impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
931 pub fn stop_prewarming_execution(&self) {
935 self.to_prewarm_task
936 .as_ref()
937 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
938 }
939
940 #[must_use = "sender must be used and notified on block validation success"]
945 pub fn terminate_caching(
946 &mut self,
947 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
948 ) -> Option<mpsc::Sender<()>> {
949 if let Some(tx) = self.to_prewarm_task.take() {
950 let (valid_block_tx, valid_block_rx) = mpsc::channel();
951 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
952 let _ = tx.send(event);
953
954 Some(valid_block_tx)
955 } else {
956 None
957 }
958 }
959}
960
961impl<R> Drop for CacheTaskHandle<R> {
962 fn drop(&mut self) {
963 if let Some(tx) = self.to_prewarm_task.take() {
965 let _ = tx.send(PrewarmTaskEvent::Terminate {
966 execution_outcome: None,
967 valid_block_rx: mpsc::channel().1,
968 });
969 }
970 }
971}
972
/// Shared, lock-protected slot holding at most one [`SavedCache`].
///
/// Clones share the same slot through the inner `Arc`.
#[derive(Clone, Debug, Default)]
pub struct PayloadExecutionCache {
    /// The guarded slot; `None` means no cache is currently stored.
    inner: Arc<RwLock<Option<SavedCache>>>,
    /// Metrics recorded when the slot is accessed.
    metrics: ExecutionCacheMetrics,
}
1005
1006impl PayloadExecutionCache {
1007 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
1013 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
1014 let start = Instant::now();
1015 let cache = self.inner.read();
1016
1017 let elapsed = start.elapsed();
1018 self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
1019 if elapsed.as_millis() > 5 {
1020 warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
1021 }
1022
1023 if let Some(c) = cache.as_ref() {
1024 let cached_hash = c.executed_block_hash();
1025 let hash_matches = cached_hash == parent_hash;
1028 let available = c.is_available();
1031 let usage_count = c.usage_count();
1032
1033 debug!(
1034 target: "engine::caching",
1035 %cached_hash,
1036 %parent_hash,
1037 hash_matches,
1038 available,
1039 usage_count,
1040 "Existing cache found"
1041 );
1042
1043 if available {
1044 if !hash_matches {
1048 c.clear();
1049 }
1050 return Some(c.clone())
1051 } else if hash_matches {
1052 self.metrics.execution_cache_in_use.increment(1);
1053 }
1054 } else {
1055 debug!(target: "engine::caching", %parent_hash, "No cache found");
1056 }
1057
1058 None
1059 }
1060
1061 #[expect(unused)]
1063 pub(crate) fn clear(&self) {
1064 self.inner.write().take();
1065 }
1066
1067 pub fn wait_for_availability(&self) -> Duration {
1074 let start = Instant::now();
1075 let _guard = self.inner.write();
1077 let elapsed = start.elapsed();
1078 if elapsed.as_millis() > 5 {
1079 debug!(
1080 target: "engine::tree::payload_processor",
1081 blocked_for=?elapsed,
1082 "Waited for execution cache to become available"
1083 );
1084 }
1085 elapsed
1086 }
1087
1088 pub fn update_with_guard<F>(&self, update_fn: F)
1102 where
1103 F: FnOnce(&mut Option<SavedCache>),
1104 {
1105 let mut guard = self.inner.write();
1106 update_fn(&mut guard);
1107 }
1108}
1109
/// Metrics recorded when the shared execution cache is accessed.
// NOTE(review): reth's `Metrics` derive presumably uses the field doc comments below as metric
// descriptions — confirm before rewording them.
#[derive(Metrics, Clone)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ExecutionCacheMetrics {
    /// Number of times the cache for the requested parent hash was stored but not available.
    pub(crate) execution_cache_in_use: Counter,
    /// Time spent waiting to acquire the execution cache lock, in seconds.
    pub(crate) execution_cache_wait_duration: Histogram,
}
1120
/// Per-block inputs for the tasks spawned by the payload processor.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// EVM environment for the block.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block.
    /// NOTE(review): presumably the hash of the block being executed — confirm.
    pub hash: B256,
    /// Hash of the parent block; used to look up the execution cache.
    pub parent_hash: B256,
    /// State root of the parent block; the preserved sparse trie is prepared for this root.
    pub parent_state_root: B256,
    /// Number of transactions in the block; sizes the transaction channels and selects the
    /// small-block code paths.
    pub transaction_count: usize,
    /// Gas used by the block.
    /// NOTE(review): not read anywhere in this part of the file — confirm how it is consumed.
    pub gas_used: u64,
    /// Withdrawals of the block, if any.
    /// NOTE(review): not read anywhere in this part of the file — confirm how it is consumed.
    pub withdrawals: Option<Vec<Withdrawal>>,
}
1145
1146impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
1147where
1148 EvmEnvFor<Evm>: Default,
1149{
1150 #[cfg(any(test, feature = "test-utils"))]
1152 pub fn test_default() -> Self {
1153 Self {
1154 evm_env: Default::default(),
1155 hash: Default::default(),
1156 parent_hash: Default::default(),
1157 parent_state_root: Default::default(),
1158 transaction_count: 0,
1159 gas_used: 0,
1160 withdrawals: None,
1161 }
1162 }
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167 use super::PayloadExecutionCache;
1168 use crate::tree::{
1169 cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
1170 payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1171 precompile_cache::PrecompileCacheMap,
1172 StateProviderBuilder, TreeConfig,
1173 };
1174 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1175 use alloy_evm::block::StateChangeSource;
1176 use rand::Rng;
1177 use reth_chainspec::ChainSpec;
1178 use reth_db_common::init::init_genesis;
1179 use reth_ethereum_primitives::TransactionSigned;
1180 use reth_evm::OnStateHook;
1181 use reth_evm_ethereum::EthEvmConfig;
1182 use reth_primitives_traits::{Account, Recovered, StorageEntry};
1183 use reth_provider::{
1184 providers::{BlockchainProvider, OverlayStateProviderFactory},
1185 test_utils::create_test_provider_factory_with_chain_spec,
1186 ChainSpecProvider, HashingWriter,
1187 };
1188 use reth_revm::db::BundleState;
1189 use reth_testing_utils::generators;
1190 use reth_trie::{test_utils::state_root, HashedPostState};
1191 use reth_trie_db::ChangesetCache;
1192 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1193 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
1194 use std::sync::Arc;
1195
1196 fn make_saved_cache(hash: B256) -> SavedCache {
1197 let execution_cache = ExecutionCache::new(1_000);
1198 SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
1199 }
1200
/// While one checkout is alive, further checkouts of the same cache are refused; once it is
/// dropped, checking out works again.
#[test]
fn execution_cache_allows_single_checkout() {
    let hash = B256::from([1u8; 32]);
    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

    let held = shared.get_cache_for(hash);
    assert!(held.is_some(), "expected initial checkout to succeed");

    let blocked = shared.get_cache_for(hash);
    assert!(blocked.is_none(), "second checkout should be blocked while guard is active");

    // Release the first checkout; the cache must be obtainable again.
    drop(held);

    let reacquired = shared.get_cache_for(hash);
    assert!(reacquired.is_some(), "third checkout should succeed after guard is dropped");
}
1219
/// A checkout that goes out of scope releases the cache for the next caller.
#[test]
fn execution_cache_checkout_releases_on_drop() {
    let hash = B256::from([2u8; 32]);
    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

    let checkout = shared.get_cache_for(hash);
    assert!(checkout.is_some(), "expected checkout to succeed");
    drop(checkout);

    let retry = shared.get_cache_for(hash);
    assert!(retry.is_some(), "checkout should succeed after guard drop");
}
1236
/// Requesting the cache for a different parent hash still hands out the (available) cache.
#[test]
fn execution_cache_mismatch_parent_clears_and_returns() {
    let stored_hash = B256::from([3u8; 32]);
    let requested_hash = B256::from([4u8; 32]);

    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(stored_hash)));

    let checkout = shared.get_cache_for(requested_hash);
    assert!(checkout.is_some(), "cache should be returned for reuse after clearing")
}
1249
/// After a checkout has been released, the slot can be replaced and the replacement checked
/// out.
#[test]
fn execution_cache_update_after_release_succeeds() {
    let initial = B256::from([5u8; 32]);
    let updated = B256::from([6u8; 32]);

    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

    // Check out and immediately release the initial cache.
    let checkout = shared.get_cache_for(initial).expect("expected initial checkout to succeed");
    drop(checkout);

    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

    let new_checkout = shared.get_cache_for(updated);
    assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
}
1268
/// Inserting an executed block into a processor with an empty cache slot creates a cache keyed
/// by that block's hash.
#[test]
fn on_inserted_executed_block_populates_cache() {
    let evm_config = EthEvmConfig::new(Arc::new(ChainSpec::default()));
    let payload_processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        evm_config,
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    let block_hash = B256::from([10u8; 32]);
    let block_with_parent = BlockWithParent {
        block: BlockNumHash { hash: block_hash, number: 1 },
        parent: B256::from([1u8; 32]),
    };

    // Nothing is cached before the block is inserted.
    assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

    payload_processor.on_inserted_executed_block(block_with_parent, &BundleState::default());

    let cached = payload_processor.execution_cache.get_cache_for(block_hash);
    assert!(cached.is_some());
    assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
}
1297
/// Inserting a block whose parent differs from the cached block leaves the existing cache
/// untouched.
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
    let evm_config = EthEvmConfig::new(Arc::new(ChainSpec::default()));
    let payload_processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        evm_config,
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    // Seed the slot with a cache for block 1.
    let block1_hash = B256::from([1u8; 32]);
    payload_processor
        .execution_cache
        .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

    // Block 3 claims a parent that is not block 1.
    let block3_hash = B256::from([3u8; 32]);
    let block_with_parent = BlockWithParent {
        block: BlockNumHash { hash: block3_hash, number: 3 },
        parent: B256::from([99u8; 32]),
    };

    payload_processor.on_inserted_executed_block(block_with_parent, &BundleState::default());

    let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
    assert!(cached.is_some(), "Original cache should be preserved");

    let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
    assert!(cached3.is_none(), "New block cache should not be created on mismatch");
}
1332
1333 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1334 let mut rng = generators::rng();
1335 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1336 let mut updates = Vec::with_capacity(updates_per_account);
1337
1338 for _ in 0..updates_per_account {
1339 let num_accounts_in_update = rng.random_range(1..=num_accounts);
1340 let mut state_update = EvmState::default();
1341
1342 let selected_addresses = &all_addresses[0..num_accounts_in_update];
1343
1344 for &address in selected_addresses {
1345 let mut storage = HashMap::default();
1346 if rng.random_bool(0.7) {
1347 for _ in 0..rng.random_range(1..10) {
1348 let slot = U256::from(rng.random::<u64>());
1349 storage.insert(
1350 slot,
1351 EvmStorageSlot::new_changed(
1352 U256::ZERO,
1353 U256::from(rng.random::<u64>()),
1354 0,
1355 ),
1356 );
1357 }
1358 }
1359
1360 let account = revm_state::Account {
1361 info: AccountInfo {
1362 balance: U256::from(rng.random::<u64>()),
1363 nonce: rng.random::<u64>(),
1364 code_hash: KECCAK_EMPTY,
1365 code: Some(Default::default()),
1366 account_id: None,
1367 },
1368 original_info: Box::new(AccountInfo::default()),
1369 storage,
1370 status: AccountStatus::Touched,
1371 transaction_id: 0,
1372 };
1373
1374 state_update.insert(address, account);
1375 }
1376
1377 updates.push(state_update);
1378 }
1379
1380 updates
1381 }
1382
/// End-to-end check of the state root reported by a spawned payload handle.
///
/// Random state updates are written to a test database, replayed through the handle's state
/// hook, and the root returned by the handle is compared with the root that `state_root`
/// computes over the same accounts and storage accumulated in memory.
#[test]
fn test_state_root() {
    reth_tracing::init_test_tracing();

    let chain_spec = Arc::new(ChainSpec::default());
    let factory = create_test_provider_factory_with_chain_spec(chain_spec);
    let genesis_hash = init_genesis(&factory).unwrap();

    let state_updates = create_mock_state_updates(10, 10);

    // Write every update into the database through a read-write provider and commit.
    {
        let provider_rw = factory.provider_rw().expect("failed to get provider");

        for update in &state_updates {
            provider_rw
                .insert_account_for_hashing(update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                }))
                .expect("failed to insert accounts");

            provider_rw
                .insert_storage_for_hashing(update.iter().map(|(address, account)| {
                    let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                        key: B256::from(*slot),
                        value: value.present_value,
                    });
                    (*address, entries)
                }))
                .expect("failed to insert storage");
        }

        provider_rw.commit().expect("failed to commit changes");
    }

    // Fold the same updates into an in-memory view: the latest account info per address plus
    // the union of its storage slots, with later updates overwriting earlier values.
    // NOTE(review): `hashed_state` is filled below but not read again in this test.
    let mut hashed_state = HashedPostState::default();
    let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
        HashMap::default();

    for update in &state_updates {
        hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

        for (address, account) in update {
            let (info, slots) = accumulated_state.entry(*address).or_default();
            *info = Account::from_revm_account(account);
            slots.extend(
                account.storage.iter().map(|(slot, value)| (B256::from(*slot), value.present_value)),
            );
        }
    }

    let tree_config = TreeConfig::default();
    let mut payload_processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(factory.chain_spec()),
        &tree_config,
        PrecompileCacheMap::default(),
    );

    let provider_factory = BlockchainProvider::new(factory).unwrap();

    // Spawn with an empty transaction list; state changes are supplied through the hook below.
    let no_transactions =
        Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new();
    let mut handle = payload_processor.spawn(
        ExecutionEnv::test_default(),
        (no_transactions, std::convert::identity),
        StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
        OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
        &tree_config,
        None,
    );

    // Replay each update as if it were the state change of the transaction at that index.
    let mut state_hook = handle.state_hook().expect("state hook is None");
    state_updates.into_iter().enumerate().for_each(|(tx_index, update)| {
        state_hook.on_state(StateChangeSource::Transaction(tx_index), &update);
    });
    // The hook is released before the root is requested — presumably this tells the task that
    // no further updates will arrive; confirm before moving this line.
    drop(state_hook);

    let root_from_task = handle.state_root().expect("task failed").state_root;
    let root_from_regular = state_root(accumulated_state);

    assert_eq!(
        root_from_task, root_from_regular,
        "State root mismatch: task={root_from_task}, base={root_from_regular}"
    );
}
1471}