1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
6 sparse_trie::SparseTrieCacheTask,
7 CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
8 PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
9};
10use alloy_eip7928::bal::DecodedBal;
11use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
12use alloy_primitives::B256;
13use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
14use multiproof::*;
15use prewarm::PrewarmMetrics;
16use rayon::prelude::*;
17use reth_evm::{
18 block::ExecutableTxParts,
19 execute::{ExecutableTxFor, WithTxEnv},
20 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
21 SpecFor, TxEnvFor,
22};
23use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
24use reth_provider::{
25 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
26};
27use reth_revm::db::BundleState;
28use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
29use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
30use reth_trie_parallel::{
31 proof_task::{ProofTaskCtx, ProofWorkerHandle},
32 root::ParallelStateRootError,
33};
34use reth_trie_sparse::{
35 ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
36};
37use std::{
38 ops::Not,
39 sync::{
40 atomic::{AtomicBool, AtomicUsize},
41 mpsc::{self, channel},
42 Arc,
43 },
44};
45use tracing::{debug, debug_span, instrument, trace, warn, Span};
46
47pub mod bal;
48pub mod multiproof;
49mod preserved_sparse_trie;
50pub mod prewarm;
51pub mod receipt_root_task;
52pub mod sparse_trie;
53
54use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
55
/// Node capacity the sparse trie is shrunk to when it is cleared or preserved
/// for reuse between payloads (passed to `into_cleared_trie` /
/// `into_trie_for_reuse`).
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Value capacity the sparse trie is shrunk to when it is cleared or preserved
/// for reuse between payloads.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this skip transaction prewarming (see
/// `spawn_caching_with`).
///
/// NOTE(review): a distinct associated const `SMALL_BLOCK_TX_THRESHOLD` (= 30)
/// also exists on `PayloadProcessor` and governs sequential signature recovery
/// in `spawn_tx_iterator`; confirm the name collision is intentional.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
82
/// Transaction produced by an [`ExecutableTxIterator`], paired with its prepared tx env.
type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

/// [`PayloadHandle`] specialized to the transaction, error and receipt types of
/// a given EVM config / iterator / primitives combination.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    IteratorTx<Evm, I>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;

/// Receiver side of the prewarming channel for an iterator's transactions.
type IteratorPrewarmTxReceiver<Evm, I> =
    PrewarmTxReceiver<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

/// Receiver side of the execution channel for an iterator's transactions.
type IteratorExecuteTxReceiver<Evm, I> = ExecuteTxReceiver<
    TxEnvFor<Evm>,
    <I as ExecutableTxIterator<Evm>>::Recovered,
    <I as ExecutableTxTuple>::Error,
>;

/// A recovered transaction bundled with its tx environment.
type RecoveredTx<TxEnv, Recovered> = WithTxEnv<TxEnv, Recovered>;
/// A conversion result tagged with the transaction's index in the block.
type IndexedTxResult<Tx, Err> = (usize, Result<Tx, Err>);
/// Crossbeam receiver of indexed conversion results (cloneable, used for execution).
type IndexedTxReceiver<Tx, Err> = CrossbeamReceiver<IndexedTxResult<Tx, Err>>;
/// Crossbeam sender of indexed conversion results.
type IndexedTxSender<Tx, Err> = CrossbeamSender<IndexedTxResult<Tx, Err>>;
/// Std-mpsc receiver feeding successfully converted transactions to the prewarm task.
type PrewarmTxReceiver<TxEnv, Recovered> = mpsc::Receiver<(usize, RecoveredTx<TxEnv, Recovered>)>;
/// Receiver feeding (possibly failed) conversions to the main execution loop.
type ExecuteTxReceiver<TxEnv, Recovered, Err> =
    IndexedTxReceiver<RecoveredTx<TxEnv, Recovered>, Err>;
