1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 payload_processor::{
6 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
7 sparse_trie::StateRootComputeOutcome,
8 },
9 sparse_trie::SparseTrieCacheTask,
10 CacheWaitDurations, CachedStateMetrics, ExecutionCache, PayloadExecutionCache, SavedCache,
11 StateProviderBuilder, TreeConfig, WaitForCaches,
12};
13use alloy_eip7928::BlockAccessList;
14use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
15use alloy_evm::block::StateChangeSource;
16use alloy_primitives::B256;
17use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
18use multiproof::*;
19use prewarm::PrewarmMetrics;
20use rayon::prelude::*;
21use reth_evm::{
22 block::ExecutableTxParts,
23 execute::{ExecutableTxFor, WithTxEnv},
24 ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
25 SpecFor, TxEnvFor,
26};
27use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
28use reth_provider::{
29 BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
30};
31use reth_revm::{db::BundleState, state::EvmState};
32use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
33use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
34use reth_trie_parallel::{
35 proof_task::{ProofTaskCtx, ProofWorkerHandle},
36 root::ParallelStateRootError,
37};
38use reth_trie_sparse::{
39 ArenaParallelSparseTrie, ConfigurableSparseTrie, RevealableSparseTrie, SparseStateTrie,
40};
41use std::{
42 ops::Not,
43 sync::{
44 atomic::{AtomicBool, AtomicUsize},
45 mpsc::{self, channel},
46 Arc,
47 },
48};
49use tracing::{debug, debug_span, instrument, trace, warn, Span};
50
51pub mod bal;
52pub mod multiproof;
53mod preserved_sparse_trie;
54pub mod prewarm;
55pub mod receipt_root_task;
56pub mod sparse_trie;
57
58use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
59
/// Node-capacity limit handed to `into_cleared_trie` / `into_trie_for_reuse` when the sparse
/// trie task gives its trie back for storage in the preserved-trie slot.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Value-capacity limit handed to `into_cleared_trie` / `into_trie_for_reuse` alongside
/// [`SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY`].
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Blocks with fewer transactions than this run the prewarm task in `PrewarmMode::Skipped`.
///
/// This module-level constant is the one read by `spawn_caching_with`. `PayloadProcessor` also
/// declares a private associated constant with the same name but a different value; that one
/// governs the sequential/parallel decision in `spawn_tx_iterator`.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
86
/// The concrete [`PayloadHandle`] returned by the spawn methods for a transaction iterator `I`.
///
/// - transactions are the iterator's recovered transactions wrapped in [`WithTxEnv`],
/// - errors are the iterator's own error type,
/// - receipts are the receipt type of the node primitives `N`.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
    <I as ExecutableTxTuple>::Error,
    <N as NodePrimitives>::Receipt,
