1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6 sparse_trie::SparseTrieCacheTask,
7 CacheWaitDurations, CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource,
8 ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
9 WaitForCaches,
10};
11use alloy_eip7928::bal::DecodedBal;
12use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
13use alloy_primitives::B256;
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use multiproof::*;
16use prewarm::PrewarmMetrics;
17use rayon::prelude::*;
18use reth_evm::{
19 block::ExecutableTxParts,
20 execute::{ExecutableTxFor, WithTxEnv},
21 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
22 SpecFor, TxEnvFor,
23};
24use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
25use reth_provider::{
26 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
27};
28use reth_revm::db::BundleState;
29use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
30use reth_trie::{
31 hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory, HashedPostState,
32};
33use reth_trie_parallel::{
34 proof_task::{ProofTaskCtx, ProofWorkerHandle},
35 root::ParallelStateRootError,
36};
37use reth_trie_sparse::{ArenaParallelSparseTrie, RevealableSparseTrie, SparseStateTrie};
38use std::{
39 ops::Not,
40 sync::{
41 atomic::{AtomicBool, AtomicUsize},
42 mpsc::{self, channel},
43 Arc, OnceLock,
44 },
45};
46use tracing::{debug, debug_span, instrument, trace, warn, Span};
47
48pub mod bal;
49pub(crate) mod bal_prewarm_pool;
50pub mod multiproof;
51mod preserved_sparse_trie;
52pub mod prewarm;
53pub mod receipt_root_task;
54pub mod sparse_trie;
55
56use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
57
/// Transaction-count threshold below which transaction-based prewarming is skipped.
///
/// `PayloadProcessor::spawn_caching_with` compares `ExecutionEnv::transaction_count` against
/// this value. Note that `PayloadProcessor` also has a private associated constant with the same
/// name (and a different value) that gates sequential signature recovery instead.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
61
/// Converted transaction (with its tx env) yielded for the transaction iterator `I`.
type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

/// The [`PayloadHandle`] instantiation returned when spawning tasks for the iterator `I`.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    IteratorTx<Evm, I>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;

/// Receiver of the prewarm transaction channel for the iterator `I`.
type IteratorPrewarmTxReceiver<Evm, I> =
    PrewarmTxReceiver<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

/// Receiver of the execution transaction channel for the iterator `I`.
type IteratorExecuteTxReceiver<Evm, I> = ExecuteTxReceiver<
    TxEnvFor<Evm>,
    <I as ExecutableTxIterator<Evm>>::Recovered,
    <I as ExecutableTxTuple>::Error,
>;

/// A recovered transaction wrapped together with its transaction environment.
type RecoveredTx<TxEnv, Recovered> = WithTxEnv<TxEnv, Recovered>;
/// A conversion result tagged with the transaction's index in the block.
type IndexedTxResult<Tx, Err> = (usize, Result<Tx, Err>);
/// Crossbeam receiver of index-tagged conversion results.
type IndexedTxReceiver<Tx, Err> = CrossbeamReceiver<IndexedTxResult<Tx, Err>>;
/// Crossbeam sender of index-tagged conversion results.
type IndexedTxSender<Tx, Err> = CrossbeamSender<IndexedTxResult<Tx, Err>>;
/// Receiver used by the prewarm task; only successfully converted transactions are sent on it.
type PrewarmTxReceiver<TxEnv, Recovered> = mpsc::Receiver<(usize, RecoveredTx<TxEnv, Recovered>)>;
/// Receiver used by block execution; carries both successful and failed conversions.
type ExecuteTxReceiver<TxEnv, Recovered, Err> =
    IndexedTxReceiver<RecoveredTx<TxEnv, Recovered>, Err>;