/// Sender counterpart of [`ExecuteTxReceiver`].
type ExecuteTxSender<TxEnv, Recovered, Err> = IndexedTxSender<RecoveredTx<TxEnv, Recovered>, Err>;
109
/// Orchestrates the side tasks of payload execution: transaction
/// conversion/prewarming, the cross-block execution cache, and state-root
/// computation via the sparse trie.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Runtime used to spawn the blocking worker tasks.
    executor: Runtime,
    /// Execution cache shared across payloads (checked out per parent hash).
    execution_cache: PayloadExecutionCache,
    /// Cached-state metrics; `None` when cache metrics are disabled in config.
    cache_metrics: Option<CachedStateMetrics>,
    /// Metrics for the multiproof / sparse-trie tasks.
    trie_metrics: MultiProofTaskMetrics,
    /// Capacity used when creating a fresh execution cache.
    cross_block_cache_size: usize,
    /// If set, transaction prewarming is skipped entirely.
    disable_transaction_prewarming: bool,
    /// If set, no saved state cache is checked out for execution.
    disable_state_cache: bool,
    /// EVM configuration handed to the prewarm context.
    evm_config: Evm,
    /// If set, precompile results are not cached.
    precompile_cache_disabled: bool,
    /// Shared per-spec precompile caches.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Sparse state trie preserved between payloads for reuse.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Hot-slot capacity applied to the sparse trie's cache.
    sparse_trie_max_hot_slots: usize,
    /// Hot-account capacity applied to the sparse trie's cache.
    sparse_trie_max_hot_accounts: usize,
    /// If set, the preserved sparse trie cache is not pruned on reuse.
    disable_sparse_trie_cache_pruning: bool,
    /// If set, BAL-based parallel state-root computation is disabled.
    disable_bal_parallel_state_root: bool,
    /// If set, batched IO for BAL processing is disabled.
    disable_bal_batch_io: bool,
}
152
153impl<N, Evm> PayloadProcessor<Evm>
154where
155 N: NodePrimitives,
156 Evm: ConfigureEvm<Primitives = N>,
157{
158 pub const fn executor(&self) -> &Runtime {
160 &self.executor
161 }
162
163 pub fn new(
165 executor: Runtime,
166 evm_config: Evm,
167 config: &TreeConfig,
168 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
169 ) -> Self {
170 Self {
171 executor,
172 execution_cache: Default::default(),
173 trie_metrics: Default::default(),
174 cross_block_cache_size: config.cross_block_cache_size(),
175 disable_transaction_prewarming: config.disable_prewarming(),
176 evm_config,
177 disable_state_cache: config.disable_state_cache(),
178 precompile_cache_disabled: config.precompile_cache_disabled(),
179 precompile_cache_map,
180 sparse_state_trie: SharedPreservedSparseTrie::default(),
181 sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
182 sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
183 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
184 cache_metrics: (!config.disable_cache_metrics())
185 .then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
186 disable_bal_parallel_state_root: config.disable_bal_parallel_state_root(),
187 disable_bal_batch_io: config.disable_bal_batch_io(),
188 }
189 }
190}
191
192impl<Evm> WaitForCaches for PayloadProcessor<Evm>
193where
194 Evm: ConfigureEvm,
195{
196 fn wait_for_caches(&self) -> CacheWaitDurations {
197 debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
198
199 let execution_cache = self.execution_cache.clone();
201 let sparse_trie = self.sparse_state_trie.clone();
202
203 let (execution_tx, execution_rx) = std::sync::mpsc::channel();
205 let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
206
207 self.executor.spawn_blocking_named("wait-exec-cache", move || {
208 let _ = execution_tx.send(execution_cache.wait_for_availability());
209 });
210 self.executor.spawn_blocking_named("wait-sparse-tri", move || {
211 let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
212 });
213
214 let execution_cache_duration =
215 execution_rx.recv().expect("execution cache wait task failed to send result");
216 let sparse_trie_duration =
217 sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
218
219 debug!(
220 target: "engine::tree::payload_processor",
221 ?execution_cache_duration,
222 ?sparse_trie_duration,
223 "Execution cache and sparse trie locks acquired"
224 );
225 CacheWaitDurations {
226 execution_cache: execution_cache_duration,
227 sparse_trie: sparse_trie_duration,
228 }
229 }
230}
231
232impl<N, Evm> PayloadProcessor<Evm>
233where
234 N: NodePrimitives,
235 Evm: ConfigureEvm<Primitives = N> + 'static,
236{
    /// Spawns the full payload-processing pipeline: transaction conversion,
    /// state-root computation, and cache prewarming.
    ///
    /// Returns a [`PayloadHandle`] that exposes the execution-transaction
    /// receiver, the state-root handle, and the prewarm/cache task handle.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
    ) -> IteratorPayloadHandle<Evm, I, N>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // Start conversion/recovery first so transactions begin flowing to both
        // the prewarm and execution channels immediately.
        let (prewarm_rx, execution_rx) =
            self.spawn_tx_iterator(transactions, env.transaction_count);

        let span = Span::current();

        // Small blocks use a halved proof-worker pool.
        let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
        let state_root_handle = self.spawn_state_root(
            multiproof_provider_factory,
            env.parent_state_root,
            halve_workers,
            config,
        );
        // Parallel BAL execution requires the state cache, the BAL feature
        // enabled, and a decoded BAL on the env.
        let parallel_bal_execution = !config.disable_state_cache() &&
            !config.disable_bal_parallel_execution() &&
            env.decoded_bal.is_some();
        // When executing via BAL in parallel the state hook is not installed.
        let install_state_hook = !parallel_bal_execution;
        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            provider_builder,
            Some(state_root_handle.updates_tx().clone()),
            parallel_bal_execution,
        );

        PayloadHandle {
            state_root_handle: Some(state_root_handle),
            install_state_hook,
            prewarm_handle,
            transactions: execution_rx,
            _span: span,
        }
    }
