reth_engine_tree/tree/payload_processor/mod.rs

//! Entrypoint for payload processing.

use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
    cached_state::{
        CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
        SavedCache,
    },
    payload_processor::{
        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
        sparse_trie::StateRootComputeOutcome,
    },
    sparse_trie::SparseTrieTask,
    StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::BlockWithParent;
use alloy_evm::{block::StateChangeSource, ToTxEnv};
use alloy_primitives::B256;
use crossbeam_channel::Sender as CrossbeamSender;
use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
    execute::{ExecutableTxFor, WithTxEnv},
    ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
    proof_task::{ProofTaskCtx, ProofWorkerHandle},
    root::ParallelStateRootError,
};
use reth_trie_sparse::{
    provider::{TrieNodeProvider, TrieNodeProviderFactory},
    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
    collections::BTreeMap,
    sync::{
        atomic::AtomicBool,
        mpsc::{self, channel},
        Arc,
    },
    time::Instant,
};
use tracing::{debug, debug_span, instrument, warn, Span};

mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod sparse_trie;

use configured_sparse_trie::ConfiguredSparseTrie;

/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by benchmarking with gradually increasing thresholds to judge
/// their effect. Below 100, throughput was generally equal or slightly lower, while above 150 it
/// deteriorated to the point where the parallel sparse trie might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
/// nodes in allocated sparse tries.
///
/// Node maps have a key of `Nibbles` and value of `SparseNode`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<SparseNode>` is 80.
///
/// If we have 1 million entries of 120 bytes each, this conservative estimate comes out at around
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;

/// Default value capacity for shrinking the sparse trie. This is used to limit the number of values
/// in allocated sparse tries.
///
/// There are storage and account values, the larger of the two being account values, which are
/// essentially `TrieAccount`s.
///
/// Account value maps have a key of `Nibbles` and value of `TrieAccount`.
/// The `size_of::<Nibbles>` is 40, and `size_of::<TrieAccount>` is 104.
///
/// If we have 1 million entries of 144 bytes each, this conservative estimate comes out at around
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;

/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
where
    Evm: ConfigureEvm,
{
    /// The executor used to spawn tasks.
    executor: WorkloadExecutor,
    /// The most recent cache used for execution.
    execution_cache: ExecutionCache,
    /// Metrics for trie operations
    trie_metrics: MultiProofTaskMetrics,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: u64,
    /// Whether transactions should not be executed by the prewarming task.
    disable_transaction_prewarming: bool,
    /// Whether the state cache should be disabled.
    disable_state_cache: bool,
    /// Determines how to configure the evm for execution.
    evm_config: Evm,
    /// Whether precompile cache should be disabled.
    precompile_cache_disabled: bool,
    /// Precompile cache map.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
    /// that allocations can be minimized.
    sparse_state_trie: Arc<
        parking_lot::Mutex<
            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
        >,
    >,
    /// Whether to disable the parallel sparse trie.
    disable_parallel_sparse_trie: bool,
    /// Maximum concurrency for prewarm task.
    prewarm_max_concurrency: usize,
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Returns a reference to the workload executor driving payload tasks.
    pub(super) const fn executor(&self) -> &WorkloadExecutor {
        &self.executor
    }

    /// Creates a new payload processor.
    pub fn new(
        executor: WorkloadExecutor,
        evm_config: Evm,
        config: &TreeConfig,
        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    ) -> Self {
        Self {
            executor,
            execution_cache: Default::default(),
            trie_metrics: Default::default(),
            cross_block_cache_size: config.cross_block_cache_size(),
            disable_transaction_prewarming: config.disable_prewarming(),
            evm_config,
            disable_state_cache: config.disable_state_cache(),
            precompile_cache_disabled: config.precompile_cache_disabled(),
            precompile_cache_map,
            sparse_state_trie: Arc::default(),
            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
            prewarm_max_concurrency: config.prewarm_max_concurrency(),
        }
    }
}

impl<N, Evm> PayloadProcessor<Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Spawns all background tasks and returns a handle connected to the tasks.
    ///
    /// - Transaction prewarming task
    /// - State root task
    /// - Sparse trie task
    ///
    /// # Transaction prewarming task
    ///
    /// Responsible for feeding state updates to the multi proof task.
    ///
    /// This task runs until:
    ///  - externally cancelled (e.g. sequential block execution is complete)
    ///
    /// ## Multi proof task
    ///
    /// Responsible for preparing sparse trie messages for the sparse trie task.
    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
    /// output back to this task.
    ///
    /// Receives updates from sequential execution.
    /// This task runs until it receives a shutdown signal, which should be after the block
    /// was fully executed.
    ///
    /// ## Sparse trie task
    ///
    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
    ///
    /// This task runs until there are no further updates to process.
    ///
    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
    /// canceling).
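    ///
    /// # Example
    ///
    /// A minimal sketch of driving the returned handle, mirroring the flow used in this module's
    /// tests (illustrative, not a compilable doctest):
    ///
    /// ```ignore
    /// let mut handle = processor.spawn(env, transactions, provider_builder, factory, &config);
    /// let mut hook = handle.state_hook();
    /// for (i, update) in state_updates.into_iter().enumerate() {
    ///     hook.on_state(StateChangeSource::Transaction(i), &update);
    /// }
    /// drop(hook); // dropping the hook signals that no further updates are coming
    /// let outcome = handle.state_root()?;
    /// ```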
    #[allow(clippy::type_complexity)]
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "payload processor",
        skip_all
    )]
    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
        multiproof_provider_factory: F,
        config: &TreeConfig,
    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        // start preparing transactions immediately
        let (prewarm_rx, execution_rx, transaction_count_hint) =
            self.spawn_tx_iterator(transactions);

        let span = Span::current();
        let (to_sparse_trie, sparse_trie_rx) = channel();

        // We rely on the cursor factory to provide whatever DB overlay is necessary to see a
        // consistent view of the database, including the trie tables. Because of this there is no
        // need for an overarching prefix set to invalidate any section of the trie tables, and so
        // we use an empty prefix set.

        // Create and spawn the storage proof task
        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
        let storage_worker_count = config.storage_worker_count();
        let account_worker_count = config.account_worker_count();
        let proof_handle = ProofWorkerHandle::new(
            self.executor.handle().clone(),
            task_ctx,
            storage_worker_count,
            account_worker_count,
        );

        let multi_proof_task = MultiProofTask::new(
            proof_handle.clone(),
            to_sparse_trie,
            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
        );

        // wire the multiproof task to the prewarm task
        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());

        let prewarm_handle = self.spawn_caching_with(
            env,
            prewarm_rx,
            transaction_count_hint,
            provider_builder,
            to_multi_proof.clone(),
        );

        // spawn multi-proof task
        let parent_span = span.clone();
        self.executor.spawn_blocking(move || {
            let _enter = parent_span.entered();
            multi_proof_task.run();
        });

        // wire the sparse trie to the state root response receiver
        let (state_root_tx, state_root_rx) = channel();

        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
        self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);

        PayloadHandle {
            to_multi_proof,
            prewarm_handle,
            state_root: Some(state_root_rx),
            transactions: execution_rx,
            _span: span,
        }
    }

    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
    ///
    /// Returns a [`PayloadHandle`] to communicate with the task.
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
        &self,
        env: ExecutionEnv<Evm>,
        transactions: I,
        provider_builder: StateProviderBuilder<N, P>,
    ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
        let prewarm_handle =
            self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
        PayloadHandle {
            to_multi_proof: None,
            prewarm_handle,
            state_root: None,
            transactions: execution_rx,
            _span: Span::current(),
        }
    }

    /// Spawns a task that advances the transaction env iterator and streams updates through a
    /// channel.
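    ///
    /// Transactions are converted in parallel, so results can arrive out of order; a `BTreeMap`
    /// keyed by index restores the original order for the execution channel. A minimal,
    /// self-contained sketch of that reordering pattern (illustrative only):
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// let arrivals = vec![(2, "c"), (0, "a"), (1, "b")];
    /// let mut next = 0;
    /// let mut queue = BTreeMap::new();
    /// let mut ordered = Vec::new();
    /// for (idx, item) in arrivals {
    ///     if idx == next {
    ///         // forward the item and drain any queued successors
    ///         ordered.push(item);
    ///         next += 1;
    ///         while let Some(entry) = queue.first_entry() {
    ///             if *entry.key() != next {
    ///                 break;
    ///             }
    ///             ordered.push(entry.remove());
    ///             next += 1;
    ///         }
    ///     } else {
    ///         queue.insert(idx, item);
    ///     }
    /// }
    /// assert_eq!(ordered, ["a", "b", "c"]);
    /// ```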
    #[expect(clippy::type_complexity)]
    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
        &self,
        transactions: I,
    ) -> (
        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
        usize,
    ) {
        let (transactions, convert) = transactions.into();
        let transactions = transactions.into_iter();
        // Get the transaction count for the prewarming task.
        // Use the upper bound if available (more accurate), otherwise the lower bound.
        let (lower, upper) = transactions.size_hint();
        let transaction_count_hint = upper.unwrap_or(lower);

        // Spawn a task that iterates through all transactions in parallel and sends them to the
        // main task.
        let (tx, rx) = mpsc::channel();
        self.executor.spawn_blocking(move || {
            transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
                let tx = convert(tx);
                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
                let _ = sender.send((idx, tx));
            });
        });

        // Spawn a task that processes out-of-order transactions from the task above and sends them
        // to the prewarming and execution tasks.
        let (prewarm_tx, prewarm_rx) = mpsc::channel();
        let (execute_tx, execute_rx) = mpsc::channel();
        self.executor.spawn_blocking(move || {
            let mut next_for_execution = 0;
            let mut queue = BTreeMap::new();
            while let Ok((idx, tx)) = rx.recv() {
                // only send Ok(_) variants to the prewarming task
                if let Ok(tx) = &tx {
                    let _ = prewarm_tx.send(tx.clone());
                }

                if next_for_execution == idx {
                    let _ = execute_tx.send(tx);
                    next_for_execution += 1;

                    while let Some(entry) = queue.first_entry() &&
                        *entry.key() == next_for_execution
                    {
                        let _ = execute_tx.send(entry.remove());
                        next_for_execution += 1;
                    }
                } else {
                    queue.insert(idx, tx);
                }
            }
        });

        (prewarm_rx, execute_rx, transaction_count_hint)
    }

    /// Spawns the prewarming task, optionally wired to the multiproof task for proof target
    /// updates.
    fn spawn_caching_with<P>(
        &self,
        env: ExecutionEnv<Evm>,
        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        transaction_count_hint: usize,
        provider_builder: StateProviderBuilder<N, P>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    ) -> CacheTaskHandle
    where
        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    {
        if self.disable_transaction_prewarming {
            // if no transactions should be executed we clear them but still spawn the task for
            // caching updates
            transactions = mpsc::channel().1;
        }

        let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
            (None, None, None)
        } else {
            let saved_cache = self.cache_for(env.parent_hash);
            let cache = saved_cache.cache().clone();
            let cache_metrics = saved_cache.metrics().clone();
            (Some(saved_cache), Some(cache), Some(cache_metrics))
        };

        // configure prewarming
        let prewarm_ctx = PrewarmContext {
            env,
            evm_config: self.evm_config.clone(),
            saved_cache,
            provider: provider_builder,
            metrics: PrewarmMetrics::default(),
            terminate_execution: Arc::new(AtomicBool::new(false)),
            precompile_cache_disabled: self.precompile_cache_disabled,
            precompile_cache_map: self.precompile_cache_map.clone(),
        };

        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
            self.executor.clone(),
            self.execution_cache.clone(),
            prewarm_ctx,
            to_multi_proof,
            transaction_count_hint,
            self.prewarm_max_concurrency,
        );

        // spawn pre-warm task
        {
            let to_prewarm_task = to_prewarm_task.clone();
            self.executor.spawn_blocking(move || {
                prewarm_task.run(transactions, to_prewarm_task);
            });
        }

        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
    }

    /// Returns the cache for the given parent hash.
    ///
    /// If the given hash differs from the most recently cached one, this creates a new
    /// instance.
433    #[instrument(level = "debug", target = "engine::caching", skip(self))]
434    fn cache_for(&self, parent_hash: B256) -> SavedCache {
435        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
436            debug!("reusing execution cache");
437            cache
438        } else {
439            debug!("creating new execution cache on cache miss");
440            let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
441            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
442        }
443    }
444
445    /// Spawns the [`SparseTrieTask`] for this payload processor.
446    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
447    fn spawn_sparse_trie_task<BPF>(
448        &self,
449        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
450        proof_worker_handle: BPF,
451        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
452    ) where
453        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
454        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
455        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
456    {
457        // Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
458        // there's none to reuse.
459        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
460        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
461            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
462                ConfiguredSparseTrie::Serial(Default::default())
463            } else {
464                ConfiguredSparseTrie::Parallel(Box::new(
465                    ParallelSparseTrie::default()
466                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
467                ))
468            });
469            ClearedSparseStateTrie::from_state_trie(
470                SparseStateTrie::new()
471                    .with_accounts_trie(default_trie.clone())
472                    .with_default_storage_trie(default_trie)
473                    .with_updates(true),
474            )
475        });
476
477        let task =
478            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
479                sparse_trie_rx,
480                proof_worker_handle,
481                self.trie_metrics.clone(),
482                sparse_state_trie,
483            );
484
485        let span = Span::current();
486        self.executor.spawn_blocking(move || {
487            let _enter = span.entered();
488
489            let (result, trie) = task.run();
490            // Send state root computation result
491            let _ = state_root_tx.send(result);
492
493            // Clear the SparseStateTrie, shrink, and replace it back into the mutex _after_ sending
494            // results to the next step, so that time spent clearing doesn't block the step after
495            // this one.
496            let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
497            let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
498
            // Shrink the sparse trie so that memory usage doesn't grow without bound.
            cleared_trie.shrink_to(
                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
            );

            cleared_sparse_trie.lock().replace(cleared_trie);
        });
    }

    /// Updates the execution cache with the post-execution state from an inserted block.
    ///
    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
    /// to ensure the cache remains warm for subsequent block execution.
    ///
    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
    /// hitting the database, maintaining performance consistency.
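    ///
    /// A minimal call sketch (illustrative; `BlockWithParent` fields as used in this module's
    /// tests):
    ///
    /// ```ignore
    /// let block_with_parent = BlockWithParent {
    ///     block: BlockNumHash { hash: block_hash, number: 1 },
    ///     parent: parent_hash,
    /// };
    /// processor.on_inserted_executed_block(block_with_parent, &bundle_state);
    /// ```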
    pub(crate) fn on_inserted_executed_block(
        &self,
        block_with_parent: BlockWithParent,
        bundle_state: &BundleState,
    ) {
        self.execution_cache.update_with_guard(|cached| {
            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
                debug!(
                    target: "engine::caching",
                    parent_hash = %block_with_parent.parent,
                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
                );
                return;
            }

            // Take existing cache (if any) or create fresh caches
            let (caches, cache_metrics) = match cached.take() {
                Some(existing) => {
                    existing.split()
                }
                None => (
                    ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
                    CachedStateMetrics::zeroed(),
                ),
            };

            // Insert the block's bundle state into cache
            let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics);
            if new_cache.cache().insert_state(bundle_state).is_err() {
                *cached = None;
                debug!(target: "engine::caching", "cleared execution cache on update error");
                return;
            }
            new_cache.update_metrics();

            // Replace with the updated cache
            *cached = Some(new_cache);
            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
        });
    }
}

/// Handle to all the spawned tasks.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
    /// Channel for EVM state updates.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Handle to the spawned prewarm task.
    prewarm_handle: CacheTaskHandle,
    /// Stream of block transactions
    transactions: mpsc::Receiver<Result<Tx, Err>>,
    /// Receiver for the state root, wired to the sparse trie task
    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
    /// Span for tracing
    _span: Span,
}

impl<Tx, Err> PayloadHandle<Tx, Err> {
    /// Awaits the state root
    ///
    /// # Panics
    ///
    /// If payload processing was started without background tasks.
    #[instrument(
        level = "debug",
        target = "engine::tree::payload_processor",
        name = "await_state_root",
        skip_all
    )]
    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        self.state_root
            .take()
            .expect("state_root is None")
            .recv()
            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
    }

    /// Returns a state hook to be used to send state updates to this task.
    ///
    /// If a multiproof task is spawned the hook will notify it about new states.
    pub fn state_hook(&self) -> impl OnStateHook {
        // convert the channel into a `StateHookSender` that emits an event on drop
        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);

        move |source: StateChangeSource, state: &EvmState| {
            if let Some(sender) = &to_multi_proof {
                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
            }
        }
    }

    /// Returns a clone of the caches used by prewarming
    pub(super) fn caches(&self) -> Option<StateExecutionCache> {
        self.prewarm_handle.cache.clone()
    }

    /// Returns a clone of the cache metrics used by prewarming
    pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
        self.prewarm_handle.cache_metrics.clone()
    }

    /// Terminates the pre-warming transaction processing.
    ///
    /// Note: This does not terminate the task yet.
    pub(super) fn stop_prewarming_execution(&self) {
        self.prewarm_handle.stop_prewarming_execution()
    }

    /// Terminates the entire caching task.
    ///
    /// If the [`BundleState`] is provided it will update the shared cache.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        self.prewarm_handle.terminate_caching(block_output)
    }

    /// Returns an iterator yielding transactions from the stream.
    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
        core::iter::repeat_with(|| self.transactions.recv())
            .take_while(|res| res.is_ok())
            .map(|res| res.unwrap())
    }
}

/// Access to the spawned [`PrewarmCacheTask`].
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
    /// The shared cache the task operates with.
    cache: Option<StateExecutionCache>,
    /// Metrics for the caches
    cache_metrics: Option<CachedStateMetrics>,
    /// Channel to the spawned prewarm task if any
    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}

impl CacheTaskHandle {
    /// Terminates the pre-warming transaction processing.
    ///
    /// Note: This does not terminate the task yet.
    pub(super) fn stop_prewarming_execution(&self) {
        if let Some(tx) = &self.to_prewarm_task {
            let _ = tx.send(PrewarmTaskEvent::TerminateTransactionExecution);
        }
    }

    /// Terminates the entire pre-warming task.
    ///
    /// If the [`BundleState`] is provided it will update the shared cache.
    pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
        if let Some(tx) = self.to_prewarm_task.take() {
            // Only clone when we have an active task and a state to send
            let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
            let _ = tx.send(event);
        }
    }
}

impl Drop for CacheTaskHandle {
    fn drop(&mut self) {
        // Ensure we always terminate on drop
        self.terminate_caching(None);
    }
}

/// Shared access to the most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
///  - Get the cache if the payload's parent hash matches the most recently cached block
///  - Update the cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
///
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **`ExecutionCache`**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
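///
/// A minimal usage sketch (illustrative, using this module's types):
///
/// ```ignore
/// let execution_cache = ExecutionCache::default();
/// // After executing a block, save its caches under the executed block hash.
/// execution_cache.update_with_guard(|slot| *slot = Some(saved_cache));
/// // When the next payload arrives, try to reuse the parent block's cache.
/// if let Some(saved) = execution_cache.get_cache_for(parent_hash) {
///     // execute on top of the warm cache
/// }
/// ```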
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// Guarded cloneable cache identified by a block hash.
    inner: Arc<RwLock<Option<SavedCache>>>,
}

impl ExecutionCache {
    /// Returns the cache for `parent_hash` if it's available for use.
    ///
    /// A cache is considered available when:
    /// - It exists and matches the requested parent hash
    /// - No other tasks are currently using it (checked via Arc reference count)
    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
        let start = Instant::now();
        let cache = self.inner.read();

        let elapsed = start.elapsed();
        if elapsed.as_millis() > 5 {
            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache lock");
        }

        cache
            .as_ref()
            .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
            .cloned()
    }

    /// Clears the tracked cache
    #[expect(unused)]
    pub(crate) fn clear(&self) {
        self.inner.write().take();
    }

    /// Updates the cache with a closure that has exclusive access to the guard.
    /// This ensures that all cache operations happen atomically.
    ///
    /// ## CRITICAL SAFETY REQUIREMENT
    ///
    /// **Before calling this method, you MUST ensure there are no other active cache users.**
    /// This includes:
    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
    /// - No concurrent transactions that might access the cached state
    /// - All prewarming operations must be completed or cancelled
    ///
    /// Violating this requirement can result in cache corruption, incorrect state data,
    /// and potential consensus failures.
    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut Option<SavedCache>),
    {
        let mut guard = self.inner.write();
        update_fn(&mut guard);
    }
}

/// EVM context required to execute a block.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
}

impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
where
    EvmEnvFor<Evm>: Default,
{
    fn default() -> Self {
        Self {
            evm_env: Default::default(),
            hash: Default::default(),
            parent_hash: Default::default(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ExecutionCache;
    use crate::tree::{
        cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::TransactionSigned;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, Recovered, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_revm::db::BundleState;
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    fn make_saved_cache(hash: B256) -> SavedCache {
        let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
    }

    #[test]
    fn execution_cache_allows_single_checkout() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([1u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let first = execution_cache.get_cache_for(hash);
        assert!(first.is_some(), "expected initial checkout to succeed");

        let second = execution_cache.get_cache_for(hash);
        assert!(second.is_none(), "second checkout should be blocked while guard is active");

        drop(first);

        let third = execution_cache.get_cache_for(hash);
        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
    }

    #[test]
    fn execution_cache_checkout_releases_on_drop() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([2u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        {
            let guard = execution_cache.get_cache_for(hash);
            assert!(guard.is_some(), "expected checkout to succeed");
            // Guard dropped at end of scope
        }

        let retry = execution_cache.get_cache_for(hash);
        assert!(retry.is_some(), "checkout should succeed after guard drop");
    }

    #[test]
    fn execution_cache_mismatch_parent_returns_none() {
        let execution_cache = ExecutionCache::default();
        let hash = B256::from([3u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));

        let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
        assert!(miss.is_none(), "checkout should fail for different parent hash");
    }

    #[test]
    fn execution_cache_update_after_release_succeeds() {
        let execution_cache = ExecutionCache::default();
        let initial = B256::from([5u8; 32]);

        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

        let guard =
            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");

        drop(guard);

        let updated = B256::from([6u8; 32]);
        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));

        let new_checkout = execution_cache.get_cache_for(updated);
        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
    }

    #[test]
    fn on_inserted_executed_block_populates_cache() {
        let payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let parent_hash = B256::from([1u8; 32]);
        let block_hash = B256::from([10u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block_hash, number: 1 },
            parent: parent_hash,
        };
        let bundle_state = BundleState::default();

        // Cache should be empty initially
        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());

        // Update cache with inserted block
        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should now exist for the block hash
        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
    }

    #[test]
    fn on_inserted_executed_block_skips_on_parent_mismatch() {
        let payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(Arc::new(ChainSpec::default())),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        // Setup: populate cache with block 1
        let block1_hash = B256::from([1u8; 32]);
        payload_processor
            .execution_cache
            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));

        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
        let wrong_parent = B256::from([99u8; 32]);
        let block3_hash = B256::from([3u8; 32]);
        let block_with_parent = BlockWithParent {
            block: BlockNumHash { hash: block3_hash, number: 3 },
            parent: wrong_parent,
        };
        let bundle_state = BundleState::default();

        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);

        // Cache should still be for block 1 (unchanged)
        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
        assert!(cached.is_some(), "Original cache should be preserved");

        // Cache for block 3 should not exist
        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
    }

    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let num_accounts_in_update = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            let selected_addresses = &all_addresses[0..num_accounts_in_update];

            for &address in selected_addresses {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    for _ in 0..rng.random_range(1..10) {
                        let slot = U256::from(rng.random::<u64>());
                        storage.insert(
                            slot,
                            EvmStorageSlot::new_changed(
                                U256::ZERO,
                                U256::from(rng.random::<u64>()),
                                0,
                            ),
                        );
                    }
                }

                let account = revm_state::Account {
                    info: AccountInfo {
                        balance: U256::from(rng.random::<u64>()),
                        nonce: rng.random::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                    },
                    storage,
                    status: AccountStatus::Touched,
                    transaction_id: 0,
                };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let account_updates = update.iter().map(|(address, account)| {
                    (*address, Some(Account::from_revm_account(account)))
                });
                provider_rw
                    .insert_account_for_hashing(account_updates)
                    .expect("failed to insert accounts");

                let storage_updates = update.iter().map(|(address, account)| {
                    let storage_entries = account.storage.iter().map(|(slot, value)| {
                        StorageEntry { key: B256::from(*slot), value: value.present_value }
                    });
                    (*address, storage_entries)
                });
                provider_rw
                    .insert_storage_for_hashing(storage_updates)
                    .expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let storage: HashMap<B256, U256> = account
                    .storage
                    .iter()
                    .map(|(k, v)| (B256::from(*k), v.present_value))
                    .collect();

                let entry = accumulated_state.entry(*address).or_default();
                entry.0 = Account::from_revm_account(account);
                entry.1.extend(storage);
            }
        }

        let mut payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
            PrecompileCacheMap::default(),
        );

        let provider_factory = BlockchainProvider::new(factory).unwrap();

        let mut handle =
            payload_processor.spawn(
                Default::default(),
                (
                    core::iter::empty::<
                        Result<Recovered<TransactionSigned>, core::convert::Infallible>,
                    >(),
                    std::convert::identity,
                ),
                StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
                OverlayStateProviderFactory::new(provider_factory),
                &TreeConfig::default(),
            );

        let mut state_hook = handle.state_hook();

        for (i, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(i), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}