/// Sender counterpart of [`ExecuteTxReceiver`].
type ExecuteTxSender<TxEnv, Recovered, Err> = IndexedTxSender<RecoveredTx<TxEnv, Recovered>, Err>;
88
/// Entry point for spawning the background tasks that accompany payload execution:
/// transaction conversion, cache prewarming and the sparse-trie state root task.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Runtime on which all blocking background tasks are spawned.
    executor: Runtime,
    /// Shared execution cache that is checked out per payload and updated on block insertion.
    execution_cache: PayloadExecutionCache,
    /// Cache metrics; `None` when cache metrics are disabled in the tree config.
    cache_metrics: Option<CachedStateMetrics>,
    /// Cache state metrics; `None` when cache metrics are disabled in the tree config.
    cache_state_metrics: Option<CachedStateCacheMetrics>,
    /// Metrics handed to the sparse trie task.
    trie_metrics: MultiProofTaskMetrics,
    /// Size used whenever a fresh [`ExecutionCache`] has to be created.
    cross_block_cache_size: usize,
    /// When set, transaction-based prewarming is skipped.
    disable_transaction_prewarming: bool,
    /// When set, no saved execution cache is handed to the prewarm task.
    disable_state_cache: bool,
    /// EVM configuration cloned into the prewarm context.
    evm_config: Evm,
    /// Forwarded to the prewarm context.
    precompile_cache_disabled: bool,
    /// Precompile cache map cloned into the prewarm context.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Sparse trie preserved between payloads; taken by the sparse trie task and stored back
    /// once the task has finished.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Hot-slot capacity applied to the sparse trie and used when preparing it for reuse.
    sparse_trie_max_hot_slots: usize,
    /// Hot-account capacity applied to the sparse trie and used when preparing it for reuse.
    sparse_trie_max_hot_accounts: usize,
    /// Forwarded to the sparse trie task when it prepares the trie for reuse.
    disable_sparse_trie_cache_pruning: bool,
    /// Forwarded to the prewarm context.
    disable_bal_parallel_state_root: bool,
    /// Forwarded to the prewarm context.
    disable_bal_batch_io: bool,
    /// BAL prewarm pool, created lazily on first use.
    bal_prewarm_pool: OnceLock<Arc<bal_prewarm_pool::BalPrewarmPool>>,
}
136
137impl<N, Evm> PayloadProcessor<Evm>
138where
139 N: NodePrimitives,
140 Evm: ConfigureEvm<Primitives = N>,
141{
142 pub const fn executor(&self) -> &Runtime {
144 &self.executor
145 }
146
147 pub fn new(
149 executor: Runtime,
150 evm_config: Evm,
151 config: &TreeConfig,
152 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
153 ) -> Self {
154 Self {
155 executor,
156 execution_cache: Default::default(),
157 trie_metrics: Default::default(),
158 cross_block_cache_size: config.cross_block_cache_size(),
159 disable_transaction_prewarming: config.disable_prewarming(),
160 evm_config,
161 disable_state_cache: config.disable_state_cache(),
162 precompile_cache_disabled: config.precompile_cache_disabled(),
163 precompile_cache_map,
164 sparse_state_trie: SharedPreservedSparseTrie::default(),
165 sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
166 sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
167 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
168 cache_metrics: (!config.disable_cache_metrics())
169 .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
170 cache_state_metrics: (!config.disable_cache_metrics())
171 .then(CachedStateCacheMetrics::default),
172 disable_bal_parallel_state_root: config.disable_bal_parallel_state_root(),
173 disable_bal_batch_io: config.disable_bal_batch_io(),
174 bal_prewarm_pool: OnceLock::new(),
175 }
176 }
177
178 fn bal_prewarm_pool(&self) -> Arc<bal_prewarm_pool::BalPrewarmPool> {
181 self.bal_prewarm_pool
182 .get_or_init(|| {
183 bal_prewarm_pool::BalPrewarmPool::new(bal_prewarm_pool::DEFAULT_BAL_PREWARM_THREADS)
184 })
185 .clone()
186 }
187}
188
189impl<Evm> WaitForCaches for PayloadProcessor<Evm>
190where
191 Evm: ConfigureEvm,
192{
193 fn wait_for_caches(&self) -> CacheWaitDurations {
194 debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
195
196 let execution_cache = self.execution_cache.clone();
198 let sparse_trie = self.sparse_state_trie.clone();
199
200 let (execution_tx, execution_rx) = std::sync::mpsc::channel();
202 let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
203
204 self.executor.spawn_blocking_named("wait-exec-cache", move || {
205 let _ = execution_tx.send(execution_cache.wait_for_availability());
206 });
207 self.executor.spawn_blocking_named("wait-sparse-tri", move || {
208 let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
209 });
210
211 let execution_cache_duration =
212 execution_rx.recv().expect("execution cache wait task failed to send result");
213 let sparse_trie_duration =
214 sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
215
216 debug!(
217 target: "engine::tree::payload_processor",
218 ?execution_cache_duration,
219 ?sparse_trie_duration,
220 "Execution cache and sparse trie locks acquired"
221 );
222 CacheWaitDurations {
223 execution_cache: execution_cache_duration,
224 sparse_trie: sparse_trie_duration,
225 }
226 }
227}
228
229impl<N, Evm> PayloadProcessor<Evm>
230where
231 N: NodePrimitives,
232 Evm: ConfigureEvm<Primitives = N> + 'static,
233{
234 #[instrument(
257 level = "debug",
258 target = "engine::tree::payload_processor",
259 name = "payload processor",
260 skip_all
261 )]
262 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
263 &mut self,
264 env: ExecutionEnv<Evm>,
265 transactions: I,
266 provider_builder: StateProviderBuilder<N, P>,
267 multiproof_provider_factory: F,
268 config: &TreeConfig,
269 parallel_bal_execution: bool,
270 ) -> IteratorPayloadHandle<Evm, I, N>
271 where
272 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
273 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
274 + Clone
275 + Send
276 + Sync
277 + 'static,
278 {
279 let (prewarm_rx, execution_rx) =
281 self.spawn_tx_iterator(transactions, env.transaction_count, parallel_bal_execution);
282
283 let span = Span::current();
284
285 let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
286 let state_root_handle = self.spawn_state_root(
287 multiproof_provider_factory,
288 env.parent_state_root,
289 halve_workers,
290 config,
291 );
292 let install_state_hook = !parallel_bal_execution;
301 let prewarm_handle = self.spawn_caching_with(
302 env,
303 prewarm_rx,
304 provider_builder,
305 Some(state_root_handle.updates_tx().clone()),
306 parallel_bal_execution,
307 );
308
309 PayloadHandle {
310 state_root_handle: Some(state_root_handle),
311 install_state_hook,
312 prewarm_handle,
313 transactions: execution_rx,
314 _span: span,
315 }
316 }
317
318 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
322 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
323 &self,
324 env: ExecutionEnv<Evm>,
325 transactions: I,
326 provider_builder: StateProviderBuilder<N, P>,
327 parallel_bal_execution: bool,
328 ) -> IteratorPayloadHandle<Evm, I, N>
329 where
330 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
331 {
332 let (prewarm_rx, execution_rx) =
333 self.spawn_tx_iterator(transactions, env.transaction_count, parallel_bal_execution);
334 let prewarm_handle =
335 self.spawn_caching_with(env, prewarm_rx, provider_builder, None, false);
336 PayloadHandle {
337 state_root_handle: None,
338 install_state_hook: false,
339 prewarm_handle,
340 transactions: execution_rx,
341 _span: Span::current(),
342 }
343 }
344
345 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
358 pub fn spawn_state_root<F>(
359 &self,
360 multiproof_provider_factory: F,
361 parent_state_root: B256,
362 halve_workers: bool,
363 config: &TreeConfig,
364 ) -> StateRootHandle
365 where
366 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
367 + Clone
368 + Send
369 + Sync
370 + 'static,
371 {
372 let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();
373
374 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
375 #[cfg(feature = "trie-debug")]
376 let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
377 let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
378
379 let (state_root_tx, state_root_rx) = channel();
380 let (hashed_state_tx, hashed_state_rx) = channel();
381
382 self.spawn_sparse_trie_task(
383 proof_handle,
384 state_root_tx,
385 hashed_state_tx,
386 from_multi_proof,
387 parent_state_root,
388 config.multiproof_chunk_size(),
389 );
390
391 StateRootHandle::new(parent_state_root, updates_tx, state_root_rx, hashed_state_rx)
392 }
393
    /// Blocks with at most this many transactions ask the state root task to halve its proof
    /// workers (see `spawn`).
    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

    /// Blocks with fewer transactions than this are converted sequentially on a single task
    /// instead of in parallel (see `spawn_tx_iterator`).
    ///
    /// This is distinct from the module-level constant of the same name, which gates prewarming.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Number of leading transactions converted sequentially before the remainder is handed to
    /// the parallel converter (see `spawn_tx_iterator`).
    const PARALLEL_PREFETCH_COUNT: usize = 4;