322
323 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
327 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
328 &self,
329 env: ExecutionEnv<Evm>,
330 transactions: I,
331 provider_builder: StateProviderBuilder<N, P>,
332 ) -> IteratorPayloadHandle<Evm, I, N>
333 where
334 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
335 {
336 let (prewarm_rx, execution_rx) =
337 self.spawn_tx_iterator(transactions, env.transaction_count);
338 let prewarm_handle =
339 self.spawn_caching_with(env, prewarm_rx, provider_builder, None, false);
340 PayloadHandle {
341 state_root_handle: None,
342 install_state_hook: false,
343 prewarm_handle,
344 transactions: execution_rx,
345 _span: Span::current(),
346 }
347 }
348
    /// Spawns the state-root computation pipeline: proof workers, the
    /// multiproof channel, and the sparse-trie task.
    ///
    /// Returns a [`StateRootHandle`] through which state updates are fed and
    /// the final state root is awaited.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub fn spawn_state_root<F>(
        &self,
        multiproof_provider_factory: F,
        parent_state_root: B256,
        halve_workers: bool,
        config: &TreeConfig,
    ) -> StateRootHandle
    where
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        // Channel carrying state-root messages from producers into the sparse
        // trie task.
        let (updates_tx, from_multi_proof) = crossbeam_channel::unbounded();

        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
        // Optional artificial proof latency, only compiled in for trie debugging.
        #[cfg(feature = "trie-debug")]
        let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
        let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);

        // One-shot-style channel delivering the final state-root result.
        let (state_root_tx, state_root_rx) = channel();

        self.spawn_sparse_trie_task(
            proof_handle,
            state_root_tx,
            from_multi_proof,
            parent_state_root,
            config.multiproof_chunk_size(),
        );

        StateRootHandle::new(parent_state_root, updates_tx, state_root_rx)
    }