>;
93
/// Entry point for spawning the background tasks that accompany execution of one payload:
/// transaction conversion, cache prewarming and the sparse-trie state root task.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// Runtime on which every background task of this module is spawned.
    executor: Runtime,
    /// Shared execution cache; checked out per block in `cache_for` and refreshed in
    /// `on_inserted_executed_block`.
    execution_cache: PayloadExecutionCache,
    /// Metrics handle cloned into the sparse trie task.
    trie_metrics: MultiProofTaskMetrics,
    /// Size passed to `ExecutionCache::new` whenever a fresh cache has to be created.
    cross_block_cache_size: usize,
    /// When `true`, the prewarm task is always started in `PrewarmMode::Skipped`.
    disable_transaction_prewarming: bool,
    /// When `true`, no `SavedCache` is looked up or attached to the prewarm context.
    disable_state_cache: bool,
    /// EVM configuration, cloned into each `PrewarmContext`.
    evm_config: Evm,
    /// Forwarded unchanged to each `PrewarmContext`.
    precompile_cache_disabled: bool,
    /// Precompile cache map, cloned into each `PrewarmContext`.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Shared slot holding the sparse trie that is kept between blocks.
    sparse_state_trie: SharedPreservedSparseTrie,
    /// Hot-slot capacity given to the sparse trie (`set_hot_cache_capacities`,
    /// `into_trie_for_reuse`).
    sparse_trie_max_hot_slots: usize,
    /// Hot-account capacity given to the sparse trie (`set_hot_cache_capacities`,
    /// `into_trie_for_reuse`).
    sparse_trie_max_hot_accounts: usize,
    /// Forwarded to `into_trie_for_reuse` as its cache-pruning switch.
    disable_sparse_trie_cache_pruning: bool,
    /// Forwarded to `SavedCache::with_disable_cache_metrics` for every cache this type builds.
    disable_cache_metrics: bool,
}
131
132impl<N, Evm> PayloadProcessor<Evm>
133where
134 N: NodePrimitives,
135 Evm: ConfigureEvm<Primitives = N>,
136{
137 pub const fn executor(&self) -> &Runtime {
139 &self.executor
140 }
141
142 pub fn new(
144 executor: Runtime,
145 evm_config: Evm,
146 config: &TreeConfig,
147 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
148 ) -> Self {
149 Self {
150 executor,
151 execution_cache: Default::default(),
152 trie_metrics: Default::default(),
153 cross_block_cache_size: config.cross_block_cache_size(),
154 disable_transaction_prewarming: config.disable_prewarming(),
155 evm_config,
156 disable_state_cache: config.disable_state_cache(),
157 precompile_cache_disabled: config.precompile_cache_disabled(),
158 precompile_cache_map,
159 sparse_state_trie: SharedPreservedSparseTrie::default(),
160 sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
161 sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
162 disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
163 disable_cache_metrics: config.disable_cache_metrics(),
164 }
165 }
166}
167
168impl<Evm> WaitForCaches for PayloadProcessor<Evm>
169where
170 Evm: ConfigureEvm,
171{
172 fn wait_for_caches(&self) -> CacheWaitDurations {
173 debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
174
175 let execution_cache = self.execution_cache.clone();
177 let sparse_trie = self.sparse_state_trie.clone();
178
179 let (execution_tx, execution_rx) = std::sync::mpsc::channel();
181 let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
182
183 self.executor.spawn_blocking_named("wait-exec-cache", move || {
184 let _ = execution_tx.send(execution_cache.wait_for_availability());
185 });
186 self.executor.spawn_blocking_named("wait-sparse-tri", move || {
187 let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
188 });
189
190 let execution_cache_duration =
191 execution_rx.recv().expect("execution cache wait task failed to send result");
192 let sparse_trie_duration =
193 sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
194
195 debug!(
196 target: "engine::tree::payload_processor",
197 ?execution_cache_duration,
198 ?sparse_trie_duration,
199 "Execution cache and sparse trie locks acquired"
200 );
201 CacheWaitDurations {
202 execution_cache: execution_cache_duration,
203 sparse_trie: sparse_trie_duration,
204 }
205 }
206}
207
208impl<N, Evm> PayloadProcessor<Evm>
209where
210 N: NodePrimitives,
211 Evm: ConfigureEvm<Primitives = N> + 'static,
212{
213 #[instrument(
246 level = "debug",
247 target = "engine::tree::payload_processor",
248 name = "payload processor",
249 skip_all
250 )]
251 pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
252 &mut self,
253 env: ExecutionEnv<Evm>,
254 transactions: I,
255 provider_builder: StateProviderBuilder<N, P>,
256 multiproof_provider_factory: F,
257 config: &TreeConfig,
258 bal: Option<Arc<BlockAccessList>>,
259 ) -> IteratorPayloadHandle<Evm, I, N>
260 where
261 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
262 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
263 + Clone
264 + Send
265 + Sync
266 + 'static,
267 {
268 let (prewarm_rx, execution_rx) =
270 self.spawn_tx_iterator(transactions, env.transaction_count);
271
272 let span = Span::current();
273
274 let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config);
275 let prewarm_handle = self.spawn_caching_with(
276 env,
277 prewarm_rx,
278 provider_builder,
279 Some(state_root_handle.to_multi_proof.clone()),
280 bal,
281 );
282
283 PayloadHandle {
284 state_root_handle: Some(state_root_handle),
285 prewarm_handle,
286 transactions: execution_rx,
287 _span: span,
288 }
289 }
290
291 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
295 pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
296 &self,
297 env: ExecutionEnv<Evm>,
298 transactions: I,
299 provider_builder: StateProviderBuilder<N, P>,
300 bal: Option<Arc<BlockAccessList>>,
301 ) -> IteratorPayloadHandle<Evm, I, N>
302 where
303 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
304 {
305 let (prewarm_rx, execution_rx) =
306 self.spawn_tx_iterator(transactions, env.transaction_count);
307 let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
308 PayloadHandle {
309 state_root_handle: None,
310 prewarm_handle,
311 transactions: execution_rx,
312 _span: Span::current(),
313 }
314 }
315
316 #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
326 pub fn spawn_state_root<F>(
327 &mut self,
328 multiproof_provider_factory: F,
329 env: &ExecutionEnv<Evm>,
330 config: &TreeConfig,
331 ) -> StateRootHandle
332 where
333 F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
334 + Clone
335 + Send
336 + Sync
337 + 'static,
338 {
339 let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
340
341 let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
342 #[cfg(feature = "trie-debug")]
343 let task_ctx = task_ctx.with_proof_jitter(config.proof_jitter());
344 let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
345 let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
346
347 let (state_root_tx, state_root_rx) = channel();
348
349 self.spawn_sparse_trie_task(
350 proof_handle,
351 state_root_tx,
352 from_multi_proof,
353 env.parent_state_root,
354 config.multiproof_chunk_size(),
355 );
356
357 StateRootHandle::new(to_multi_proof, state_root_rx)
358 }
359
/// For blocks with at most this many transactions, `spawn_state_root` sets the
/// `halve_workers` flag it passes to `ProofWorkerHandle::new`.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;

/// For non-empty blocks with fewer transactions than this, `spawn_tx_iterator` converts all
/// transactions sequentially instead of using the parallel path.
///
/// NOTE(review): distinct from the module-level `SMALL_BLOCK_TX_THRESHOLD`, which has a
/// different value and controls prewarm skipping.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

