1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6 sparse_trie::SparseTrieCacheTask,
7 CacheWaitDurations, CachedStateCacheMetrics, CachedStateMetrics, CachedStateMetricsSource,
8 ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
9 WaitForCaches,
10};
11use alloy_eip7928::bal::DecodedBal;
12use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
13use alloy_primitives::B256;
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use multiproof::*;
16use prewarm::PrewarmMetrics;
17use rayon::prelude::*;
18use reth_evm::{
19 block::ExecutableTxParts,
20 execute::{ExecutableTxFor, WithTxEnv},
21 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
22 SpecFor, TxEnvFor,
23};
24use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
25use reth_provider::{
26 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
27};
28use reth_revm::db::BundleState;
29use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
30use reth_trie::{
31 hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory, HashedPostState,
32};
33use reth_trie_parallel::{
34 proof_task::{ProofTaskCtx, ProofWorkerHandle},
35 root::ParallelStateRootError,
36};
37use reth_trie_sparse::{
38 ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
39};
40use std::{
41 ops::Not,
42 sync::{
43 atomic::{AtomicBool, AtomicUsize},
44 mpsc::{self, channel},
45 Arc,
46 },
47};
48use tracing::{debug, debug_span, instrument, trace, warn, Span};
49
50pub mod bal;
51pub mod multiproof;
52mod preserved_sparse_trie;
53pub mod prewarm;
54pub mod receipt_root_task;
55pub mod sparse_trie;
56
57use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
58
59pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
68
69pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
81
82pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
85
86type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;
88
89type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
90 IteratorTx<Evm, I>,
91 <I as ExecutableTxTuple>::Error,
92 <N as NodePrimitives>::Receipt,
93>;
94
95type IteratorPrewarmTxReceiver<Evm, I> =
96 PrewarmTxReceiver<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;
97
98type IteratorExecuteTxReceiver<Evm, I> = ExecuteTxReceiver<
99 TxEnvFor<Evm>,
100 <I as ExecutableTxIterator<Evm>>::Recovered,
101 <I as ExecutableTxTuple>::Error,
102>;
103
104type RecoveredTx<TxEnv, Recovered> = WithTxEnv<TxEnv, Recovered>;
105type IndexedTxResult<Tx, Err> = (usize, Result<Tx, Err>);
106type IndexedTxReceiver<Tx, Err> = CrossbeamReceiver<IndexedTxResult<Tx, Err>>;
107type IndexedTxSender<Tx, Err> = CrossbeamSender<IndexedTxResult<Tx, Err>>;
108type PrewarmTxReceiver<TxEnv, Recovered> = mpsc::Receiver<(usize, RecoveredTx<TxEnv, Recovered>)>;
109type ExecuteTxReceiver<TxEnv, Recovered, Err> =
110 IndexedTxReceiver<RecoveredTx<TxEnv, Recovered>, Err>;
111type ExecuteTxSender<TxEnv, Recovered, Err> = IndexedTxSender<RecoveredTx<TxEnv, Recovered>, Err>;
112
113#[derive(Debug)]
115pub struct PayloadProcessor<Evm>
116where
117 Evm: ConfigureEvm,
118{
119 executor: Runtime,
121 execution_cache: PayloadExecutionCache,
123 cache_metrics: Option<CachedStateMetrics>,
125 cache_state_metrics: Option<CachedStateCacheMetrics>,
127 trie_metrics: MultiProofTaskMetrics,
129 cross_block_cache_size: usize,
131 disable_transaction_prewarming: bool,
133 disable_state_cache: bool,
135 evm_config: Evm,
137 precompile_cache_disabled: bool,
139 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
141 sparse_state_trie: SharedPreservedSparseTrie,
145 sparse_trie_max_hot_slots: usize,
147 sparse_trie_max_hot_accounts: usize,
149 disable_sparse_trie_cache_pruning: bool,
151 disable_bal_parallel_state_root: bool,
154 disable_bal_batch_io: bool,
156}
157
158impl<N, Evm> PayloadProcessor<Evm>
159where
160 N: NodePrimitives,
161 Evm: ConfigureEvm<Primitives = N>,
162{
163 pub const fn executor(&self) -> &Runtime {
165 &self.executor
166 }
167
168 pub fn new(
170 executor: Runtime,
171 evm_config: Evm,
172 config: &TreeConfig,
173 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
174 ) -> Self {
175 Self {
176 executor,
177 execution_cache: Default::default(),
178 trie_metrics: Default::default(),
179 cross_block_cache_size: config.cross_block_cache_size(),
180 disable_transaction_prewarming: config.disable_prewarming(),
181 evm_config,
182 disable_state_cache: config.disable_state_cache(),
183 precompile_cache_disabled: config.precompile_cache_disabled(),
184 precompile_cache_map,
185 sparse_state_trie: SharedPreservedSparseTrie::default(),
186 sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
187 sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
188 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
189 cache_metrics: (!config.disable_cache_metrics())
190 .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
191 cache_state_metrics: (!config.disable_cache_metrics())
192 .then(CachedStateCacheMetrics::default),
193 disable_bal_parallel_state_root: config.disable_bal_parallel_state_root(),
194 disable_bal_batch_io: config.disable_bal_batch_io(),
195 }
196 }
197}
198
199impl<Evm> WaitForCaches for PayloadProcessor<Evm>
200where
201 Evm: ConfigureEvm,
202{
203 fn wait_for_caches(&self) -> CacheWaitDurations {
204 debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
205
206 let execution_cache = self.execution_cache.clone();
208 let sparse_trie = self.sparse_state_trie.clone();
209
210 let (execution_tx, execution_rx) = std::sync::mpsc::channel();
212 let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
213
214 self.executor.spawn_blocking_named("wait-exec-cache", move || {
215 let _ = execution_tx.send(execution_cache.wait_for_availability());
216 });
217 self.executor.spawn_blocking_named("wait-sparse-tri", move || {
218 let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
219 });
220
221 let execution_cache_duration =
222 execution_rx.recv().expect("execution cache wait task failed to send result");
223 let sparse_trie_duration =
224 sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
225
226 debug!(
227 target: "engine::tree::payload_processor",
228 ?execution_cache_duration,
229 ?sparse_trie_duration,
230 "Execution cache and sparse trie locks acquired"
231 );
232 CacheWaitDurations {
233 execution_cache: execution_cache_duration,
234 sparse_trie: sparse_trie_duration,
235 }
236 }
237}
238
239impl<N, Evm> PayloadProcessor<Evm>
240where
241 N: NodePrimitives,
242 Evm: ConfigureEvm<Primitives = N> + 'static,
243{
244 #[instrument(
267 level = "debug",
268 target = "engine::tree::payload_processor",
269 name = "payload processor",
270 skip_all
271 )]
272 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
273 &mut self,
274 env: ExecutionEnv<Evm>,
275 transactions: I,
276 provider_builder: StateProviderBuilder<N, P>,
277 multiproof_provider_factory: F,
278 config: &TreeConfig,
279 parallel_bal_execution: bool,
280 ) -> IteratorPayloadHandle<Evm, I, N>
281 where
282 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
283 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
284 + Clone
285 + Send
286 + Sync
287 + 'static,
288 {
289 let (prewarm_rx, execution_rx) =
291 self.spawn_tx_iterator(transactions, env.transaction_count);
292
293 let span = Span::current();
294
295 let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
296 let state_root_handle = self.spawn_state_root(
297 multiproof_provider_factory,
298 env.parent_state_root,
299 halve_workers,
300 config,
301 );
302 let install_state_hook = !parallel_bal_execution;
311 let prewarm_handle = self.spawn_caching_with(
312 env,
313 prewarm_rx,
314 provider_builder,
315 Some(state_root_handle.updates_tx().clone()),
316 parallel_bal_execution,
317 );
318
319 PayloadHandle {
320 state_root_handle: Some(state_root_handle),
321 install_state_hook,
322 prewarm_handle,
323 transactions: execution_rx,
324 _span: span,
325 }
326 }
327
328 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
332 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
333 &self,
334 env: ExecutionEnv<Evm>,
335 transactions: I,
336 provider_builder: StateProviderBuilder<N, P>,
337 ) -> IteratorPayloadHandle<Evm, I, N>
338 where
339 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
340 {
341 let (prewarm_rx, execution_rx) =
342 self.spawn_tx_iterator(transactions, env.transaction_count);
343 let prewarm_handle =
344 self.spawn_caching_with(env, prewarm_rx, provider_builder, None, false);
345 PayloadHandle {
346 state_root_handle: None,
347 install_state_hook: false,
348 prewarm_handle,
349 transactions: execution_rx,
350 _span: Span::current(),
351 }
352 }
353
354 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
367 pub fn spawn_state_root<F>(
368 &self,
369 multiproof_provider_factory: F,
370 parent_state_root: B256,
371 halve_workers: bool,
372 config: &TreeConfig,
373 ) -> StateRootHandle
374 where
375 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
376 + Clone
377 + Send
378 + Sync
379 + 'static,
380 {
381 let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();
382
383 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
384 #[cfg(feature = "trie-debug")]
385 let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
386 let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
387
388 let (state_root_tx, state_root_rx) = channel();
389 let (hashed_state_tx, hashed_state_rx) = channel();
390
391 self.spawn_sparse_trie_task(
392 proof_handle,
393 state_root_tx,
394 hashed_state_tx,
395 from_multi_proof,
396 parent_state_root,
397 config.multiproof_chunk_size(),
398 );
399
400 StateRootHandle::new(parent_state_root, updates_tx, state_root_rx, hashed_state_rx)
401 }
402
403 const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
406
407 const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
414
415 const PARALLEL_PREFETCH_COUNT: usize = 4;
423
424 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
431 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
432 &self,
433 transactions: I,
434 transaction_count: usize,
435 ) -> (IteratorPrewarmTxReceiver<Evm, I>, IteratorExecuteTxReceiver<Evm, I>) {
436 let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
437 let (execute_tx, execute_rx) = crossbeam_channel::bounded(transaction_count);
438
439 if transaction_count == 0 {
440 } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
442 debug!(
445 target: "engine::tree::payload_processor",
446 transaction_count,
447 "using sequential sig recovery for small block"
448 );
449 self.executor.spawn_blocking_named("tx-iterator", move || {
450 let (transactions, convert) = transactions.into_parts();
451 convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
452 });
453 } else {
454 let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
461 let executor = self.executor.clone();
462 self.executor.spawn_blocking_named("tx-iterator", move || {
463 let (transactions, convert) = transactions.into_parts();
464 let mut all: Vec<_> = transactions.into_iter().collect();
465 let rest = all.split_off(prefetch.min(all.len()));
466
467 convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
470
471 rest.into_par_iter()
473 .enumerate()
474 .map(|(i, tx)| {
475 let idx = i + prefetch;
476 let tx = convert.convert(tx);
477 (idx, tx)
478 })
479 .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
480 let tx = tx.map(|tx| {
481 let tx = WithTxEnv::new(tx);
482 let _ = prewarm_tx.send((idx, tx.clone()));
483 tx
484 });
485 let _ = execute_tx.send((idx, tx));
486 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
487 });
488 });
489 }
490
491 (prewarm_rx, execute_rx)
492 }
493
494 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
500 fn spawn_caching_with<P>(
501 &self,
502 env: ExecutionEnv<Evm>,
503 transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
504 provider_builder: StateProviderBuilder<N, P>,
505 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
506 parallel_bal_execution: bool,
507 ) -> CacheTaskHandle<N::Receipt>
508 where
509 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
510 {
511 let mode = if parallel_bal_execution {
512 PrewarmMode::BlockAccessList(
513 env.decoded_bal.clone().expect("BAL dispatch implies decoded BAL"),
514 )
515 } else if self.disable_transaction_prewarming ||
516 env.transaction_count < SMALL_BLOCK_TX_THRESHOLD
517 {
518 PrewarmMode::Skipped
519 } else {
520 PrewarmMode::Transactions(transactions)
521 };
522 let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
523
524 let executed_tx_index = Arc::new(AtomicUsize::new(0));
525 let prewarm_ctx = PrewarmContext {
527 env,
528 evm_config: self.evm_config.clone(),
529 saved_cache: saved_cache.clone(),
530 provider: provider_builder,
531 metrics: PrewarmMetrics::default(),
532 cache_metrics: self.cache_metrics.clone(),
533 cache_state_metrics: self.cache_state_metrics.clone(),
534 terminate_execution: Arc::new(AtomicBool::new(false)),
535 executed_tx_index: Arc::clone(&executed_tx_index),
536 precompile_cache_disabled: self.precompile_cache_disabled,
537 precompile_cache_map: self.precompile_cache_map.clone(),
538 disable_bal_parallel_state_root: self.disable_bal_parallel_state_root,
539 disable_bal_batch_io: self.disable_bal_batch_io,
540 };
541
542 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
543 self.executor.clone(),
544 self.execution_cache.clone(),
545 prewarm_ctx,
546 to_sparse_trie_task,
547 );
548 {
549 let to_prewarm_task = to_prewarm_task.clone();
550 self.executor.spawn_blocking_named("prewarm", move || {
551 prewarm_task.run(mode, to_prewarm_task);
552 });
553 }
554
555 CacheTaskHandle {
556 saved_cache,
557 to_prewarm_task: Some(to_prewarm_task),
558 executed_tx_index,
559 cache_metrics: self.cache_metrics.clone(),
560 }
561 }
562
563 #[instrument(level = "debug", target = "engine::caching", skip(self))]
568 pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
569 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
570 debug!("reusing execution cache");
571 cache
572 } else {
573 debug!("creating new execution cache on cache miss");
574 let start = Instant::now();
575 let cache = ExecutionCache::new(self.cross_block_cache_size);
576 if let Some(metrics) = &self.cache_metrics {
577 metrics.record_cache_creation(start.elapsed());
578 }
579 SavedCache::new(parent_hash, cache)
580 }
581 }
582
583 fn spawn_sparse_trie_task(
587 &self,
588 proof_worker_handle: ProofWorkerHandle,
589 state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
590 hashed_state_tx: mpsc::Sender<HashedPostState>,
591 from_multi_proof: CrossbeamReceiver<StateRootMessage>,
592 parent_state_root: B256,
593 chunk_size: usize,
594 ) {
595 let preserved_sparse_trie = self.sparse_state_trie.clone();
596 let trie_metrics = self.trie_metrics.clone();
597 let max_hot_slots = self.sparse_trie_max_hot_slots;
598 let max_hot_accounts = self.sparse_trie_max_hot_accounts;
599 let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
600 let executor = self.executor.clone();
601
602 let parent_span = Span::current();
603 self.executor.spawn_blocking_named("sparse-trie", move || {
604 reth_tasks::once!(increase_thread_priority);
605
606 let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
607 .entered();
608
609 let start = Instant::now();
614 let preserved = preserved_sparse_trie.take();
615 trie_metrics
616 .sparse_trie_cache_wait_duration_histogram
617 .record(start.elapsed().as_secs_f64());
618
619 let mut sparse_state_trie = preserved
620 .map(|preserved| preserved.into_trie_for(parent_state_root))
621 .unwrap_or_else(|| {
622 debug!(
623 target: "engine::tree::payload_processor",
624 "Creating new sparse trie - no preserved trie available"
625 );
626 let default_trie = RevealableSparseTrie::blind_from(
627 ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
628 );
629 SparseStateTrie::default()
630 .with_accounts_trie(default_trie.clone())
631 .with_default_storage_trie(default_trie)
632 .with_updates(true)
633 });
634 sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
635
636 let mut task = SparseTrieCacheTask::new_with_trie(
637 &executor,
638 from_multi_proof,
639 hashed_state_tx,
640 proof_worker_handle,
641 trie_metrics.clone(),
642 sparse_state_trie,
643 parent_state_root,
644 chunk_size,
645 );
646
647 let result = task.run();
648
649 let mut guard = preserved_sparse_trie.lock();
655
656 let task_result = result.as_ref().ok().cloned();
657 if state_root_tx.send(result).is_err() {
659 debug!(
662 target: "engine::tree::payload_processor",
663 "State root receiver dropped, clearing trie"
664 );
665 let (trie, deferred) = task.into_cleared_trie(
666 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
667 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
668 );
669 guard.store(PreservedSparseTrie::cleared(trie));
670 drop(guard);
671 executor.spawn_drop(deferred);
672 return;
673 }
674
675 let _enter =
678 debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
679 let deferred = if let Some(result) = task_result {
680 let start = Instant::now();
681 let (trie, deferred) = task.into_trie_for_reuse(
682 max_hot_slots,
683 max_hot_accounts,
684 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
685 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
686 disable_cache_pruning,
687 &result.trie_updates,
688 );
689 trie_metrics
690 .into_trie_for_reuse_duration_histogram
691 .record(start.elapsed().as_secs_f64());
692 trie_metrics
693 .sparse_trie_retained_memory_bytes
694 .set(trie.memory_size() as f64);
695 trie_metrics
696 .sparse_trie_retained_storage_tries
697 .set(trie.retained_storage_tries_count() as f64);
698 guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
699 deferred
700 } else {
701 debug!(
702 target: "engine::tree::payload_processor",
703 "State root computation failed, clearing trie"
704 );
705 let (trie, deferred) = task.into_cleared_trie(
706 SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
707 SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
708 );
709 guard.store(PreservedSparseTrie::cleared(trie));
710 deferred
711 };
712 drop(guard);
713 executor.spawn_drop(deferred);
714 });
715 }
716
717 pub fn on_inserted_executed_block(
725 &self,
726 block_with_parent: BlockWithParent,
727 bundle_state: &BundleState,
728 ) {
729 let cache_state_metrics = self.cache_state_metrics.clone();
730 self.execution_cache.update_with_guard(|cached| {
731 if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
732 debug!(
733 target: "engine::caching",
734 parent_hash = %block_with_parent.parent,
735 "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
736 );
737 return
738 }
739
740 if let Some(cache) = cached.as_ref().filter(|cache| !cache.is_available()) {
741 debug!(
742 target: "engine::caching",
743 parent_hash = %block_with_parent.parent,
744 usage_count = cache.usage_count(),
745 "Execution cache is in use, skip updating cache with new state for inserted executed block",
746 );
747 return
748 }
749
750 let caches = match cached.take() {
752 Some(existing) => existing.cache().clone(),
753 None => ExecutionCache::new(self.cross_block_cache_size),
754 };
755
756 let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
758 if new_cache.cache().insert_state(bundle_state).is_err() {
759 *cached = None;
760 debug!(target: "engine::caching", "cleared execution cache on update error");
761 return
762 }
763 new_cache.update_metrics(cache_state_metrics.as_ref());
764
765 *cached = Some(new_cache);
767 debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
768 });
769 }
770}
771
772fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
774 iter: impl Iterator<Item = RawTx>,
775 convert: &C,
776 prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
777 execute_tx: &ExecuteTxSender<TxEnv, Recovered, Err>,
778) where
779 Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
780 TxEnv: Clone,
781 C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
782{
783 for (idx, raw_tx) in iter.enumerate() {
784 let tx = convert.convert(raw_tx);
785 let tx = tx.map(|tx| WithTxEnv::new(tx));
786 if let Ok(tx) = &tx {
787 let _ = prewarm_tx.send((idx, tx.clone()));
788 }
789 let _ = execute_tx.send((idx, tx));
790 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
791 }
792}
793
794#[derive(Debug)]
799pub struct PayloadHandle<Tx, Err, R> {
800 state_root_handle: Option<StateRootHandle>,
802 install_state_hook: bool,
804 prewarm_handle: CacheTaskHandle<R>,
806 transactions: IndexedTxReceiver<Tx, Err>,
808 _span: Span,
810}
811
812impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
813 #[instrument(
819 level = "debug",
820 target = "engine::tree::payload_processor",
821 name = "await_state_root",
822 skip_all
823 )]
824 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
825 self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
826 }
827
828 pub const fn take_state_root_rx(
835 &mut self,
836 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
837 self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
838 }
839
840 pub fn state_hook(&self) -> Option<impl OnStateHook> {
844 self.install_state_hook
845 .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
846 .flatten()
847 }
848
849 pub fn sparse_trie_updates_tx(&self) -> Option<CrossbeamSender<StateRootMessage>> {
853 self.state_root_handle.as_ref().map(|handle| handle.updates_tx().clone())
854 }
855
856 pub fn caches(&self) -> Option<ExecutionCache> {
858 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
859 }
860
861 pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
863 self.prewarm_handle.cache_metrics.clone()
864 }
865
866 pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
871 &self.prewarm_handle.executed_tx_index
872 }
873
874 pub fn stop_prewarming_execution(&self) {
878 self.prewarm_handle.stop_prewarming_execution()
879 }
880
881 pub fn terminate_caching(
889 &mut self,
890 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
891 ) -> Option<mpsc::Sender<()>> {
892 self.prewarm_handle.terminate_caching(execution_outcome)
893 }
894
895 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
897 self.transactions.iter().map(|(_, tx)| tx)
898 }
899
900 pub fn clone_transaction_receiver(&self) -> IndexedTxReceiver<Tx, Err> {
902 self.transactions.clone()
903 }
904
905 pub fn take_hashed_state_rx(&mut self) -> Option<mpsc::Receiver<HashedPostState>> {
907 self.state_root_handle.as_mut().map(|handle| handle.take_hashed_state_rx())
908 }
909}
910
911#[derive(Debug)]
916pub struct CacheTaskHandle<R> {
917 saved_cache: Option<SavedCache>,
919 to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
921 executed_tx_index: Arc<AtomicUsize>,
924 cache_metrics: Option<CachedStateMetrics>,
926}
927
928impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
929 pub fn stop_prewarming_execution(&self) {
933 self.to_prewarm_task
934 .as_ref()
935 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
936 }
937
938 #[must_use = "sender must be used and notified on block validation success"]
943 pub fn terminate_caching(
944 &mut self,
945 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
946 ) -> Option<mpsc::Sender<()>> {
947 if let Some(tx) = self.to_prewarm_task.take() {
948 let (valid_block_tx, valid_block_rx) = mpsc::channel();
949 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
950 let _ = tx.send(event);
951
952 Some(valid_block_tx)
953 } else {
954 None
955 }
956 }
957}
958
959impl<R> Drop for CacheTaskHandle<R> {
960 fn drop(&mut self) {
961 if let Some(tx) = self.to_prewarm_task.take() {
963 let _ = tx.send(PrewarmTaskEvent::Terminate {
964 execution_outcome: None,
965 valid_block_rx: mpsc::channel().1,
966 });
967 }
968 }
969}
970
971#[derive(Debug, Clone)]
973pub struct ExecutionEnv<Evm: ConfigureEvm> {
974 pub evm_env: EvmEnvFor<Evm>,
976 pub hash: B256,
978 pub parent_hash: B256,
980 pub parent_state_root: B256,
984 pub transaction_count: usize,
988 pub gas_used: u64,
991 pub withdrawals: Option<Vec<Withdrawal>>,
994 pub decoded_bal: Option<Arc<DecodedBal>>,
997}
998
999impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
1000where
1001 EvmEnvFor<Evm>: Default,
1002{
1003 #[cfg(any(test, feature = "test-utils"))]
1005 pub fn test_default() -> Self {
1006 Self {
1007 evm_env: Default::default(),
1008 hash: Default::default(),
1009 parent_hash: Default::default(),
1010 parent_state_root: Default::default(),
1011 transaction_count: 0,
1012 gas_used: 0,
1013 withdrawals: None,
1014 decoded_bal: None,
1015 }
1016 }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021 use crate::tree::{
1022 payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1023 precompile_cache::PrecompileCacheMap,
1024 ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
1025 };
1026 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1027 use rand::Rng;
1028 use reth_chainspec::ChainSpec;
1029 use reth_db_common::init::init_genesis;
1030 use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
1031 use reth_evm::OnStateHook;
1032 use reth_evm_ethereum::EthEvmConfig;
1033 use reth_execution_cache::CachedStatus;
1034 use reth_primitives_traits::{Account, Recovered, StorageEntry};
1035 use reth_provider::{
1036 providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
1037 test_utils::create_test_provider_factory_with_chain_spec,
1038 ChainSpecProvider, HashingWriter,
1039 };
1040 use reth_revm::db::BundleState;
1041 use reth_testing_utils::generators;
1042 use reth_trie::{test_utils::state_root, HashedPostState};
1043 use reth_trie_db::ChangesetCache;
1044 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1045 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot, TransactionId};
1046 use std::sync::Arc;
1047
1048 fn make_saved_cache(hash: B256) -> SavedCache {
1049 let execution_cache = ExecutionCache::new(1_000);
1050 SavedCache::new(hash, execution_cache)
1051 }
1052
1053 #[test]
1054 fn execution_cache_allows_single_checkout() {
1055 let execution_cache = PayloadExecutionCache::default();
1056 let hash = B256::from([1u8; 32]);
1057
1058 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1059
1060 let first = execution_cache.get_cache_for(hash);
1061 assert!(first.is_some(), "expected initial checkout to succeed");
1062
1063 let second = execution_cache.get_cache_for(hash);
1064 assert!(second.is_none(), "second checkout should be blocked while guard is active");
1065
1066 drop(first);
1067
1068 let third = execution_cache.get_cache_for(hash);
1069 assert!(third.is_some(), "third checkout should succeed after guard is dropped");
1070 }
1071
1072 #[test]
1073 fn execution_cache_checkout_releases_on_drop() {
1074 let execution_cache = PayloadExecutionCache::default();
1075 let hash = B256::from([2u8; 32]);
1076
1077 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1078
1079 {
1080 let guard = execution_cache.get_cache_for(hash);
1081 assert!(guard.is_some(), "expected checkout to succeed");
1082 }
1084
1085 let retry = execution_cache.get_cache_for(hash);
1086 assert!(retry.is_some(), "checkout should succeed after guard drop");
1087 }
1088
1089 #[test]
1090 fn execution_cache_mismatch_parent_clears_and_returns() {
1091 let execution_cache = PayloadExecutionCache::default();
1092 let hash = B256::from([3u8; 32]);
1093
1094 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
1095
1096 let different_hash = B256::from([4u8; 32]);
1099 let cache = execution_cache.get_cache_for(different_hash);
1100 assert!(cache.is_some(), "cache should be returned for reuse after clearing");
1101
1102 drop(cache);
1103
1104 let original = execution_cache.get_cache_for(hash);
1107 assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
1108 }
1109
1110 #[test]
1111 fn execution_cache_update_after_release_succeeds() {
1112 let execution_cache = PayloadExecutionCache::default();
1113 let initial = B256::from([5u8; 32]);
1114
1115 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
1116
1117 let guard =
1118 execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
1119
1120 drop(guard);
1121
1122 let updated = B256::from([6u8; 32]);
1123 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
1124
1125 let new_checkout = execution_cache.get_cache_for(updated);
1126 assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
1127 }
1128
1129 #[test]
1130 fn on_inserted_executed_block_populates_cache() {
1131 let payload_processor = PayloadProcessor::new(
1132 reth_tasks::Runtime::test(),
1133 EthEvmConfig::new(Arc::new(ChainSpec::default())),
1134 &TreeConfig::default(),
1135 PrecompileCacheMap::default(),
1136 );
1137
1138 let parent_hash = B256::from([1u8; 32]);
1139 let block_hash = B256::from([10u8; 32]);
1140 let block_with_parent = BlockWithParent {
1141 block: BlockNumHash { hash: block_hash, number: 1 },
1142 parent: parent_hash,
1143 };
1144 let bundle_state = BundleState::default();
1145
1146 assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
1148
1149 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1151
1152 let cached = payload_processor.execution_cache.get_cache_for(block_hash);
1154 assert!(cached.is_some());
1155 assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
1156 }
1157
1158 #[test]
1159 fn on_inserted_executed_block_skips_on_parent_mismatch() {
1160 let payload_processor = PayloadProcessor::new(
1161 reth_tasks::Runtime::test(),
1162 EthEvmConfig::new(Arc::new(ChainSpec::default())),
1163 &TreeConfig::default(),
1164 PrecompileCacheMap::default(),
1165 );
1166
1167 let block1_hash = B256::from([1u8; 32]);
1169 payload_processor
1170 .execution_cache
1171 .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
1172
1173 let wrong_parent = B256::from([99u8; 32]);
1175 let block3_hash = B256::from([3u8; 32]);
1176 let block_with_parent = BlockWithParent {
1177 block: BlockNumHash { hash: block3_hash, number: 3 },
1178 parent: wrong_parent,
1179 };
1180 let bundle_state = BundleState::default();
1181
1182 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1183
1184 let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1186 assert!(cached.is_some(), "Original cache should be preserved");
1187
1188 let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1190 assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1191 }
1192
1193 #[test]
1194 fn on_inserted_executed_block_does_not_mutate_checked_out_parent_cache() {
1195 let payload_processor = PayloadProcessor::new(
1196 reth_tasks::Runtime::test(),
1197 EthEvmConfig::new(Arc::new(ChainSpec::default())),
1198 &TreeConfig::default(),
1199 PrecompileCacheMap::default(),
1200 );
1201
1202 let parent_hash = B256::from([1u8; 32]);
1203 payload_processor
1204 .execution_cache
1205 .update_with_guard(|slot| *slot = Some(make_saved_cache(parent_hash)));
1206
1207 let checked_out = payload_processor
1211 .execution_cache
1212 .get_cache_for(parent_hash)
1213 .expect("expected parent cache checkout to succeed");
1214
1215 let polluted_address = Address::random();
1216 let bundle_state = BundleState::builder(2..=2)
1217 .state_present_account_info(
1218 polluted_address,
1219 AccountInfo {
1220 balance: U256::from(1337),
1221 nonce: 7,
1222 code_hash: KECCAK_EMPTY,
1223 code: None,
1224 account_id: None,
1225 },
1226 )
1227 .build();
1228
1229 let block_with_parent = BlockWithParent {
1232 block: BlockNumHash { hash: B256::from([2u8; 32]), number: 2 },
1233 parent: parent_hash,
1234 };
1235
1236 payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1237
1238 let account = checked_out
1241 .cache()
1242 .get_or_try_insert_account_with(polluted_address, || Ok::<_, ()>(None))
1243 .expect("cache read should succeed");
1244
1245 assert_eq!(
1246 account,
1247 CachedStatus::NotCached(None),
1248 "checked-out parent cache should not observe state from inserted local block"
1249 );
1250 }
1251
1252 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1253 let mut rng = generators::rng();
1254 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1255 let mut updates = Vec::with_capacity(updates_per_account);
1256
1257 for _ in 0..updates_per_account {
1258 let num_accounts_in_update = rng.random_range(1..=num_accounts);
1259 let mut state_update = EvmState::default();
1260
1261 let selected_addresses = &all_addresses[0..num_accounts_in_update];
1262
1263 for &address in selected_addresses {
1264 let mut storage = HashMap::default();
1265 if rng.random_bool(0.7) {
1266 for _ in 0..rng.random_range(1..10) {
1267 let slot = U256::from(rng.random::<u64>());
1268 storage.insert(
1269 slot,
1270 EvmStorageSlot::new_changed(
1271 U256::ZERO,
1272 U256::from(rng.random::<u64>()),
1273 TransactionId::ZERO,
1274 ),
1275 );
1276 }
1277 }
1278
1279 let mut account = revm_state::Account::default();
1280 account.info = AccountInfo {
1281 balance: U256::from(rng.random::<u64>()),
1282 nonce: rng.random::<u64>(),
1283 code_hash: KECCAK_EMPTY,
1284 code: Some(Default::default()),
1285 account_id: None,
1286 };
1287 account.storage = storage;
1288 account.status = AccountStatus::Touched;
1289 account.transaction_id = TransactionId::ZERO;
1290
1291 state_update.insert(address, account);
1292 }
1293
1294 updates.push(state_update);
1295 }
1296
1297 updates
1298 }
1299
1300 #[test]
1301 fn test_state_root() {
1302 reth_tracing::init_test_tracing();
1303
1304 let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
1305 let genesis_hash = init_genesis(&factory).unwrap();
1306
1307 let state_updates = create_mock_state_updates(10, 10);
1308 let mut hashed_state = HashedPostState::default();
1309 let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
1310 HashMap::default();
1311
1312 {
1313 let provider_rw = factory.provider_rw().expect("failed to get provider");
1314
1315 for update in &state_updates {
1316 let account_updates = update.iter().map(|(address, account)| {
1317 (*address, Some(Account::from_revm_account(account)))
1318 });
1319 provider_rw
1320 .insert_account_for_hashing(account_updates)
1321 .expect("failed to insert accounts");
1322
1323 let storage_updates = update.iter().map(|(address, account)| {
1324 let storage_entries = account.storage.iter().map(|(slot, value)| {
1325 StorageEntry { key: B256::from(*slot), value: value.present_value }
1326 });
1327 (*address, storage_entries)
1328 });
1329 provider_rw
1330 .insert_storage_for_hashing(storage_updates)
1331 .expect("failed to insert storage");
1332 }
1333 provider_rw.commit().expect("failed to commit changes");
1334 }
1335
1336 for update in &state_updates {
1337 hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
1338
1339 for (address, account) in update {
1340 let storage: HashMap<B256, U256> = account
1341 .storage
1342 .iter()
1343 .map(|(k, v)| (B256::from(*k), v.present_value))
1344 .collect();
1345
1346 let entry = accumulated_state.entry(*address).or_default();
1347 entry.0 = Account::from_revm_account(account);
1348 entry.1.extend(storage);
1349 }
1350 }
1351
1352 let mut payload_processor = PayloadProcessor::new(
1353 reth_tasks::Runtime::test(),
1354 EthEvmConfig::new(factory.chain_spec()),
1355 &TreeConfig::default(),
1356 PrecompileCacheMap::default(),
1357 );
1358
1359 let provider_factory = BlockchainProvider::new(factory).unwrap();
1360
1361 let mut handle = payload_processor.spawn(
1362 ExecutionEnv::test_default(),
1363 (
1364 Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
1365 std::convert::identity,
1366 ),
1367 StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1368 OverlayStateProviderFactory::new(
1369 provider_factory,
1370 OverlayBuilder::<EthPrimitives>::new(genesis_hash, ChangesetCache::new()),
1371 ),
1372 &TreeConfig::default(),
1373 false,
1374 );
1375
1376 let mut state_hook = handle.state_hook().expect("state hook is None");
1377
1378 for update in state_updates {
1379 state_hook.on_state(&update);
1380 }
1381 drop(state_hook);
1382
1383 let root_from_task = handle.state_root().expect("task failed").state_root;
1384 let root_from_regular = state_root(accumulated_state);
1385
1386 assert_eq!(
1387 root_from_task, root_from_regular,
1388 "State root mismatch: task={root_from_task}, base={root_from_regular}"
1389 );
1390 }
1391
1392 #[test]
1403 fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1404 let execution_cache = PayloadExecutionCache::default();
1405
1406 let block4_hash = B256::from([4u8; 32]);
1408 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1409
1410 let fork_parent = B256::from([2u8; 32]);
1413 let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1414 assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1415 let prewarm_cache = prewarm_cache.unwrap();
1416 assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1417
1418 let fork_addr = Address::from([0xBB; 20]);
1421 let fork_key = B256::from([0xCC; 32]);
1422 prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1423
1424 let during_prewarm = execution_cache.get_cache_for(block4_hash);
1426 assert!(
1427 during_prewarm.is_none(),
1428 "cache must be unavailable while prewarm holds a reference"
1429 );
1430
1431 drop(prewarm_cache);
1433
1434 let block5_cache = execution_cache.get_cache_for(block4_hash);
1438 assert!(
1439 block5_cache.is_some(),
1440 "canonical chain must get cache after fork prewarm is dropped"
1441 );
1442 assert_eq!(
1443 block5_cache.as_ref().unwrap().executed_block_hash(),
1444 block4_hash,
1445 "cache must carry the canonical parent hash, not the fork parent"
1446 );
1447 }
1448}