395
    /// Blocks with at most this many transactions use a halved proof-worker
    /// pool (see `spawn`).
    const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

    /// Blocks with fewer transactions than this recover signatures
    /// sequentially instead of in parallel (see `spawn_tx_iterator`).
    ///
    /// NOTE(review): shadows the free const `SMALL_BLOCK_TX_THRESHOLD` (= 5)
    /// used by `spawn_caching_with`; the two thresholds have different values —
    /// confirm the name collision is intentional.
    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Number of leading transactions converted serially before handing the
    /// remainder to the parallel iterator in `spawn_tx_iterator`.
    const PARALLEL_PREFETCH_COUNT: usize = 4;
    /// Spawns a task converting/recovering raw transactions, fanning each
    /// result out to a prewarming channel (successes only) and an execution
    /// channel (successes and errors), both tagged with the tx index.
    ///
    /// Small blocks are converted sequentially; larger blocks convert a few
    /// leading transactions serially for latency, then the rest in parallel
    /// via rayon while preserving index order.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
        transaction_count: usize,
    ) -> (IteratorPrewarmTxReceiver<Evm, I>, IteratorExecuteTxReceiver<Evm, I>) {
        // Bounded channels sized to the block so senders never block on a
        // consumer that reads every transaction.
        let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
        let (execute_tx, execute_rx) = crossbeam_channel::bounded(transaction_count);

        if transaction_count == 0 {
            // Nothing to convert; both receivers will simply report closed.
        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
            debug!(
                target: "engine::tree::payload_processor",
                transaction_count,
                "using sequential sig recovery for small block"
            );
            self.executor.spawn_blocking_named("tx-iterator", move || {
                let (transactions, convert) = transactions.into_parts();
                convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
            });
        } else {
            let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
            let executor = self.executor.clone();
            self.executor.spawn_blocking_named("tx-iterator", move || {
                let (transactions, convert) = transactions.into_parts();
                let mut all: Vec<_> = transactions.into_iter().collect();
                let rest = all.split_off(prefetch.min(all.len()));

                // Convert the first few serially so downstream consumers start
                // receiving transactions without waiting on the rayon pool.
                convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);

                // Convert the remainder in parallel but deliver in index order.
                rest.into_par_iter()
                    .enumerate()
                    .map(|(i, tx)| {
                        // Offset by the serially converted prefix.
                        let idx = i + prefetch;
                        let tx = convert.convert(tx);
                        (idx, tx)
                    })
                    .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
                        let tx = tx.map(|tx| {
                            let (tx_env, tx) = tx.into_parts();
                            let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
                            // Prewarming only receives successful conversions.
                            let _ = prewarm_tx.send((idx, tx.clone()));
                            tx
                        });
                        let _ = execute_tx.send((idx, tx));
                        trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
                    });
            });
        }

        (prewarm_rx, execute_rx)
    }
487
    /// Spawns the prewarm/caching task for the given environment and returns a
    /// handle for controlling it.
    ///
    /// The prewarm mode is chosen here: BAL-based when `parallel_bal_execution`
    /// is set, skipped for small blocks or when prewarming is disabled, and
    /// transaction-driven otherwise.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
        provider_builder: StateProviderBuilder<N, P>,
        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
        parallel_bal_execution: bool,
    ) -> CacheTaskHandle<N::Receipt>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        let mode = if parallel_bal_execution {
            // Caller guarantees a decoded BAL when requesting BAL execution.
            PrewarmMode::BlockAccessList(
                env.decoded_bal.clone().expect("BAL dispatch implies decoded BAL"),
            )
        } else if self.disable_transaction_prewarming ||
            // NOTE: this is the free const (= 5), not the associated const.
            env.transaction_count < SMALL_BLOCK_TX_THRESHOLD
        {
            PrewarmMode::Skipped
        } else {
            PrewarmMode::Transactions(transactions)
        };
        // Check out (or create) the saved cache unless state caching is disabled.
        let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));

        // Shared counter exposing the highest executed transaction index.
        let executed_tx_index = Arc::new(AtomicUsize::new(0));
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache: saved_cache.clone(),
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            cache_metrics: self.cache_metrics.clone(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            executed_tx_index: Arc::clone(&executed_tx_index),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
            disable_bal_parallel_state_root: self.disable_bal_parallel_state_root,
            disable_bal_batch_io: self.disable_bal_batch_io,
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_sparse_trie_task,
        );
        {
            // The task also receives a sender to itself so it can enqueue
            // follow-up events.
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking_named("prewarm", move || {
                prewarm_task.run(mode, to_prewarm_task);
            });
        }

        CacheTaskHandle {
            saved_cache,
            to_prewarm_task: Some(to_prewarm_task),
            executed_tx_index,
            cache_metrics: self.cache_metrics.clone(),
        }
    }