/// Upper bound on how many leading transactions the parallel path of `spawn_tx_iterator`
/// converts sequentially before handing the remainder to the parallel iterator.
const PARALLEL_PREFETCH_COUNT: usize = 4;
380
/// Spawns the "tx-iterator" task that converts `transactions` and feeds two channels.
///
/// Returns `(prewarm_rx, execute_rx)`:
/// - `prewarm_rx` receives `(index, tx)` for every transaction that converted successfully,
/// - `execute_rx` receives every conversion result, `Ok` or `Err`.
///
/// Both channels are bounded with capacity `transaction_count`. Depending on that count:
/// - `0`: no task is spawned; both senders are dropped when this function returns,
/// - below `Self::SMALL_BLOCK_TX_THRESHOLD`: everything is converted by `convert_serial`,
/// - otherwise: up to `Self::PARALLEL_PREFETCH_COUNT` leading transactions are converted by
///   `convert_serial`, the rest through a rayon parallel iterator driven on the executor's
///   CPU pool.
///
/// Send errors on either channel are ignored.
#[expect(clippy::type_complexity)]
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
    &self,
    transactions: I,
    transaction_count: usize,
) -> (
    mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
    mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
) {
    let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
    let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);

    if transaction_count == 0 {
        // Empty block: nothing to convert, no task needed.
    } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
        // Small block: a single task converts everything in order.
        debug!(
            target: "engine::tree::payload_processor",
            transaction_count,
            "using sequential sig recovery for small block"
        );
        self.executor.spawn_blocking_named("tx-iterator", move || {
            let (transactions, convert) = transactions.into_parts();
            convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
        });
    } else {
        // Number of leading transactions handled sequentially before going parallel.
        let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
        let executor = self.executor.clone();
        self.executor.spawn_blocking_named("tx-iterator", move || {
            let (transactions, convert) = transactions.into_parts();
            let mut all: Vec<_> = transactions.into_iter().collect();
            // `all` keeps the first `prefetch` items (or fewer if the iterator yielded fewer
            // than announced); `rest` gets everything after them.
            let rest = all.split_off(prefetch.min(all.len()));

            // Leading transactions: converted and sent right away, in order.
            convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);

            // Remaining transactions: converted in parallel.
            // NOTE(review): `for_each_ordered_in` is defined outside this file; presumably it
            // invokes the closure in original item order — confirm in `reth_tasks`.
            rest.into_par_iter()
                .enumerate()
                .map(|(i, tx)| {
                    // Restore the transaction's index within the whole block.
                    let idx = i + prefetch;
                    let tx = convert.convert(tx);
                    (idx, tx)
                })
                .for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
                    let tx = tx.map(|tx| {
                        let (tx_env, tx) = tx.into_parts();
                        let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
                        // Only successfully converted transactions reach the prewarm channel.
                        let _ = prewarm_tx.send((idx, tx.clone()));
                        tx
                    });
                    let _ = execute_tx.send(tx);
                    trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
                });
        });
    }

    (prewarm_rx, execute_rx)
}
455
456 #[instrument(
458 level = "debug",
459 target = "engine::tree::payload_processor",
460 skip_all,
461 fields(bal=%bal.is_some())
462 )]
463 fn spawn_caching_with<P>(
464 &self,
465 env: ExecutionEnv<Evm>,
466 transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
467 provider_builder: StateProviderBuilder<N, P>,
468 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
469 bal: Option<Arc<BlockAccessList>>,
470 ) -> CacheTaskHandle<N::Receipt>
471 where
472 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
473 {
474 let skip_prewarm =
475 self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
476
477 let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
478
479 let executed_tx_index = Arc::new(AtomicUsize::new(0));
480
481 let prewarm_ctx = PrewarmContext {
483 env,
484 evm_config: self.evm_config.clone(),
485 saved_cache: saved_cache.clone(),
486 provider: provider_builder,
487 metrics: PrewarmMetrics::default(),
488 terminate_execution: Arc::new(AtomicBool::new(false)),
489 executed_tx_index: Arc::clone(&executed_tx_index),
490 precompile_cache_disabled: self.precompile_cache_disabled,
491 precompile_cache_map: self.precompile_cache_map.clone(),
492 };
493
494 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
495 self.executor.clone(),
496 self.execution_cache.clone(),
497 prewarm_ctx,
498 to_multi_proof,
499 );
500
501 {
502 let to_prewarm_task = to_prewarm_task.clone();
503 self.executor.spawn_blocking_named("prewarm", move || {
504 let mode = if skip_prewarm {
505 PrewarmMode::Skipped
506 } else if let Some(bal) = bal {
507 PrewarmMode::BlockAccessList(bal)
508 } else {
509 PrewarmMode::Transactions(transactions)
510 };
511 prewarm_task.run(mode, to_prewarm_task);
512 });
513 }
514
515 CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
516 }
517
518 #[instrument(level = "debug", target = "engine::caching", skip(self))]
523 fn cache_for(&self, parent_hash: B256) -> SavedCache {
524 if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
525 debug!("reusing execution cache");
526 cache
527 } else {
528 debug!("creating new execution cache on cache miss");
529 let start = Instant::now();
530 let cache = ExecutionCache::new(self.cross_block_cache_size);
531 let metrics = CachedStateMetrics::zeroed();
532 metrics.record_cache_creation(start.elapsed());
533 SavedCache::new(parent_hash, cache, metrics)
534 .with_disable_cache_metrics(self.disable_cache_metrics)
535 }
536 }
537
/// Spawns the "sparse-trie" task that computes the state root and then stores its trie in the
/// preserved-trie slot.
///
/// The task:
/// 1. takes the preserved trie (recording how long that took) or builds a new blind one,
/// 2. runs a `SparseTrieCacheTask` fed by `from_multi_proof`,
/// 3. sends the result over `state_root_tx`,
/// 4. stores a trie back into the slot: anchored at the new state root on success, cleared
///    if the computation failed or the receiver was already dropped.
///
/// Values returned as `deferred` by the trie hand-back calls are dropped via
/// `executor.spawn_drop` after the slot guard has been released.
fn spawn_sparse_trie_task(
    &self,
    proof_worker_handle: ProofWorkerHandle,
    state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
    from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
    parent_state_root: B256,
    chunk_size: usize,
) {
    // Copy everything the task needs so the closure does not borrow `self`.
    let preserved_sparse_trie = self.sparse_state_trie.clone();
    let trie_metrics = self.trie_metrics.clone();
    let max_hot_slots = self.sparse_trie_max_hot_slots;
    let max_hot_accounts = self.sparse_trie_max_hot_accounts;
    let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
    let executor = self.executor.clone();

    // Captured here so the task's span can be parented to the caller's span.
    let parent_span = Span::current();
    self.executor.spawn_blocking_named("sparse-trie", move || {
        // NOTE(review): `once!` is defined in `reth_tasks`; presumably it runs
        // `increase_thread_priority` a single time — confirm there.
        reth_tasks::once!(increase_thread_priority);

        let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
            .entered();

        // Time spent obtaining the preserved trie is reported as cache wait duration.
        let start = Instant::now();
        let preserved = preserved_sparse_trie.take();
        trie_metrics
            .sparse_trie_cache_wait_duration_histogram
            .record(start.elapsed().as_secs_f64());

        // Reuse the preserved trie for this parent state root, or start from a blind trie.
        let mut sparse_state_trie = preserved
            .map(|preserved| preserved.into_trie_for(parent_state_root))
            .unwrap_or_else(|| {
                debug!(
                    target: "engine::tree::payload_processor",
                    "Creating new sparse trie - no preserved trie available"
                );
                let default_trie = RevealableSparseTrie::blind_from(
                    ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
                );
                SparseStateTrie::default()
                    .with_accounts_trie(default_trie.clone())
                    .with_default_storage_trie(default_trie)
                    .with_updates(true)
            });
        sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);

        let mut task = SparseTrieCacheTask::new_with_trie(
            &executor,
            from_multi_proof,
            proof_worker_handle,
            trie_metrics.clone(),
            sparse_state_trie,
            chunk_size,
        );

        let result = task.run();

        // The slot guard is taken BEFORE the result is sent and held until a trie has been
        // stored. NOTE(review): presumably this keeps a consumer woken by the send from
        // observing an empty slot; it assumes `take()` contends on the same lock — confirm
        // in `preserved_sparse_trie`.
        let mut guard = preserved_sparse_trie.lock();

        // Keep a copy of the successful outcome; `result` itself is moved into the channel.
        let task_result = result.as_ref().ok().cloned();
        if state_root_tx.send(result).is_err() {
            // Nobody is waiting for the result any more: store a cleared trie and stop.
            debug!(
                target: "engine::tree::payload_processor",
                "State root receiver dropped, clearing trie"
            );
            let (trie, deferred) = task.into_cleared_trie(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );
            guard.store(PreservedSparseTrie::cleared(trie));
            // Release the slot before scheduling the deferred drop.
            drop(guard);
            executor.spawn_drop(deferred);
            return;
        }

        // Shadows the earlier `_enter`; the outer span guard stays alive until the closure ends.
        let _enter =
            debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
        let deferred = if let Some(result) = task_result {
            // Success: keep the trie, anchored at the state root that was just computed.
            let start = Instant::now();
            let (trie, deferred) = task.into_trie_for_reuse(
                max_hot_slots,
                max_hot_accounts,
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
                disable_cache_pruning,
                &result.trie_updates,
            );
            trie_metrics
                .into_trie_for_reuse_duration_histogram
                .record(start.elapsed().as_secs_f64());
            trie_metrics
                .sparse_trie_retained_memory_bytes
                .set(trie.memory_size() as f64);
            trie_metrics
                .sparse_trie_retained_storage_tries
                .set(trie.retained_storage_tries_count() as f64);
            guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
            deferred
        } else {
            // Failure: the trie contents are not kept; store a cleared trie instead.
            debug!(
                target: "engine::tree::payload_processor",
                "State root computation failed, clearing trie"
            );
            let (trie, deferred) = task.into_cleared_trie(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );
            guard.store(PreservedSparseTrie::cleared(trie));
            deferred
        };
        // Release the slot before scheduling the deferred drop.
        drop(guard);
        executor.spawn_drop(deferred);
    });
}
668
669 pub fn on_inserted_executed_block(
677 &self,
678 block_with_parent: BlockWithParent,
679 bundle_state: &BundleState,
680 ) {
681 let disable_cache_metrics = self.disable_cache_metrics;
682 self.execution_cache.update_with_guard(|cached| {
683 if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
684 debug!(
685 target: "engine::caching",
686 parent_hash = %block_with_parent.parent,
687 "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
688 );
689 return
690 }
691
692 let (caches, cache_metrics, _) = match cached.take() {
694 Some(existing) => existing.split(),
695 None => (
696 ExecutionCache::new(self.cross_block_cache_size),
697 CachedStateMetrics::zeroed(),
698 false,
699 ),
700 };
701
702 let new_cache =
704 SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
705 .with_disable_cache_metrics(disable_cache_metrics);
706 if new_cache.cache().insert_state(bundle_state).is_err() {
707 *cached = None;
708 debug!(target: "engine::caching", "cleared execution cache on update error");
709 return
710 }
711 new_cache.update_metrics();
712
713 *cached = Some(new_cache);
715 debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
716 });
717 }
718}
719
720fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
722 iter: impl Iterator<Item = RawTx>,
723 convert: &C,
724 prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
725 execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
726) where
727 Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
728 TxEnv: Clone,
729 C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
730{
731 for (idx, raw_tx) in iter.enumerate() {
732 let tx = convert.convert(raw_tx);
733 let tx = tx.map(|tx| {
734 let (tx_env, tx) = tx.into_parts();
735 WithTxEnv { tx_env, tx: Arc::new(tx) }
736 });
737 if let Ok(tx) = &tx {
738 let _ = prewarm_tx.send((idx, tx.clone()));
739 }
740 let _ = execute_tx.send(tx);
741 trace!(target: "engine::tree::payload_processor", idx, "yielded transaction");
742 }
743}
744
/// Handle to a running state root task.
#[derive(Debug)]
pub struct StateRootHandle {
    /// Sender for messages to the state root task; cloned into state hooks.
    to_multi_proof: CrossbeamSender<MultiProofMessage>,
    /// Receiver for the task's final result. `None` once it has been taken, either by
    /// `state_root` or by `take_state_root_rx`.
    state_root_rx: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
}
759
760impl StateRootHandle {
761 pub const fn new(
763 to_multi_proof: CrossbeamSender<MultiProofMessage>,
764 state_root_rx: mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>,
765 ) -> Self {
766 Self { to_multi_proof, state_root_rx: Some(state_root_rx) }
767 }
768
769 pub fn state_hook(&self) -> impl OnStateHook {
773 let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone());
774
775 move |source: StateChangeSource, state: &EvmState| {
776 let _ =
777 to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
778 }
779 }
780
781 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
787 self.state_root_rx
788 .take()
789 .expect("state_root already taken")
790 .recv()
791 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
792 }
793
794 pub const fn take_state_root_rx(
800 &mut self,
801 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
802 self.state_root_rx.take().expect("state_root already taken")
803 }
804}
805
/// Handle to all background tasks spawned for one payload.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
    /// Handle to the state root task; `None` when the payload was spawned without one
    /// (`spawn_cache_exclusive`).
    state_root_handle: Option<StateRootHandle>,
    /// Handle to the prewarm task.
    prewarm_handle: CacheTaskHandle<R>,
    /// Stream of converted transactions (or conversion errors) for execution.
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Tracing span captured when the handle was created; held for the handle's lifetime.
    _span: Span,
}
821
822impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
823 #[instrument(
829 level = "debug",
830 target = "engine::tree::payload_processor",
831 name = "await_state_root",
832 skip_all
833 )]
834 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
835 self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
836 }
837
838 pub const fn take_state_root_rx(
845 &mut self,
846 ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
847 self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
848 }
849
850 pub fn state_hook(&self) -> Option<impl OnStateHook> {
854 self.state_root_handle.as_ref().map(|handle| handle.state_hook())
855 }
856
857 pub fn caches(&self) -> Option<ExecutionCache> {
859 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
860 }
861
862 pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
864 self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
865 }
866
867 pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
872 &self.prewarm_handle.executed_tx_index
873 }
874
875 pub fn stop_prewarming_execution(&self) {
879 self.prewarm_handle.stop_prewarming_execution()
880 }
881
882 pub fn terminate_caching(
890 &mut self,
891 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
892 ) -> Option<mpsc::Sender<()>> {
893 self.prewarm_handle.terminate_caching(execution_outcome)
894 }
895
896 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
898 self.transactions.iter()
899 }
900}
901
/// Handle to a running prewarm task.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
    /// Cache shared with the prewarm task; `None` when the state cache is disabled.
    saved_cache: Option<SavedCache>,
    /// Event sender to the prewarm task. Becomes `None` once a terminate event has been sent.
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
    /// Counter shared with the prewarm context (same `Arc` as `PrewarmContext::executed_tx_index`).
    executed_tx_index: Arc<AtomicUsize>,
}
916
917impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
918 pub fn stop_prewarming_execution(&self) {
922 self.to_prewarm_task
923 .as_ref()
924 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
925 }
926
927 #[must_use = "sender must be used and notified on block validation success"]
932 pub fn terminate_caching(
933 &mut self,
934 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
935 ) -> Option<mpsc::Sender<()>> {
936 if let Some(tx) = self.to_prewarm_task.take() {
937 let (valid_block_tx, valid_block_rx) = mpsc::channel();
938 let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
939 let _ = tx.send(event);
940
941 Some(valid_block_tx)
942 } else {
943 None
944 }
945 }
946}
947
948impl<R> Drop for CacheTaskHandle<R> {
949 fn drop(&mut self) {
950 if let Some(tx) = self.to_prewarm_task.take() {
952 let _ = tx.send(PrewarmTaskEvent::Terminate {
953 execution_outcome: None,
954 valid_block_rx: mpsc::channel().1,
955 });
956 }
957 }
958}
959
/// Per-block inputs handed to the payload processor's spawn methods.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// EVM environment for the block.
    pub evm_env: EvmEnvFor<Evm>,
    /// NOTE(review): presumably the hash of the block being executed — not read in this file.
    pub hash: B256,
    /// Hash of the parent block; used as the lookup key for the execution cache.
    pub parent_hash: B256,
    /// State root of the parent block; the preserved sparse trie is prepared for this root.
    pub parent_state_root: B256,
    /// Announced number of transactions in the block. Sizes the transaction channels and is
    /// compared against the small-block thresholds.
    pub transaction_count: usize,
    /// NOTE(review): presumably the block's total gas used — not read in this file.
    pub gas_used: u64,
    /// NOTE(review): presumably the block's withdrawals, if any — not read in this file.
    pub withdrawals: Option<Vec<Withdrawal>>,
}
984
985impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
986where
987 EvmEnvFor<Evm>: Default,
988{
989 #[cfg(any(test, feature = "test-utils"))]
991 pub fn test_default() -> Self {
992 Self {
993 evm_env: Default::default(),
994 hash: Default::default(),
995 parent_hash: Default::default(),
996 parent_state_root: Default::default(),
997 transaction_count: 0,
998 gas_used: 0,
999 withdrawals: None,
1000 }
1001 }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use crate::tree::{
1007 payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
1008 precompile_cache::PrecompileCacheMap,
1009 CachedStateMetrics, ExecutionCache, PayloadExecutionCache, SavedCache,
1010 StateProviderBuilder, TreeConfig,
1011 };
1012 use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
1013 use alloy_evm::block::StateChangeSource;
1014 use rand::Rng;
1015 use reth_chainspec::ChainSpec;
1016 use reth_db_common::init::init_genesis;
1017 use reth_ethereum_primitives::TransactionSigned;
1018 use reth_evm::OnStateHook;
1019 use reth_evm_ethereum::EthEvmConfig;
1020 use reth_primitives_traits::{Account, Recovered, StorageEntry};
1021 use reth_provider::{
1022 providers::{BlockchainProvider, OverlayStateProviderFactory},
1023 test_utils::create_test_provider_factory_with_chain_spec,
1024 ChainSpecProvider, HashingWriter,
1025 };
1026 use reth_revm::db::BundleState;
1027 use reth_testing_utils::generators;
1028 use reth_trie::{test_utils::state_root, HashedPostState};
1029 use reth_trie_db::ChangesetCache;
1030 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
1031 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
1032 use std::sync::Arc;
1033
1034 fn make_saved_cache(hash: B256) -> SavedCache {
1035 let execution_cache = ExecutionCache::new(1_000);
1036 SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
1037 }
1038
/// Only one checkout of the cache may be outstanding at a time.
#[test]
fn execution_cache_allows_single_checkout() {
    let hash = B256::from([1u8; 32]);
    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

    // First checkout succeeds and stays alive ...
    let held = shared.get_cache_for(hash);
    assert!(held.is_some(), "expected initial checkout to succeed");

    // ... so a concurrent one must be refused.
    let blocked = shared.get_cache_for(hash);
    assert!(blocked.is_none(), "second checkout should be blocked while guard is active");

    drop(held);

    let after_release = shared.get_cache_for(hash);
    assert!(after_release.is_some(), "third checkout should succeed after guard is dropped");
}
1057
/// A checkout that goes out of scope frees the cache for the next caller.
#[test]
fn execution_cache_checkout_releases_on_drop() {
    let hash = B256::from([2u8; 32]);
    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

    {
        let scoped = shared.get_cache_for(hash);
        assert!(scoped.is_some(), "expected checkout to succeed");
        // `scoped` is released at the end of this block.
    }

    let again = shared.get_cache_for(hash);
    assert!(again.is_some(), "checkout should succeed after guard drop");
}
1074
/// Asking for a hash other than the stored one still yields a cache, and so does asking for
/// the stored hash again afterwards.
#[test]
fn execution_cache_mismatch_parent_clears_and_returns() {
    let stored_hash = B256::from([3u8; 32]);
    let shared = PayloadExecutionCache::default();
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(stored_hash)));

    let other_hash = B256::from([4u8; 32]);
    let for_other = shared.get_cache_for(other_hash);
    assert!(for_other.is_some(), "cache should be returned for reuse after clearing");

    // Release before the next lookup.
    drop(for_other);

    let for_stored = shared.get_cache_for(stored_hash);
    assert!(for_stored.is_some(), "canonical chain gets cache back via mismatch+clear");
}
1095
/// After a checkout is released the slot can be replaced and the new cache checked out.
#[test]
fn execution_cache_update_after_release_succeeds() {
    let shared = PayloadExecutionCache::default();

    let first_hash = B256::from([5u8; 32]);
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(first_hash)));

    let checkout =
        shared.get_cache_for(first_hash).expect("expected initial checkout to succeed");
    drop(checkout);

    let second_hash = B256::from([6u8; 32]);
    shared.update_with_guard(|slot| *slot = Some(make_saved_cache(second_hash)));

    let fresh_checkout = shared.get_cache_for(second_hash);
    assert!(fresh_checkout.is_some(), "new checkout should succeed after release and update");
}
1114
/// Inserting a block into an empty cache slot creates a cache keyed to that block.
#[test]
fn on_inserted_executed_block_populates_cache() {
    let processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(Arc::new(ChainSpec::default())),
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    let block_hash = B256::from([10u8; 32]);
    let parent_hash = B256::from([1u8; 32]);
    let inserted = BlockWithParent {
        block: BlockNumHash { hash: block_hash, number: 1 },
        parent: parent_hash,
    };
    let empty_state = BundleState::default();

    // Nothing is cached before the insert.
    assert!(processor.execution_cache.get_cache_for(block_hash).is_none());

    processor.on_inserted_executed_block(inserted, &empty_state);

    // Afterwards the cache is available and keyed to the inserted block.
    let checkout = processor.execution_cache.get_cache_for(block_hash);
    assert!(checkout.is_some());
    assert_eq!(checkout.unwrap().executed_block_hash(), block_hash);
}
1143
/// Inserting a block whose parent is not the cached block leaves the cache untouched.
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
    let processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(Arc::new(ChainSpec::default())),
        &TreeConfig::default(),
        PrecompileCacheMap::default(),
    );

    // Seed the cache for block 1.
    let block1_hash = B256::from([1u8; 32]);
    processor
        .execution_cache
        .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

    // Block 3 claims a parent that is not block 1.
    let unrelated_parent = B256::from([99u8; 32]);
    let block3_hash = B256::from([3u8; 32]);
    let inserted = BlockWithParent {
        block: BlockNumHash { hash: block3_hash, number: 3 },
        parent: unrelated_parent,
    };
    let empty_state = BundleState::default();

    processor.on_inserted_executed_block(inserted, &empty_state);

    // This checkout is deliberately kept alive across the next lookup.
    let original = processor.execution_cache.get_cache_for(block1_hash);
    assert!(original.is_some(), "Original cache should be preserved");

    let for_block3 = processor.execution_cache.get_cache_for(block3_hash);
    assert!(for_block3.is_none(), "New block cache should not be created on mismatch");
}
1178
1179 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1180 let mut rng = generators::rng();
1181 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1182 let mut updates = Vec::with_capacity(updates_per_account);
1183
1184 for _ in 0..updates_per_account {
1185 let num_accounts_in_update = rng.random_range(1..=num_accounts);
1186 let mut state_update = EvmState::default();
1187
1188 let selected_addresses = &all_addresses[0..num_accounts_in_update];
1189
1190 for &address in selected_addresses {
1191 let mut storage = HashMap::default();
1192 if rng.random_bool(0.7) {
1193 for _ in 0..rng.random_range(1..10) {
1194 let slot = U256::from(rng.random::<u64>());
1195 storage.insert(
1196 slot,
1197 EvmStorageSlot::new_changed(
1198 U256::ZERO,
1199 U256::from(rng.random::<u64>()),
1200 0,
1201 ),
1202 );
1203 }
1204 }
1205
1206 let account = revm_state::Account {
1207 info: AccountInfo {
1208 balance: U256::from(rng.random::<u64>()),
1209 nonce: rng.random::<u64>(),
1210 code_hash: KECCAK_EMPTY,
1211 code: Some(Default::default()),
1212 account_id: None,
1213 },
1214 original_info: Box::new(AccountInfo::default()),
1215 storage,
1216 status: AccountStatus::Touched,
1217 transaction_id: 0,
1218 };
1219
1220 state_update.insert(address, account);
1221 }
1222
1223 updates.push(state_update);
1224 }
1225
1226 updates
1227 }
1228
/// Verifies that the state root returned by a spawned [`PayloadProcessor`] handle matches the
/// root computed directly from the same, fully accumulated state.
///
/// The test writes randomly generated account and storage data through the provider, streams
/// the identical updates into the handle's state hook, and then compares the task's root with
/// `state_root(accumulated_state)`.
#[test]
fn test_state_root() {
    reth_tracing::init_test_tracing();

    let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
    let genesis_hash = init_genesis(&factory).unwrap();

    let state_updates = create_mock_state_updates(10, 10);

    // Write every update's accounts and storage through a read-write provider and commit them
    // before the processor is spawned.
    {
        let provider_rw = factory.provider_rw().expect("failed to get provider");

        for update in &state_updates {
            provider_rw
                .insert_account_for_hashing(update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                }))
                .expect("failed to insert accounts");

            provider_rw
                .insert_storage_for_hashing(update.iter().map(|(address, account)| {
                    let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                        key: B256::from(*slot),
                        value: value.present_value,
                    });
                    (*address, entries)
                }))
                .expect("failed to insert storage");
        }

        provider_rw.commit().expect("failed to commit changes");
    }

    // Fold the updates into the final per-address view: the latest account wins and storage
    // slots are merged across updates.
    // NOTE(review): `hashed_state` is extended here but never read afterwards in this test.
    let mut hashed_state = HashedPostState::default();
    let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
        HashMap::default();

    for update in &state_updates {
        hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

        for (address, account) in update {
            let (latest_account, merged_storage) = accumulated_state.entry(*address).or_default();
            *latest_account = Account::from_revm_account(account);
            merged_storage.extend(
                account.storage.iter().map(|(slot, value)| (B256::from(*slot), value.present_value)),
            );
        }
    }

    let tree_config = TreeConfig::default();
    let mut payload_processor = PayloadProcessor::new(
        reth_tasks::Runtime::test(),
        EthEvmConfig::new(factory.chain_spec()),
        &tree_config,
        PrecompileCacheMap::default(),
    );

    let provider_factory = BlockchainProvider::new(factory).unwrap();

    // Spawn with an empty transaction list; state changes are fed manually through the hook.
    let mut handle = payload_processor.spawn(
        ExecutionEnv::test_default(),
        (
            Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
            std::convert::identity,
        ),
        StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
        OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
        &tree_config,
        None,
    );

    // The hook lives only inside this scope, so it is dropped before the root is requested —
    // presumably that is what tells the task no further updates are coming.
    {
        let mut state_hook = handle.state_hook().expect("state hook is None");
        state_updates.into_iter().enumerate().for_each(|(tx_index, update)| {
            state_hook.on_state(StateChangeSource::Transaction(tx_index), &update);
        });
    }

    let root_from_task = handle.state_root().expect("task failed").state_root;
    let root_from_regular = state_root(accumulated_state);

    assert_eq!(
        root_from_task, root_from_regular,
        "State root mismatch: task={root_from_task}, base={root_from_regular}"
    );
}
1317
/// Exercises the execution cache hand-off when a prewarm for a fork block takes the cache,
/// writes into it, and is then dropped without saving.
///
/// The assertions pin three observable behaviours of [`PayloadExecutionCache`]:
/// - asking for a parent other than the stored one still yields a cache, reported under the
///   requested parent hash;
/// - while that cache is held, `get_cache_for` returns `None` for the canonical parent;
/// - after it is dropped, the canonical parent obtains a cache again, reported under its own
///   hash.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
    let execution_cache = PayloadExecutionCache::default();

    // Start with a cache saved for the canonical parent (block 4).
    let block4_hash: B256 = [4u8; 32].into();
    execution_cache.update_with_guard(|slot| {
        *slot = Some(make_saved_cache(block4_hash));
    });

    // A fork block whose parent is a different hash requests the cache for prewarming.
    let fork_parent: B256 = [2u8; 32].into();
    let prewarm_cache = execution_cache
        .get_cache_for(fork_parent)
        .expect("prewarm should obtain cache for fork block");
    assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);

    // The prewarm writes fork-specific storage into the cache it holds.
    let fork_addr: Address = [0xBB; 20].into();
    let fork_key: B256 = [0xCC; 32].into();
    prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));

    // While the prewarm still holds its cache, the canonical parent must not get one.
    assert!(
        execution_cache.get_cache_for(block4_hash).is_none(),
        "cache must be unavailable while prewarm holds a reference"
    );

    // The prewarm goes away without ever saving its cache back.
    drop(prewarm_cache);

    // A later request for the canonical parent (presumably on behalf of block 5) succeeds and
    // reports the canonical parent hash.
    let block5_cache = execution_cache
        .get_cache_for(block4_hash)
        .expect("canonical chain must get cache after fork prewarm is dropped");
    assert_eq!(
        block5_cache.executed_block_hash(),
        block4_hash,
        "cache must carry the canonical parent hash, not the fork parent"
    );
}
1374}