414
415 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
424 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
425 &self,
426 transactions: I,
427 transaction_count: usize,
428 parallel_bal_execution: bool,
429 ) -> (IteratorPrewarmTxReceiver<Evm, I>, IteratorExecuteTxReceiver<Evm, I>) {
430 let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
431 let (execute_tx, execute_rx) = crossbeam_channel::bounded(transaction_count);
432
433 if transaction_count == 0 {
434 } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
436 debug!(
439 target: "engine::tree::payload_processor",
440 transaction_count,
441 "using sequential sig recovery for small block"
442 );
443 self.executor.spawn_blocking_named("tx-iterator", move || {
444 let (transactions, convert) = transactions.into_parts();
445 convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
446 });
447 } else {
448 let executor = self.executor.clone();
451 self.executor.spawn_blocking_named("tx-iterator", move || {
452 let (transactions, convert) = transactions.into_parts();
453 if parallel_bal_execution {
454 executor.cpu_pool().install(|| {
457 transactions
458 .into_par_iter()
459 .enumerate()
460 .map(|(i, tx)| {
461 let tx = convert.convert(tx);
462 (i, tx)
463 })
464 .for_each(|(idx, tx)| {
465 let tx = tx.map(|tx| {
466 let tx = WithTxEnv::new(tx);
467 let _ = prewarm_tx.send((idx, tx.clone()));
468 tx
469 });
470 let _ = execute_tx.send((idx, tx));
471 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
472 });
473 });
474 } else {
475 let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
479 let mut all: Vec<_> = transactions.into_iter().collect();
480 let rest = all.split_off(prefetch);
481
482 convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
485
486 rest.into_par_iter()
489 .enumerate()
490 .map(|(i, tx)| {
491 let idx = i + prefetch;
492 let tx = convert.convert(tx);
493 (idx, tx)
494 })
495 .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
496 let tx = tx.map(|tx| {
497 let tx = WithTxEnv::new(tx);
498 let _ = prewarm_tx.send((idx, tx.clone()));
499 tx
500 });
501 let _ = execute_tx.send((idx, tx));
502 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
503 });
504 }
505 });
506 }
507
508 (prewarm_rx, execute_rx)
509 }
510
511 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
517 fn spawn_caching_with<P>(
518 &self,
519 env: ExecutionEnv<Evm>,
520 transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
521 provider_builder: StateProviderBuilder<N, P>,
522 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
523 parallel_bal_execution: bool,
524 ) -> CacheTaskHandle<N::Receipt>
525 where
526 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
527 {
528 let mode = if parallel_bal_execution {
529 PrewarmMode::BlockAccessList(
530 env.decoded_bal.clone().expect("BAL dispatch implies decoded BAL"),
531 )
532 } else if self.disable_transaction_prewarming ||
533 env.transaction_count < SMALL_BLOCK_TX_THRESHOLD
534 {
535 PrewarmMode::Skipped
536 } else {
537 PrewarmMode::Transactions(transactions)
538 };
539 let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
540
541 let executed_tx_index = Arc::new(AtomicUsize::new(0));
542 let prewarm_ctx = PrewarmContext {
544 env,
545 evm_config: self.evm_config.clone(),
546 saved_cache: saved_cache.clone(),
547 provider: provider_builder,
548 bal_prewarm_pool: parallel_bal_execution.then(|| self.bal_prewarm_pool()),
549 metrics: PrewarmMetrics::default(),
550 cache_metrics: self.cache_metrics.clone(),
551 cache_state_metrics: self.cache_state_metrics.clone(),
552 terminate_execution: Arc::new(AtomicBool::new(false)),
553 executed_tx_index: Arc::clone(&executed_tx_index),
554 precompile_cache_disabled: self.precompile_cache_disabled,
555 precompile_cache_map: self.precompile_cache_map.clone(),
556 disable_bal_parallel_state_root: self.disable_bal_parallel_state_root,
557 disable_bal_batch_io: self.disable_bal_batch_io,
558 };
559
560 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
561 self.executor.clone(),
562 self.execution_cache.clone(),
563 prewarm_ctx,
564 to_sparse_trie_task,
565 );
566 {
567 let to_prewarm_task = to_prewarm_task.clone();
568 self.executor.spawn_blocking_named("prewarm", move || {
569 prewarm_task.run(mode, to_prewarm_task);
570 });
571 }
572
573 CacheTaskHandle {
574 saved_cache,
575 to_prewarm_task: Some(to_prewarm_task),
576 executed_tx_index,
577 cache_metrics: self.cache_metrics.clone(),
578 }
579 }
580
581 #[instrument(level = "debug", target = "engine::caching", skip(self))]
586 pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
587 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
588 debug!("reusing execution cache");
589 cache
590 } else {
591 debug!("creating new execution cache on cache miss");
592 let start = Instant::now();
593 let cache = ExecutionCache::new(self.cross_block_cache_size);
594 if let Some(metrics) = &self.cache_metrics {
595 metrics.record_cache_creation(start.elapsed());
596 }
597 SavedCache::new(parent_hash, cache)
598 }
599 }
600
    /// Spawns the blocking "sparse-trie" task that computes the state root.
    ///
    /// The task takes the preserved sparse trie (or creates a blind one), runs a
    /// [`SparseTrieCacheTask`] over the messages arriving on `from_multi_proof`, sends the
    /// result on `state_root_tx`, and finally stores a trie back into the shared slot:
    /// anchored at the new state root on success, cleared on failure or when the result
    /// receiver is gone.
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        hashed_state_tx: mpsc::Sender<HashedPostState>,
        from_multi_proof: CrossbeamReceiver<StateRootMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        // Everything the task needs is cloned or copied so the closure can be `'static`.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let max_hot_slots = self.sparse_trie_max_hot_slots;
        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Time how long taking the preserved trie takes.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            // Reuse the preserved trie for this parent state root, or start from a blind trie.
            let mut sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie =
                        RevealableSparseTrie::blind_from(ArenaParallelSparseTrie::default());
                    SparseStateTrie::default()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });
            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                hashed_state_tx,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                parent_state_root,
                chunk_size,
            );

            let result = task.run();

            // The shared slot is locked BEFORE the result is sent.
            // NOTE(review): presumably so that a consumer woken by the result cannot reach the
            // shared slot before the trie has been stored back — confirm.
            let mut guard = preserved_sparse_trie.lock();

            // Keep a copy of the successful outcome; `result` itself is moved into the channel.
            let task_result = result.as_ref().ok().cloned();
            if state_root_tx.send(result).is_err() {
                // Nobody is waiting for the result: store a cleared trie instead of an
                // anchored one.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie();
                guard.store(PreservedSparseTrie::cleared(trie));
                // Release the lock before handing the leftovers to the executor for dropping.
                drop(guard);
                executor.spawn_drop(deferred);
                return;
            }

            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                // Success: prepare the trie for reuse and anchor it at the computed root.
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    max_hot_slots,
                    max_hot_accounts,
                    disable_cache_pruning,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                // Failure: only a cleared trie is kept.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie();
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            // Release the lock before handing the leftovers to the executor for dropping.
            drop(guard);
            executor.spawn_drop(deferred);
        });
    }