555
556 #[instrument(level = "debug", target = "engine::caching", skip(self))]
561 pub fn cache_for(&self, parent_hash: B256) -> SavedCache {
562 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
563 debug!("reusing execution cache");
564 cache
565 } else {
566 debug!("creating new execution cache on cache miss");
567 let start = Instant::now();
568 let cache = ExecutionCache::new(self.cross_block_cache_size);
569 if let Some(metrics) = &self.cache_metrics {
570 metrics.record_cache_creation(start.elapsed());
571 }
572 SavedCache::new(parent_hash, cache)
573 }
574 }
575
    /// Spawns the blocking sparse-trie task that consumes multiproof messages,
    /// computes the state root, and preserves the trie for the next payload.
    ///
    /// On completion the trie is either stored anchored at the new state root
    /// (success) or cleared (failure / receiver dropped), and expensive drops
    /// are deferred to a background task.
    fn spawn_sparse_trie_task(
        &self,
        proof_worker_handle: ProofWorkerHandle,
        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
        from_multi_proof: CrossbeamReceiver<StateRootMessage>,
        parent_state_root: B256,
        chunk_size: usize,
    ) {
        // Capture everything needed by the 'static task closure up front.
        let preserved_sparse_trie = self.sparse_state_trie.clone();
        let trie_metrics = self.trie_metrics.clone();
        let max_hot_slots = self.sparse_trie_max_hot_slots;
        let max_hot_accounts = self.sparse_trie_max_hot_accounts;
        let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
        let executor = self.executor.clone();

        let parent_span = Span::current();
        self.executor.spawn_blocking_named("sparse-trie", move || {
            // Raise thread priority once per thread for this latency-critical task.
            reth_tasks::once!(increase_thread_priority);

            let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
                .entered();

            // Take the trie preserved by the previous payload, recording how
            // long we waited for it to become available.
            let start = Instant::now();
            let preserved = preserved_sparse_trie.take();
            trie_metrics
                .sparse_trie_cache_wait_duration_histogram
                .record(start.elapsed().as_secs_f64());

            // Reuse the preserved trie when its anchor matches, otherwise
            // build a fresh blind arena-backed trie.
            let mut sparse_state_trie = preserved
                .map(|preserved| preserved.into_trie_for(parent_state_root))
                .unwrap_or_else(|| {
                    debug!(
                        target: "engine::tree::payload_processor",
                        "Creating new sparse trie - no preserved trie available"
                    );
                    let default_trie = RevealableSparseTrie::blind_from(
                        ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
                    );
                    SparseStateTrie::default()
                        .with_accounts_trie(default_trie.clone())
                        .with_default_storage_trie(default_trie)
                        .with_updates(true)
                });
            sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

            let mut task = SparseTrieCacheTask::new_with_trie(
                &executor,
                from_multi_proof,
                proof_worker_handle,
                trie_metrics.clone(),
                sparse_state_trie,
                parent_state_root,
                chunk_size,
            );

            // Drives the trie until the state root is computed or an error occurs.
            let result = task.run();

            // Hold the preservation lock across storing so the next payload
            // cannot observe an empty slot between send and store.
            let mut guard = preserved_sparse_trie.lock();

            let task_result = result.as_ref().ok().cloned();
            if state_root_tx.send(result).is_err() {
                // Nobody is waiting for the result anymore; shrink and clear
                // the trie so the next payload starts from a blind state.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root receiver dropped, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                drop(guard);
                // Offload the potentially expensive drop of discarded state.
                executor.spawn_drop(deferred);
                return;
            }

            let _enter =
                debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
            let deferred = if let Some(result) = task_result {
                // Success: prepare the trie for reuse anchored at the new root.
                let start = Instant::now();
                let (trie, deferred) = task.into_trie_for_reuse(
                    max_hot_slots,
                    max_hot_accounts,
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                    disable_cache_pruning,
                    &result.trie_updates,
                );
                trie_metrics
                    .into_trie_for_reuse_duration_histogram
                    .record(start.elapsed().as_secs_f64());
                trie_metrics
                    .sparse_trie_retained_memory_bytes
                    .set(trie.memory_size() as f64);
                trie_metrics
                    .sparse_trie_retained_storage_tries
                    .set(trie.retained_storage_tries_count() as f64);
                guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
                deferred
            } else {
                // Failure: keep only a cleared trie for the next payload.
                debug!(
                    target: "engine::tree::payload_processor",
                    "State root computation failed, clearing trie"
                );
                let (trie, deferred) = task.into_cleared_trie(
                    SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                    SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                );
                guard.store(PreservedSparseTrie::cleared(trie));
                deferred
            };
            drop(guard);
            executor.spawn_drop(deferred);
        });
    }
707
    /// Updates the shared execution cache with the state of a newly inserted
    /// executed block, advancing the cache's anchor to that block's hash.
    ///
    /// If an existing cache is anchored at a different parent the update is
    /// skipped; if applying the bundle state fails the cache is cleared.
    pub fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        let cache_metrics = self.cache_metrics.clone();
        self.execution_cache.update_with_guard(|cached| {
            // Only extend a cache whose head is this block's parent.
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
                );
                return
            }

            // Reuse the existing underlying cache if present, otherwise create one.
            let caches = match cached.take() {
                Some(existing) => existing.cache().clone(),
                None => ExecutionCache::new(self.cross_block_cache_size),
            };

            // Re-anchor at the inserted block and apply its state changes.
            let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
            if new_cache.cache().insert_state(bundle_state).is_err() {
                // A failed insert leaves the cache in an unknown state; drop it.
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return
            }
            new_cache.update_metrics(cache_metrics.as_ref());

            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
752
753fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
755 iter: impl Iterator<Item = RawTx>,
756 convert: &C,
757 prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
758 execute_tx: &ExecuteTxSender<TxEnv, Recovered, Err>,
759) where
760 Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
761 TxEnv: Clone,
762 C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
763{
764 for (idx, raw_tx) in iter.enumerate() {
765 let tx = convert.convert(raw_tx);
766 let tx = tx.map(|tx| {
767 let (tx_env, tx) = tx.into_parts();
768 WithTxEnv { tx_env, tx: Arc::new(tx) }
769 });
770 if let Ok(tx) = &tx {
771 let _ = prewarm_tx.send((idx, tx.clone()));
772 }
773 let _ = execute_tx.send((idx, tx));
774 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
775 }
776}
777
/// Handle to a spawned payload-processing pipeline, tying together the
/// state-root task, the prewarm/cache task, and the execution-transaction
/// stream.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the state-root task; `None` when spawned cache-exclusive.
    state_root_handle: Option<StateRootHandle>,
    /// Whether a state hook should be installed for execution.
    install_state_hook: bool,
    /// Handle to the prewarm/cache task.
    prewarm_handle: CacheTaskHandle<R>,
    /// Receiver of indexed transaction conversion results for execution.
    transactions: IndexedTxReceiver<Tx, Err>,
    /// Keeps the spawning span alive for the handle's lifetime.
    _span: Span,
}
795
impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
    /// Blocks until the state root has been computed and returns the outcome.
    ///
    /// Panics if the handle was spawned without a state-root task.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "await_state_root",
        skip_all
    )]
    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
    }

    /// Takes the raw state-root result receiver out of the handle.
    ///
    /// Panics if the handle was spawned without a state-root task.
    pub const fn take_state_root_rx(
        &mut self,
    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
        self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
    }

    /// Returns a state hook feeding execution state to the state-root task,
    /// or `None` when no hook should be installed or no task was spawned.
    pub fn state_hook(&self) -> Option<impl OnStateHook> {
        self.install_state_hook
            .then(|| self.state_root_handle.as_ref().map(|handle| handle.state_hook()))
            .flatten()
    }

    /// Returns a sender for pushing updates directly to the sparse-trie task,
    /// if a state-root task was spawned.
    pub fn sparse_trie_updates_tx(&self) -> Option<CrossbeamSender<StateRootMessage>> {
        self.state_root_handle.as_ref().map(|handle| handle.updates_tx().clone())
    }

    /// Returns the checked-out execution cache, if state caching is enabled.
    pub fn caches(&self) -> Option<ExecutionCache> {
        self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
    }

    /// Returns the cached-state metrics, if metrics are enabled.
    pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
        self.prewarm_handle.cache_metrics.clone()
    }

    /// Returns the shared counter of the highest executed transaction index.
    pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
        &self.prewarm_handle.executed_tx_index
    }

    /// Signals the prewarm task to stop executing transactions.
    pub fn stop_prewarming_execution(&self) {
        self.prewarm_handle.stop_prewarming_execution()
    }

    /// Terminates the caching task, handing it the final execution outcome.
    ///
    /// Returns a sender that must be notified on block validation success;
    /// `None` if the task was already terminated.
    pub fn terminate_caching(
        &mut self,
        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
    ) -> Option<mpsc::Sender<()>> {
        self.prewarm_handle.terminate_caching(execution_outcome)
    }

    /// Iterates over the transaction conversion results, dropping their indices.
    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
        self.transactions.iter().map(|(_, tx)| tx)
    }

    /// Returns a clone of the transaction receiver (crossbeam receivers are
    /// cloneable; messages are distributed among clones).
    pub fn clone_transaction_receiver(&self) -> IndexedTxReceiver<Tx, Err> {
        self.transactions.clone()
    }
}
889
/// Handle controlling a spawned prewarm/cache task.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// Execution cache checked out for this payload; `None` when disabled.
    saved_cache: Option<SavedCache>,
    /// Event sender to the prewarm task; taken on termination.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Shared counter of the highest executed transaction index.
    executed_tx_index: Arc<AtomicUsize>,
    /// Cached-state metrics; `None` when metrics are disabled.
    cache_metrics: Option<CachedStateMetrics>,
}
906
907impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
908 pub fn stop_prewarming_execution(&self) {
912 self.to_prewarm_task
913 .as_ref()
914 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
915 }
916
917 #[must_use = "sender must be used and notified on block validation success"]
922 pub fn terminate_caching(
923 &mut self,
924 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
925 ) -> Option<mpsc::Sender<()>> {
926 if let Some(tx) = self.to_prewarm_task.take() {
927 let (valid_block_tx, valid_block_rx) = mpsc::channel();
928 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
929 let _ = tx.send(event);
930
931 Some(valid_block_tx)
932 } else {
933 None
934 }
935 }
936}
937
impl<R> Drop for CacheTaskHandle<R> {
    fn drop(&mut self) {
        // Ensure the prewarm task terminates even if `terminate_caching` was
        // never called; the freshly created (and immediately dropped) sender
        // yields a receiver that reports disconnected, so no validation signal
        // is ever delivered.
        if let Some(tx) = self.to_prewarm_task.take() {
            let _ = tx.send(PrewarmTaskEvent::Terminate {
                execution_outcome: None,
                valid_block_rx: mpsc::channel().1,
            });
        }
    }
}
949
/// Static execution context of a payload, captured before execution starts.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// EVM environment for the block.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block (used to check out the execution cache).
    pub parent_hash: B256,
    /// State root of the parent block (anchor for the sparse trie).
    pub parent_state_root: B256,
    /// Number of transactions in the block.
    pub transaction_count: usize,
    /// Total gas used by the block.
    pub gas_used: u64,
    /// Withdrawals included in the block, if any.
    pub withdrawals: Option<Vec<Withdrawal>>,
    /// Decoded block access list, if the payload carried one.
    pub decoded_bal: Option<Arc<DecodedBal>>,
}
977
978impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
979where
980 EvmEnvFor<Evm>: Default,
981{
982 #[cfg(any(test, feature = "test-utils"))]
984 pub fn test_default() -> Self {
985 Self {
986 evm_env: Default::default(),
987 hash: Default::default(),
988 parent_hash: Default::default(),
989 parent_state_root: Default::default(),
990 transaction_count: 0,
991 gas_used: 0,
992 withdrawals: None,
993 decoded_bal: None,
994 }
995 }
996}
997
998#[cfg(test)]
999mod tests {
1000 use crate::tree::{
1001 payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1002 precompile_cache::PrecompileCacheMap,
1003 ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
1004 };
1005 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1006 use alloy_evm::block::StateChangeSource;
1007 use rand::Rng;
1008 use reth_chainspec::ChainSpec;
1009 use reth_db_common::init::init_genesis;
1010 use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
1011 use reth_evm::OnStateHook;
1012 use reth_evm_ethereum::EthEvmConfig;
1013 use reth_primitives_traits::{Account, Recovered, StorageEntry};
1014 use reth_provider::{
1015 providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
1016 test_utils::create_test_provider_factory_with_chain_spec,
1017 ChainSpecProvider, HashingWriter,
1018 };
1019 use reth_revm::db::BundleState;
1020 use reth_testing_utils::generators;
1021 use reth_trie::{test_utils::state_root, HashedPostState};
1022 use reth_trie_db::ChangesetCache;
1023 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1024 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
1025 use std::sync::Arc;
1026
    /// Builds a `SavedCache` anchored at `hash` with a small fixed capacity.
    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCache::new(1_000);
        SavedCache::new(hash, execution_cache)
    }

    /// Only one checkout of the cache may be active at a time; a second
    /// attempt fails until the first guard is dropped.
    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    /// Dropping a checkout guard (here via scope exit) releases the cache for
    /// subsequent checkouts.
    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    /// Requesting the cache for a non-matching parent still hands a (cleared)
    /// cache back, and the original anchor can check it out again afterwards.
    #[test]
    fn execution_cache_mismatch_parent_clears_and_returns() {
        let execution_cache = PayloadExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let different_hash = B256::from([4u8; 32]);
        let cache = execution_cache.get_cache_for(different_hash);
        assert!(cache.is_some(), "cache should be returned for reuse after clearing");

        drop(cache);

        let original = execution_cache.get_cache_for(hash);
        assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
    }

    /// After releasing a checkout, the slot can be replaced via
    /// `update_with_guard` and checked out under the new anchor.
    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = PayloadExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }
1107
    /// Inserting an executed block with no pre-existing cache creates a cache
    /// anchored at the inserted block's hash.
    #[test]
    fn on_inserted_executed_block_populates_cache() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        let bundle_state = BundleState::default();

        // No cache exists for the block yet.
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // The cache is now anchored at the inserted block's hash.
        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }

    /// Inserting a block whose parent does not match the current cache anchor
    /// leaves the existing cache untouched and creates no new one.
    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        let payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        // Seed the cache anchored at block 1.
        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // Insert a block whose parent is not the cache anchor.
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }
1171
    /// Generates `updates_per_account` random EVM state updates drawn from a
    /// pool of `num_accounts` random addresses; each update touches a random
    /// subset of the pool with random balances, nonces and (70% of the time)
    /// random storage writes.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                // Most accounts also get a handful of random storage writes.
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                        account_id: None,
                    },
                    original_info: Box::new(AccountInfo::default()),
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }
1221
    /// End-to-end check that the state root produced by the payload processor's
    /// background task matches a root computed directly from the same state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        // Genesis-initialized provider backed by the default chain spec.
        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        // Persist every generated update into the hashing tables so the database
        // reflects the same state that is fed through the state hook below.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            // Commit inside the scope so the RW provider is released before reads.
            provider_rw.commit().expect("failed to commit changes");
        }

        // Flatten the sequence of updates into a final reference state: account
        // info from the last update wins, storage entries accumulate.
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            reth_tasks::Runtime::test(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        // Spawn the processor with an empty transaction set; state arrives solely
        // via the state hook rather than from actual execution.
        let mut handle = payload_processor.spawn(
            ExecutionEnv::test_default(),
            (
                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
                std::convert::identity,
            ),
            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
            OverlayStateProviderFactory::new(
                provider_factory,
                OverlayBuilder::<EthPrimitives>::new(genesis_hash, ChangesetCache::new()),
            ),
            &TreeConfig::default(),
        );

        let mut state_hook = handle.state_hook().expect("state hook is None");

        // Feed each update as if produced by a transaction, then drop the hook to
        // signal that no further state changes are coming.
        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        // Root from the background task must equal the directly computed root.
        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
1312
1313 #[test]
1324 fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
1325 let execution_cache = PayloadExecutionCache::default();
1326
1327 let block4_hash = B256::from([4u8; 32]);
1329 execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
1330
1331 let fork_parent = B256::from([2u8; 32]);
1334 let prewarm_cache = execution_cache.get_cache_for(fork_parent);
1335 assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
1336 let prewarm_cache = prewarm_cache.unwrap();
1337 assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
1338
1339 let fork_addr = Address::from([0xBB; 20]);
1342 let fork_key = B256::from([0xCC; 32]);
1343 prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
1344
1345 let during_prewarm = execution_cache.get_cache_for(block4_hash);
1347 assert!(
1348 during_prewarm.is_none(),
1349 "cache must be unavailable while prewarm holds a reference"
1350 );
1351
1352 drop(prewarm_cache);
1354
1355 let block5_cache = execution_cache.get_cache_for(block4_hash);
1359 assert!(
1360 block5_cache.is_some(),
1361 "canonical chain must get cache after fork prewarm is dropped"
1362 );
1363 assert_eq!(
1364 block5_cache.as_ref().unwrap().executed_block_hash(),
1365 block4_hash,
1366 "cache must carry the canonical parent hash, not the fork parent"
1367 );
1368 }
1369}