724
725 pub fn on_inserted_executed_block(
733 &self,
734 block_with_parent: BlockWithParent,
735 bundle_state: &BundleState,
736 ) {
737 let cache_state_metrics = self.cache_state_metrics.clone();
738 self.execution_cache.update_with_guard(|cached| {
739 if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
740 debug!(
741 target: "engine::caching",
742 parent_hash = %block_with_parent.parent,
743 "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
744 );
745 return
746 }
747
748 if let Some(cache) = cached.as_ref().filter(|cache| !cache.is_available()) {
749 debug!(
750 target: "engine::caching",
751 parent_hash = %block_with_parent.parent,
752 usage_count = cache.usage_count(),
753 "Execution cache is in use, skip updating cache with new state for inserted executed block",
754 );
755 return
756 }
757
758 let caches = match cached.take() {
760 Some(existing) => existing.cache().clone(),
761 None => ExecutionCache::new(self.cross_block_cache_size),
762 };
763
764 let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
766 if new_cache.cache().insert_state(bundle_state).is_err() {
767 *cached = None;
768 debug!(target: "engine::caching", "cleared execution cache on update error");
769 return
770 }
771 new_cache.update_metrics(cache_state_metrics.as_ref());
772
773 *cached = Some(new_cache);
775 debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
776 });
777 }
778}
779
780fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
782 iter: impl Iterator<Item = RawTx>,
783 convert: &C,
784 prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
785 execute_tx: &ExecuteTxSender<TxEnv, Recovered, Err>,
786) where
787 Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
788 TxEnv: Clone,
789 C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
790{
791 for (idx, raw_tx) in iter.enumerate() {
792 let tx = convert.convert(raw_tx);
793 let tx = tx.map(|tx| WithTxEnv::new(tx));
794 if let Ok(tx) = &tx {
795 let _ = prewarm_tx.send((idx, tx.clone()));
796 }
797 let _ = execute_tx.send((idx, tx));
798 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
799 }
800}
801
/// Handle to the background tasks spawned for one payload.
///
/// Gives access to the converted transactions, the prewarm/caching task and, when one was
/// spawned, the state root task.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the state root task; `None` when only caching was spawned.
    state_root_handle: Option<StateRootHandle>,
    /// Whether [`Self::state_hook`] may hand out a state hook.
    install_state_hook: bool,
    /// Handle to the prewarm/caching task.
    prewarm_handle: CacheTaskHandle<R>,
    /// Receiver of index-tagged transaction conversion results.
    transactions: IndexedTxReceiver<Tx, Err>,
    /// Span captured when the handle was created; held for the lifetime of the handle.
    _span: Span,
}
819
820impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
821 #[instrument(
827 level = "debug",
828 target = "engine::tree::payload_processor",
829 name = "await_state_root",
830 skip_all
831 )]
832 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
833 self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
834 }
835
836 pub const fn take_state_root_rx(
843 &mut self,
844 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
845 self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
846 }
847
848 pub fn state_hook(&self) -> Option<impl OnStateHook> {
852 self.install_state_hook
853 .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
854 .flatten()
855 }
856
857 pub fn sparse_trie_updates_tx(&self) -> Option<CrossbeamSender<StateRootMessage>> {
861 self.state_root_handle.as_ref().map(|handle| handle.updates_tx().clone())
862 }
863
864 pub fn caches(&self) -> Option<ExecutionCache> {
866 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
867 }
868
869 pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
871 self.prewarm_handle.cache_metrics.clone()
872 }
873
874 pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
879 &self.prewarm_handle.executed_tx_index
880 }
881
882 pub fn stop_prewarming_execution(&self) {
886 self.prewarm_handle.stop_prewarming_execution()
887 }
888
889 pub fn terminate_caching(
897 &mut self,
898 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
899 ) -> Option<mpsc::Sender<()>> {
900 self.prewarm_handle.terminate_caching(execution_outcome)
901 }
902
903 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
905 self.transactions.iter().map(|(_, tx)| tx)
906 }
907
908 pub fn clone_transaction_receiver(&self) -> IndexedTxReceiver<Tx, Err> {
910 self.transactions.clone()
911 }
912
913 pub fn take_hashed_state_rx(&mut self) -> Option<mpsc::Receiver<HashedPostState>> {
915 self.state_root_handle.as_mut().map(|handle| handle.take_hashed_state_rx())
916 }
917}
918
/// Handle to the prewarm/caching task.
///
/// Dropping the handle sends a terminate event to the task if none was sent before.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// Execution cache handed to the prewarm task; `None` when the state cache is disabled.
    saved_cache: Option<SavedCache>,
    /// Event sender to the prewarm task; set to `None` once the task has been terminated.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Counter shared with the prewarm context.
    executed_tx_index: Arc<AtomicUsize>,
    /// Cache metrics; `None` when cache metrics are disabled.
    cache_metrics: Option<CachedStateMetrics>,
}
935
936impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
937 pub fn stop_prewarming_execution(&self) {
941 self.to_prewarm_task
942 .as_ref()
943 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
944 }
945
946 #[must_use = "sender must be used and notified on block validation success"]
951 pub fn terminate_caching(
952 &mut self,
953 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
954 ) -> Option<mpsc::Sender<()>> {
955 if let Some(tx) = self.to_prewarm_task.take() {
956 let (valid_block_tx, valid_block_rx) = mpsc::channel();
957 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
958 let _ = tx.send(event);
959
960 Some(valid_block_tx)
961 } else {
962 None
963 }
964 }
965}
966
967impl<R> Drop for CacheTaskHandle<R> {
968 fn drop(&mut self) {
969 if let Some(tx) = self.to_prewarm_task.take() {
971 let _ = tx.send(PrewarmTaskEvent::Terminate {
972 execution_outcome: None,
973 valid_block_rx: mpsc::channel().1,
974 });
975 }
976 }
977}
978
/// Per-block inputs handed to the payload processor's tasks.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// EVM environment for the block.
    pub evm_env: EvmEnvFor<Evm>,
    /// Block hash — presumably of the block being executed; not read in this part of the file.
    pub hash: B256,
    /// Hash of the parent block; used to look up the execution cache.
    pub parent_hash: B256,
    /// State root of the parent block; the state root task is anchored to it.
    pub parent_state_root: B256,
    /// Number of transactions in the block; sizes the transaction channels and selects the
    /// conversion and prewarming strategies.
    pub transaction_count: usize,
    /// Gas used — presumably by the block; not read in this part of the file.
    pub gas_used: u64,
    /// Withdrawals, if any; not read in this part of the file.
    pub withdrawals: Option<Vec<Withdrawal>>,
    /// Decoded block access list; must be `Some` when BAL-based prewarming is requested.
    pub decoded_bal: Option<Arc<DecodedBal>>,
}
1005
1006impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
1007where
1008 EvmEnvFor<Evm>: Default,
1009{
1010 #[cfg(any(test, feature = "test-utils"))]
1012 pub fn test_default() -> Self {
1013 Self {
1014 evm_env: Default::default(),
1015 hash: Default::default(),
1016 parent_hash: Default::default(),
1017 parent_state_root: Default::default(),
1018 transaction_count: 0,
1019 gas_used: 0,
1020 withdrawals: None,
1021 decoded_bal: None,
1022 }
1023 }
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028 use crate::tree::{
1029 payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1030 precompile_cache::PrecompileCacheMap,
1031 ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
1032 };
1033 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1034 use rand::Rng;
1035 use reth_chainspec::ChainSpec;
1036 use reth_db_common::init::init_genesis;
1037 use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
1038 use reth_evm::OnStateHook;
1039 use reth_evm_ethereum::EthEvmConfig;
1040 use reth_execution_cache::CachedStatus;
1041 use reth_primitives_traits::{Account, Recovered, StorageEntry};
1042 use reth_provider::{
1043 providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
1044 test_utils::create_test_provider_factory_with_chain_spec,
1045 ChainSpecProvider, HashingWriter,
1046 };
1047 use reth_revm::db::BundleState;
1048 use reth_testing_utils::generators;
1049 use reth_trie::{test_utils::state_root, HashedPostState};
1050 use reth_trie_db::ChangesetCache;
1051 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1052 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot, TransactionId};
1053 use std::sync::Arc;
1054
1055 fn make_saved_cache(hash: B256) -> SavedCache {
1056 let execution_cache = ExecutionCache::new(1_000);
1057 SavedCache::new(hash, execution_cache)
1058 }
1059
1060 #[test]
1061 fn execution_cache_allows_single_checkout() {
1062 let execution_cache = PayloadExecutionCache::default();
1063 let hash = B256::from([1u8; 32]);
1064
1065 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1066
1067 let first = execution_cache.get_cache_for(hash);
1068 assert!(first.is_some(), "expected initial checkout to succeed");
1069
1070 let second = execution_cache.get_cache_for(hash);
1071 assert!(second.is_none(), "second checkout should be blocked while guard is active");
1072
1073 drop(first);
1074
1075 let third = execution_cache.get_cache_for(hash);
1076 assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1077 }
1078
1079 #[test]
1080 fn execution_cache_checkout_releases_on_drop() {
1081 let execution_cache = PayloadExecutionCache::default();
1082 let hash = B256::from([2u8; 32]);
1083
1084 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1085
1086 {
1087 let guard = execution_cache.get_cache_for(hash);
1088 assert!(guard.is_some(), "expected checkout to succeed");
1089 }
1091
1092 let retry = execution_cache.get_cache_for(hash);
1093 assert!(retry.is_some(), "checkout should succeed after guard drop");
1094 }
1095
1096 #[test]
1097 fn execution_cache_mismatch_parent_clears_and_returns() {
1098 let execution_cache = PayloadExecutionCache::default();
1099 let hash = B256::from([3u8; 32]);
1100
1101 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1102
1103 let different_hash = B256::from([4u8; 32]);
1106 let cache = execution_cache.get_cache_for(different_hash);
1107 assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1108
1109 drop(cache);
1110
1111 let original = execution_cache.get_cache_for(hash);
1114 assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1115 }
1116
1117 #[test]
1118 fn execution_cache_update_after_release_succeeds() {
1119 let execution_cache = PayloadExecutionCache::default();
1120 let initial = B256::from([5u8; 32]);
1121
1122 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1123
1124 let guard =
1125 execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1126
1127 drop(guard);
1128
1129 let updated = B256::from([6u8; 32]);
1130 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1131
1132 let new_checkout = execution_cache.get_cache_for(updated);
1133 assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1134 }
1135
1136 #[test]
1137 fn on_inserted_executed_block_populates_cache() {
1138 let payload_processor = PayloadProcessor::new(
1139 reth_tasks::Runtime::test(),
1140 EthEvmConfig::new(Arc::new(ChainSpec::default())),
1141 &TreeConfig::default(),
1142 PrecompileCacheMap::default(),
1143 );
1144
1145 let parent_hash = B256::from([1u8; 32]);
1146 let block_hash = B256::from([10u8; 32]);
1147 let block_with_parent = BlockWithParent {
1148 block: BlockNumHash { hash: block_hash, number: 1 },
1149 parent: parent_hash,
1150 };
1151 let bundle_state = BundleState::default();
1152
1153 assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1155
1156 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1158
1159 let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1161 assert!(cached.is_some());
1162 assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1163 }
1164
/// Verifies that `on_inserted_executed_block` leaves the stored cache alone when the
/// inserted block's parent is not the block the cache was saved for.
///
/// The cache is seeded for block 1, then a block 3 whose parent is an unrelated hash is
/// inserted. Afterwards a checkout for block 1 must still succeed and a checkout for
/// block 3 must fail.
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
    let processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(Arc::new(ChainSpec::default())),
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    // Seed the execution cache as if block 1 had been executed and saved.
    let seeded_hash = B256::from([1u8; 32]);
    processor.execution_cache.update_with_guard(|slot| {
        *slot = Some(make_saved_cache(seeded_hash));
    });

    // Block 3 names a parent that is not the seeded block.
    let skipped_hash = B256::from([3u8; 32]);
    let mismatched_block = BlockWithParent {
        block: BlockNumHash { hash: skipped_hash, number: 3 },
        parent: B256::from([99u8; 32]),
    };
    let empty_bundle = BundleState::default();

    processor.on_inserted_executed_block(mismatched_block, &empty_bundle);

    // NOTE(review): `seeded_checkout` stays bound until the end of the test on purpose.
    // Other tests in this module show a second checkout returning `None` while an earlier
    // one is still held, so the `is_none` assertion below presumably relies on this
    // binding being alive - confirm before reordering or dropping it early.
    let seeded_checkout = processor.execution_cache.get_cache_for(seeded_hash);
    assert!(seeded_checkout.is_some(), "Original cache should be preserved");

    let skipped_checkout = processor.execution_cache.get_cache_for(skipped_hash);
    assert!(skipped_checkout.is_none(), "New block cache should not be created on mismatch");
}
1199
/// Verifies that a parent cache checked out *before* a child block is inserted does not
/// see the child's state afterwards.
///
/// The parent cache is checked out, a child block whose bundle contains one account is
/// inserted, and the held checkout is then queried for that account: it must report the
/// account as not cached.
#[test]
fn on_inserted_executed_block_does_not_mutate_checked_out_parent_cache() {
    let processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(Arc::new(ChainSpec::default())),
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    let parent_hash = B256::from([1u8; 32]);
    processor.execution_cache.update_with_guard(|slot| {
        *slot = Some(make_saved_cache(parent_hash));
    });

    // Take the checkout first so it is already held when the child block is inserted.
    let held_parent_cache = processor
        .execution_cache
        .get_cache_for(parent_hash)
        .expect("expected parent cache checkout to succeed");

    // State that only the child block (number 2) introduces.
    let child_only_address = Address::random();
    let child_only_info = AccountInfo {
        balance: U256::from(1337),
        nonce: 7,
        code_hash: KECCAK_EMPTY,
        code: None,
        account_id: None,
    };
    let child_bundle = BundleState::builder(2..=2)
        .state_present_account_info(child_only_address, child_only_info)
        .build();

    let child_block = BlockWithParent {
        block: BlockNumHash { hash: B256::from([2u8; 32]), number: 2 },
        parent: parent_hash,
    };

    processor.on_inserted_executed_block(child_block, &child_bundle);

    // The fallback loader yields `None`, so a miss surfaces as `NotCached(None)`.
    let observed = held_parent_cache
        .cache()
        .get_or_try_insert_account_with(child_only_address, || Ok::<_, ()>(None))
        .expect("cache read should succeed");

    assert_eq!(
        observed,
        CachedStatus::NotCached(None),
        "checked-out parent cache should not observe state from inserted local block"
    );
}
1258
1259 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1260 let mut rng = generators::rng();
1261 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1262 let mut updates = Vec::with_capacity(updates_per_account);
1263
1264 for _ in 0..updates_per_account {
1265 let num_accounts_in_update = rng.random_range(1..=num_accounts);
1266 let mut state_update = EvmState::default();
1267
1268 let selected_addresses = &all_addresses[0..num_accounts_in_update];
1269
1270 for &address in selected_addresses {
1271 let mut storage = HashMap::default();
1272 if rng.random_bool(0.7) {
1273 for _ in 0..rng.random_range(1..10) {
1274 let slot = U256::from(rng.random::<u64>());
1275 storage.insert(
1276 slot,
1277 EvmStorageSlot::new_changed(
1278 U256::ZERO,
1279 U256::from(rng.random::<u64>()),
1280 TransactionId::ZERO,
1281 ),
1282 );
1283 }
1284 }
1285
1286 let mut account = revm_state::Account::default();
1287 account.info = AccountInfo {
1288 balance: U256::from(rng.random::<u64>()),
1289 nonce: rng.random::<u64>(),
1290 code_hash: KECCAK_EMPTY,
1291 code: Some(Default::default()),
1292 account_id: None,
1293 };
1294 account.storage = storage;
1295 account.status = AccountStatus::Touched;
1296 account.transaction_id = TransactionId::ZERO;
1297
1298 state_update.insert(address, account);
1299 }
1300
1301 updates.push(state_update);
1302 }
1303
1304 updates
1305 }
1306
/// End-to-end check of the state root produced through the payload processor.
///
/// Random state updates are written to the test database, replayed through the state
/// hook of a spawned payload handle, and the root reported by the handle is compared
/// with the root computed by `state_root` over the accumulated final account/storage
/// values.
#[test]
fn test_state_root() {
    reth_tracing::init_test_tracing();

    let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
    let genesis_hash = init_genesis(&factory).unwrap();

    let state_updates = create_mock_state_updates(10, 10);

    // Persist every update's accounts and storage, then commit once at the end.
    {
        let provider_rw = factory.provider_rw().expect("failed to get provider");

        for update in &state_updates {
            provider_rw
                .insert_account_for_hashing(update.iter().map(|(address, revm_account)| {
                    (*address, Some(Account::from_revm_account(revm_account)))
                }))
                .expect("failed to insert accounts");

            provider_rw
                .insert_storage_for_hashing(update.iter().map(|(address, revm_account)| {
                    let entries = revm_account.storage.iter().map(|(slot, value)| StorageEntry {
                        key: B256::from(*slot),
                        value: value.present_value,
                    });
                    (*address, entries)
                }))
                .expect("failed to insert storage");
        }

        provider_rw.commit().expect("failed to commit changes");
    }

    // Fold the updates into the expected end state: the latest account info wins and
    // storage slots are merged across updates.
    // NOTE(review): `hashed_state` is extended here but not read later in this test.
    let mut hashed_state = HashedPostState::default();
    let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
        HashMap::default();

    for update in &state_updates {
        hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

        for (address, revm_account) in update {
            let (account, storage) = accumulated_state.entry(*address).or_default();
            *account = Account::from_revm_account(revm_account);
            storage.extend(
                revm_account
                    .storage
                    .iter()
                    .map(|(slot, value)| (B256::from(*slot), value.present_value)),
            );
        }
    }

    let mut payload_processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(factory.chain_spec()),
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    let provider_factory = BlockchainProvider::new(factory).unwrap();

    // Spawn with an empty transaction list: all state changes are fed in via the hook.
    let mut handle = payload_processor.spawn(
        ExecutionEnv::test_default(),
        (
            Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
            std::convert::identity,
        ),
        StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
        OverlayStateProviderFactory::new(
            provider_factory,
            OverlayBuilder::<EthPrimitives>::new(genesis_hash, ChangesetCache::new()),
        ),
        &TreeConfig::default(),
        false,
    );

    let mut state_hook = handle.state_hook().expect("state hook is None");
    state_updates.into_iter().for_each(|update| state_hook.on_state(update));
    // The hook is dropped explicitly before the root is requested from the handle.
    drop(state_hook);

    let root_from_task = handle.state_root().expect("task failed").state_root;
    let root_from_regular = state_root(accumulated_state);

    assert_eq!(
        root_from_task, root_from_regular,
        "State root mismatch: task={root_from_task}, base={root_from_regular}"
    );
}
1398
/// Verifies that a checkout taken for a fork parent and dropped without being saved
/// does not stop the canonical chain from getting a correctly keyed cache.
///
/// Steps exercised:
/// 1. the cache is seeded for canonical block 4;
/// 2. a checkout is requested for a different (fork) parent and reports that hash;
/// 3. while that checkout is held, a request for block 4 yields nothing;
/// 4. after the checkout is dropped, a request for block 4 succeeds and reports
///    block 4's hash.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
    let execution_cache = PayloadExecutionCache::default();

    let block4_hash = B256::from([4u8; 32]);
    execution_cache.update_with_guard(|slot| {
        *slot = Some(make_saved_cache(block4_hash));
    });

    // Request a cache for a parent that is not the block the cache was saved for.
    let fork_parent = B256::from([2u8; 32]);
    let fork_checkout = execution_cache
        .get_cache_for(fork_parent)
        .expect("prewarm should obtain cache for fork block");
    assert_eq!(fork_checkout.executed_block_hash(), fork_parent);

    // Write fork-only data through the held checkout.
    let fork_address = Address::from([0xBB; 20]);
    let fork_slot = B256::from([0xCC; 32]);
    fork_checkout.cache().insert_storage(fork_address, fork_slot, Some(U256::from(999)));

    let while_held = execution_cache.get_cache_for(block4_hash);
    assert!(while_held.is_none(), "cache must be unavailable while prewarm holds a reference");

    // Release the fork checkout without saving anything back.
    drop(fork_checkout);

    let canonical_checkout = execution_cache.get_cache_for(block4_hash);
    assert!(
        canonical_checkout.is_some(),
        "canonical chain must get cache after fork prewarm is dropped"
    );
    assert_eq!(
        canonical_checkout.as_ref().unwrap().executed_block_hash(),
        block4_hash,
        "cache must carry the canonical parent hash, not the fork parent"
    );
}
1455}