reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5    cached_state::{
6        CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
7        ExecutionCacheBuilder, SavedCache,
8    },
9    payload_processor::{
10        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
11        sparse_trie::StateRootComputeOutcome,
12    },
13    sparse_trie::SparseTrieTask,
14    StateProviderBuilder, TreeConfig,
15};
16use alloy_eip7928::BlockAccessList;
17use alloy_eips::eip1898::BlockWithParent;
18use alloy_evm::{block::StateChangeSource, ToTxEnv};
19use alloy_primitives::B256;
20use crossbeam_channel::Sender as CrossbeamSender;
21use executor::WorkloadExecutor;
22use multiproof::{SparseTrieUpdate, *};
23use parking_lot::RwLock;
24use prewarm::PrewarmMetrics;
25use rayon::prelude::*;
26use reth_evm::{
27    execute::{ExecutableTxFor, WithTxEnv},
28    ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
29    TxEnvFor,
30};
31use reth_execution_types::ExecutionOutcome;
32use reth_primitives_traits::NodePrimitives;
33use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
34use reth_revm::{db::BundleState, state::EvmState};
35use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
36use reth_trie_parallel::{
37    proof_task::{ProofTaskCtx, ProofWorkerHandle},
38    root::ParallelStateRootError,
39};
40use reth_trie_sparse::{
41    provider::{TrieNodeProvider, TrieNodeProviderFactory},
42    ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
43};
44use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
45use std::{
46    collections::BTreeMap,
47    sync::{
48        atomic::AtomicBool,
49        mpsc::{self, channel},
50        Arc,
51    },
52    time::Instant,
53};
54use tracing::{debug, debug_span, instrument, warn, Span};
55
56pub mod bal;
57mod configured_sparse_trie;
58pub mod executor;
59pub mod multiproof;
60pub mod prewarm;
61pub mod sparse_trie;
62
63use configured_sparse_trie::ConfiguredSparseTrie;
64
65/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
66///
67/// These values were determined by performing benchmarks using gradually increasing values to judge
68/// the affects. Below 100 throughput would generally be equal or slightly less, while above 150 it
69/// would deteriorate to the point where PST might as well not be used.
70pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
71    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
72
/// Default node capacity that allocated sparse tries are shrunk to, bounding how many trie nodes
/// they keep allocated.
///
/// Node maps are keyed by `Nibbles` and hold `SparseNode` values, where `size_of::<Nibbles>` is
/// 40 and `size_of::<SparseNode>` is 80.
///
/// With 1 million entries of 120 bytes each, this conservative estimate works out to roughly
/// 120MB.
pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
82
/// Default value capacity that allocated sparse tries are shrunk to, bounding how many values
/// they keep allocated.
///
/// Sparse tries hold both storage and account values; account values are the larger of the two
/// and are essentially `TrieAccount`s.
///
/// Account value maps are keyed by `Nibbles` and hold `TrieAccount` values, where
/// `size_of::<Nibbles>` is 40 and `size_of::<TrieAccount>` is 104.
///
/// With 1 million entries of 144 bytes each, this conservative estimate works out to roughly
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
95
96/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
97type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
98    WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxTuple>::Tx>,
99    <I as ExecutableTxTuple>::Error,
100    <N as NodePrimitives>::Receipt,
101>;
102
103/// Entrypoint for executing the payload.
104#[derive(Debug)]
105pub struct PayloadProcessor<Evm>
106where
107    Evm: ConfigureEvm,
108{
109    /// The executor used by to spawn tasks.
110    executor: WorkloadExecutor,
111    /// The most recent cache used for execution.
112    execution_cache: ExecutionCache,
113    /// Metrics for trie operations
114    trie_metrics: MultiProofTaskMetrics,
115    /// Cross-block cache size in bytes.
116    cross_block_cache_size: u64,
117    /// Whether transactions should not be executed on prewarming task.
118    disable_transaction_prewarming: bool,
119    /// Whether state cache should be disable
120    disable_state_cache: bool,
121    /// Determines how to configure the evm for execution.
122    evm_config: Evm,
123    /// Whether precompile cache should be disabled.
124    precompile_cache_disabled: bool,
125    /// Precompile cache map.
126    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
127    /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
128    /// that allocations can be minimized.
129    sparse_state_trie: Arc<
130        parking_lot::Mutex<
131            Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
132        >,
133    >,
134    /// Whether to disable the parallel sparse trie.
135    disable_parallel_sparse_trie: bool,
136    /// Maximum concurrency for prewarm task.
137    prewarm_max_concurrency: usize,
138}
139
140impl<N, Evm> PayloadProcessor<Evm>
141where
142    N: NodePrimitives,
143    Evm: ConfigureEvm<Primitives = N>,
144{
145    /// Returns a reference to the workload executor driving payload tasks.
146    pub(super) const fn executor(&self) -> &WorkloadExecutor {
147        &self.executor
148    }
149
150    /// Creates a new payload processor.
151    pub fn new(
152        executor: WorkloadExecutor,
153        evm_config: Evm,
154        config: &TreeConfig,
155        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
156    ) -> Self {
157        Self {
158            executor,
159            execution_cache: Default::default(),
160            trie_metrics: Default::default(),
161            cross_block_cache_size: config.cross_block_cache_size(),
162            disable_transaction_prewarming: config.disable_prewarming(),
163            evm_config,
164            disable_state_cache: config.disable_state_cache(),
165            precompile_cache_disabled: config.precompile_cache_disabled(),
166            precompile_cache_map,
167            sparse_state_trie: Arc::default(),
168            disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
169            prewarm_max_concurrency: config.prewarm_max_concurrency(),
170        }
171    }
172}
173
174impl<N, Evm> PayloadProcessor<Evm>
175where
176    N: NodePrimitives,
177    Evm: ConfigureEvm<Primitives = N> + 'static,
178{
179    /// Spawns all background tasks and returns a handle connected to the tasks.
180    ///
181    /// - Transaction prewarming task
182    /// - State root task
183    /// - Sparse trie task
184    ///
185    /// # Transaction prewarming task
186    ///
187    /// Responsible for feeding state updates to the multi proof task.
188    ///
189    /// This task runs until:
190    ///  - externally cancelled (e.g. sequential block execution is complete)
191    ///
192    /// ## Multi proof task
193    ///
194    /// Responsible for preparing sparse trie messages for the sparse trie task.
195    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
196    /// output back to this task.
197    ///
198    /// Receives updates from sequential execution.
199    /// This task runs until it receives a shutdown signal, which should be after the block
200    /// was fully executed.
201    ///
202    /// ## Sparse trie task
203    ///
204    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
205    ///
206    /// This task runs until there are no further updates to process.
207    ///
208    ///
209    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
210    /// canceling)
211    #[instrument(
212        level = "debug",
213        target = "engine::tree::payload_processor",
214        name = "payload processor",
215        skip_all
216    )]
217    pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
218        &mut self,
219        env: ExecutionEnv<Evm>,
220        transactions: I,
221        provider_builder: StateProviderBuilder<N, P>,
222        multiproof_provider_factory: F,
223        config: &TreeConfig,
224        bal: Option<Arc<BlockAccessList>>,
225    ) -> IteratorPayloadHandle<Evm, I, N>
226    where
227        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
228        F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
229            + Clone
230            + Send
231            + 'static,
232    {
233        let parent_hash = env.parent_hash;
234
235        // start preparing transactions immediately
236        let (prewarm_rx, execution_rx, transaction_count_hint) =
237            self.spawn_tx_iterator(transactions);
238
239        let span = Span::current();
240        let (to_sparse_trie, sparse_trie_rx) = channel();
241        let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
242
243        // Handle BAL-based optimization if available
244        let prewarm_handle = if let Some(bal) = bal {
245            // When BAL is present, use BAL prewarming and send BAL to multiproof
246            debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
247
248            // Send BAL message immediately to MultiProofTask
249            let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal)));
250
251            // Spawn with BAL prewarming
252            self.spawn_caching_with(
253                env,
254                prewarm_rx,
255                transaction_count_hint,
256                provider_builder.clone(),
257                None, // Don't send proof targets when BAL is present
258                Some(bal),
259            )
260        } else {
261            // Normal path: spawn with transaction prewarming
262            self.spawn_caching_with(
263                env,
264                prewarm_rx,
265                transaction_count_hint,
266                provider_builder.clone(),
267                Some(to_multi_proof.clone()),
268                None,
269            )
270        };
271
272        // Create and spawn the storage proof task
273        let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
274        let storage_worker_count = config.storage_worker_count();
275        let account_worker_count = config.account_worker_count();
276        let proof_handle = ProofWorkerHandle::new(
277            self.executor.handle().clone(),
278            task_ctx,
279            storage_worker_count,
280            account_worker_count,
281        );
282
283        let multi_proof_task = MultiProofTask::new(
284            proof_handle.clone(),
285            to_sparse_trie,
286            config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
287            to_multi_proof,
288            from_multi_proof,
289        );
290
291        // wire the multiproof task to the prewarm task
292        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
293
294        // spawn multi-proof task
295        let parent_span = span.clone();
296        self.executor.spawn_blocking({
297            let saved_cache = self.cache_for(parent_hash);
298            let cache = saved_cache.cache().clone();
299            let cache_metrics = saved_cache.metrics().clone();
300            move || {
301                let _enter = parent_span.entered();
302                // Build a state provider for the multiproof task
303                let provider = provider_builder.build().expect("failed to build provider");
304                let provider = CachedStateProvider::new(provider, cache, cache_metrics);
305                multi_proof_task.run(provider);
306            }
307        });
308
309        // wire the sparse trie to the state root response receiver
310        let (state_root_tx, state_root_rx) = channel();
311
312        // Spawn the sparse trie task using any stored trie and parallel trie configuration.
313        self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
314
315        PayloadHandle {
316            to_multi_proof,
317            prewarm_handle,
318            state_root: Some(state_root_rx),
319            transactions: execution_rx,
320            _span: span,
321        }
322    }
323
324    /// Spawns a task that exclusively handles cache prewarming for transaction execution.
325    ///
326    /// Returns a [`PayloadHandle`] to communicate with the task.
327    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
328    pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
329        &self,
330        env: ExecutionEnv<Evm>,
331        transactions: I,
332        provider_builder: StateProviderBuilder<N, P>,
333        bal: Option<Arc<BlockAccessList>>,
334    ) -> IteratorPayloadHandle<Evm, I, N>
335    where
336        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
337    {
338        let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
339        let prewarm_handle =
340            self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal);
341        PayloadHandle {
342            to_multi_proof: None,
343            prewarm_handle,
344            state_root: None,
345            transactions: execution_rx,
346            _span: Span::current(),
347        }
348    }
349
350    /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
351    #[expect(clippy::type_complexity)]
352    fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
353        &self,
354        transactions: I,
355    ) -> (
356        mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
357        mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
358        usize,
359    ) {
360        let (transactions, convert) = transactions.into();
361        let transactions = transactions.into_par_iter();
362        let transaction_count_hint = transactions.len();
363
364        let (ooo_tx, ooo_rx) = mpsc::channel();
365        let (prewarm_tx, prewarm_rx) = mpsc::channel();
366        let (execute_tx, execute_rx) = mpsc::channel();
367
368        // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
369        self.executor.spawn_blocking(move || {
370            transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
371                let tx = convert(tx);
372                let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
373                // Only send Ok(_) variants to prewarming task.
374                if let Ok(tx) = &tx {
375                    let _ = prewarm_tx.send(tx.clone());
376                }
377                let _ = ooo_tx.send((idx, tx));
378            });
379        });
380
381        // Spawn a task that processes out-of-order transactions from the task above and sends them
382        // to the execution task in order.
383        self.executor.spawn_blocking(move || {
384            let mut next_for_execution = 0;
385            let mut queue = BTreeMap::new();
386            while let Ok((idx, tx)) = ooo_rx.recv() {
387                if next_for_execution == idx {
388                    let _ = execute_tx.send(tx);
389                    next_for_execution += 1;
390
391                    while let Some(entry) = queue.first_entry() &&
392                        *entry.key() == next_for_execution
393                    {
394                        let _ = execute_tx.send(entry.remove());
395                        next_for_execution += 1;
396                    }
397                } else {
398                    queue.insert(idx, tx);
399                }
400            }
401        });
402
403        (prewarm_rx, execute_rx, transaction_count_hint)
404    }
405
406    /// Spawn prewarming optionally wired to the multiproof task for target updates.
407    fn spawn_caching_with<P>(
408        &self,
409        env: ExecutionEnv<Evm>,
410        mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
411        transaction_count_hint: usize,
412        provider_builder: StateProviderBuilder<N, P>,
413        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
414        bal: Option<Arc<BlockAccessList>>,
415    ) -> CacheTaskHandle<N::Receipt>
416    where
417        P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
418    {
419        if self.disable_transaction_prewarming {
420            // if no transactions should be executed we clear them but still spawn the task for
421            // caching updates
422            transactions = mpsc::channel().1;
423        }
424
425        let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
426            (None, None, None)
427        } else {
428            let saved_cache = self.cache_for(env.parent_hash);
429            let cache = saved_cache.cache().clone();
430            let cache_metrics = saved_cache.metrics().clone();
431            (Some(saved_cache), Some(cache), Some(cache_metrics))
432        };
433
434        // configure prewarming
435        let prewarm_ctx = PrewarmContext {
436            env,
437            evm_config: self.evm_config.clone(),
438            saved_cache,
439            provider: provider_builder,
440            metrics: PrewarmMetrics::default(),
441            terminate_execution: Arc::new(AtomicBool::new(false)),
442            precompile_cache_disabled: self.precompile_cache_disabled,
443            precompile_cache_map: self.precompile_cache_map.clone(),
444        };
445
446        let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
447            self.executor.clone(),
448            self.execution_cache.clone(),
449            prewarm_ctx,
450            to_multi_proof,
451            transaction_count_hint,
452            self.prewarm_max_concurrency,
453        );
454
455        // spawn pre-warm task
456        {
457            let to_prewarm_task = to_prewarm_task.clone();
458            self.executor.spawn_blocking(move || {
459                let mode = if let Some(bal) = bal {
460                    PrewarmMode::BlockAccessList(bal)
461                } else {
462                    PrewarmMode::Transactions(transactions)
463                };
464                prewarm_task.run(mode, to_prewarm_task);
465            });
466        }
467
468        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
469    }
470
471    /// Returns the cache for the given parent hash.
472    ///
473    /// If the given hash is different then what is recently cached, then this will create a new
474    /// instance.
475    #[instrument(level = "debug", target = "engine::caching", skip(self))]
476    fn cache_for(&self, parent_hash: B256) -> SavedCache {
477        if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
478            debug!("reusing execution cache");
479            cache
480        } else {
481            debug!("creating new execution cache on cache miss");
482            let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
483            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
484        }
485    }
486
487    /// Spawns the [`SparseTrieTask`] for this payload processor.
488    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
489    fn spawn_sparse_trie_task<BPF>(
490        &self,
491        sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
492        proof_worker_handle: BPF,
493        state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
494    ) where
495        BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
496        BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
497        BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
498    {
499        // Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
500        // there's none to reuse.
501        let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
502        let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
503            let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
504                ConfiguredSparseTrie::Serial(Default::default())
505            } else {
506                ConfiguredSparseTrie::Parallel(Box::new(
507                    ParallelSparseTrie::default()
508                        .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
509                ))
510            });
511            ClearedSparseStateTrie::from_state_trie(
512                SparseStateTrie::new()
513                    .with_accounts_trie(default_trie.clone())
514                    .with_default_storage_trie(default_trie)
515                    .with_updates(true),
516            )
517        });
518
519        let task =
520            SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
521                sparse_trie_rx,
522                proof_worker_handle,
523                self.trie_metrics.clone(),
524                sparse_state_trie,
525            );
526
527        let span = Span::current();
528        self.executor.spawn_blocking(move || {
529            let _enter = span.entered();
530
531            let (result, trie) = task.run();
532            // Send state root computation result
533            let _ = state_root_tx.send(result);
534
535            // Clear the SparseStateTrie, shrink, and replace it back into the mutex _after_ sending
536            // results to the next step, so that time spent clearing doesn't block the step after
537            // this one.
538            let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
539            let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
540
541            // Shrink the sparse trie so that we don't have ever increasing memory.
542            cleared_trie.shrink_to(
543                SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
544                SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
545            );
546
547            cleared_sparse_trie.lock().replace(cleared_trie);
548        });
549    }
550
551    /// Updates the execution cache with the post-execution state from an inserted block.
552    ///
553    /// This is used when blocks are inserted directly (e.g., locally built blocks by sequencers)
554    /// to ensure the cache remains warm for subsequent block execution.
555    ///
556    /// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
557    /// hitting the database, maintaining performance consistency.
558    pub(crate) fn on_inserted_executed_block(
559        &self,
560        block_with_parent: BlockWithParent,
561        bundle_state: &BundleState,
562    ) {
563        self.execution_cache.update_with_guard(|cached| {
564            if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
565                debug!(
566                    target: "engine::caching",
567                    parent_hash = %block_with_parent.parent,
568                    "Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
569                );
570                return;
571            }
572
573            // Take existing cache (if any) or create fresh caches
574            let (caches, cache_metrics) = match cached.take() {
575                Some(existing) => {
576                    existing.split()
577                }
578                None => (
579                    ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
580                    CachedStateMetrics::zeroed(),
581                ),
582            };
583
584            // Insert the block's bundle state into cache
585            let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics);
586            if new_cache.cache().insert_state(bundle_state).is_err() {
587                *cached = None;
588                debug!(target: "engine::caching", "cleared execution cache on update error");
589                return;
590            }
591            new_cache.update_metrics();
592
593            // Replace with the updated cache
594            *cached = Some(new_cache);
595            debug!(target: "engine::caching", ?block_with_parent, "Updated execution cache for inserted block");
596        });
597    }
598}
599
600/// Handle to all the spawned tasks.
601///
602/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
603/// caching task without cloning the expensive `BundleState`.
604#[derive(Debug)]
605pub struct PayloadHandle<Tx, Err, R> {
606    /// Channel for evm state updates
607    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
608    // must include the receiver of the state root wired to the sparse trie
609    prewarm_handle: CacheTaskHandle<R>,
610    /// Stream of block transactions
611    transactions: mpsc::Receiver<Result<Tx, Err>>,
612    /// Receiver for the state root
613    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
614    /// Span for tracing
615    _span: Span,
616}
617
618impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
619    /// Awaits the state root
620    ///
621    /// # Panics
622    ///
623    /// If payload processing was started without background tasks.
624    #[instrument(
625        level = "debug",
626        target = "engine::tree::payload_processor",
627        name = "await_state_root",
628        skip_all
629    )]
630    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
631        self.state_root
632            .take()
633            .expect("state_root is None")
634            .recv()
635            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
636    }
637
638    /// Returns a state hook to be used to send state updates to this task.
639    ///
640    /// If a multiproof task is spawned the hook will notify it about new states.
641    pub fn state_hook(&self) -> impl OnStateHook {
642        // convert the channel into a `StateHookSender` that emits an event on drop
643        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
644
645        move |source: StateChangeSource, state: &EvmState| {
646            if let Some(sender) = &to_multi_proof {
647                let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
648            }
649        }
650    }
651
652    /// Returns a clone of the caches used by prewarming
653    pub(super) fn caches(&self) -> Option<StateExecutionCache> {
654        self.prewarm_handle.cache.clone()
655    }
656
657    /// Returns a clone of the cache metrics used by prewarming
658    pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
659        self.prewarm_handle.cache_metrics.clone()
660    }
661
662    /// Terminates the pre-warming transaction processing.
663    ///
664    /// Note: This does not terminate the task yet.
665    pub(super) fn stop_prewarming_execution(&self) {
666        self.prewarm_handle.stop_prewarming_execution()
667    }
668
669    /// Terminates the entire caching task.
670    ///
671    /// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
672    /// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
673    /// path without cloning the expensive `BundleState`.
674    pub(super) fn terminate_caching(
675        &mut self,
676        execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
677    ) {
678        self.prewarm_handle.terminate_caching(execution_outcome)
679    }
680
681    /// Returns iterator yielding transactions from the stream.
682    pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
683        core::iter::repeat_with(|| self.transactions.recv())
684            .take_while(|res| res.is_ok())
685            .map(|res| res.unwrap())
686    }
687}
688
689/// Access to the spawned [`PrewarmCacheTask`].
690///
691/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
692/// prewarm task without cloning the expensive `BundleState`.
693#[derive(Debug)]
694pub(crate) struct CacheTaskHandle<R> {
695    /// The shared cache the task operates with.
696    cache: Option<StateExecutionCache>,
697    /// Metrics for the caches
698    cache_metrics: Option<CachedStateMetrics>,
699    /// Channel to the spawned prewarm task if any
700    to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
701}
702
703impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
704    /// Terminates the pre-warming transaction processing.
705    ///
706    /// Note: This does not terminate the task yet.
707    pub(super) fn stop_prewarming_execution(&self) {
708        self.to_prewarm_task
709            .as_ref()
710            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
711    }
712
713    /// Terminates the entire pre-warming task.
714    ///
715    /// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
716    /// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
717    pub(super) fn terminate_caching(
718        &mut self,
719        execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
720    ) {
721        if let Some(tx) = self.to_prewarm_task.take() {
722            let event = PrewarmTaskEvent::Terminate { execution_outcome };
723            let _ = tx.send(event);
724        }
725    }
726}
727
728impl<R> Drop for CacheTaskHandle<R> {
729    fn drop(&mut self) {
730        // Ensure we always terminate on drop - send None without needing Send + Sync bounds
731        if let Some(tx) = self.to_prewarm_task.take() {
732            let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
733        }
734    }
735}
736
/// Shared access to the most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
///  - Get the cache if the payload's parent hash matches the block the cache was saved for
///  - Update the cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
///
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **`ExecutionCache`**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
    /// Guarded cloneable cache identified by a block hash.
    ///
    /// `None` means no cache is currently tracked. Clones of this struct share the same slot
    /// through the `Arc`.
    inner: Arc<RwLock<Option<SavedCache>>>,
}
767
768impl ExecutionCache {
769    /// Returns the cache for `parent_hash` if it's available for use.
770    ///
771    /// A cache is considered available when:
772    /// - It exists and matches the requested parent hash
773    /// - No other tasks are currently using it (checked via Arc reference count)
774    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
775    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
776        let start = Instant::now();
777        let cache = self.inner.read();
778
779        let elapsed = start.elapsed();
780        if elapsed.as_millis() > 5 {
781            warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
782        }
783
784        cache
785            .as_ref()
786            // Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold
787            // a reference to this cache. We can only reuse it when we have exclusive access.
788            .filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
789            .cloned()
790    }
791
792    /// Clears the tracked cache
793    #[expect(unused)]
794    pub(crate) fn clear(&self) {
795        self.inner.write().take();
796    }
797
798    /// Updates the cache with a closure that has exclusive access to the guard.
799    /// This ensures that all cache operations happen atomically.
800    ///
801    /// ## CRITICAL SAFETY REQUIREMENT
802    ///
803    /// **Before calling this method, you MUST ensure there are no other active cache users.**
804    /// This includes:
805    /// - No running [`PrewarmCacheTask`] instances that could write to the cache
806    /// - No concurrent transactions that might access the cached state
807    /// - All prewarming operations must be completed or cancelled
808    ///
809    /// Violating this requirement can result in cache corruption, incorrect state data,
810    /// and potential consensus failures.
811    pub(crate) fn update_with_guard<F>(&self, update_fn: F)
812    where
813        F: FnOnce(&mut Option<SavedCache>),
814    {
815        let mut guard = self.inner.write();
816        update_fn(&mut guard);
817    }
818}
819
/// EVM context required to execute a block.
///
/// Bundles the EVM environment with the hashes identifying the block being executed and its
/// parent.
#[derive(Debug, Clone)]
pub struct ExecutionEnv<Evm: ConfigureEvm> {
    /// Evm environment.
    pub evm_env: EvmEnvFor<Evm>,
    /// Hash of the block being executed.
    pub hash: B256,
    /// Hash of the parent block.
    pub parent_hash: B256,
}
830
831impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
832where
833    EvmEnvFor<Evm>: Default,
834{
835    fn default() -> Self {
836        Self {
837            evm_env: Default::default(),
838            hash: Default::default(),
839            parent_hash: Default::default(),
840        }
841    }
842}
843
844#[cfg(test)]
845mod tests {
846    use super::ExecutionCache;
847    use crate::tree::{
848        cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
849        payload_processor::{
850            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
851        },
852        precompile_cache::PrecompileCacheMap,
853        StateProviderBuilder, TreeConfig,
854    };
855    use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
856    use alloy_evm::block::StateChangeSource;
857    use rand::Rng;
858    use reth_chainspec::ChainSpec;
859    use reth_db_common::init::init_genesis;
860    use reth_ethereum_primitives::TransactionSigned;
861    use reth_evm::OnStateHook;
862    use reth_evm_ethereum::EthEvmConfig;
863    use reth_primitives_traits::{Account, Recovered, StorageEntry};
864    use reth_provider::{
865        providers::{BlockchainProvider, OverlayStateProviderFactory},
866        test_utils::create_test_provider_factory_with_chain_spec,
867        ChainSpecProvider, HashingWriter,
868    };
869    use reth_revm::db::BundleState;
870    use reth_testing_utils::generators;
871    use reth_trie::{test_utils::state_root, HashedPostState};
872    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
873    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
874    use std::sync::Arc;
875
876    fn make_saved_cache(hash: B256) -> SavedCache {
877        let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
878        SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
879    }
880
881    #[test]
882    fn execution_cache_allows_single_checkout() {
883        let execution_cache = ExecutionCache::default();
884        let hash = B256::from([1u8; 32]);
885
886        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
887
888        let first = execution_cache.get_cache_for(hash);
889        assert!(first.is_some(), "expected initial checkout to succeed");
890
891        let second = execution_cache.get_cache_for(hash);
892        assert!(second.is_none(), "second checkout should be blocked while guard is active");
893
894        drop(first);
895
896        let third = execution_cache.get_cache_for(hash);
897        assert!(third.is_some(), "third checkout should succeed after guard is dropped");
898    }
899
900    #[test]
901    fn execution_cache_checkout_releases_on_drop() {
902        let execution_cache = ExecutionCache::default();
903        let hash = B256::from([2u8; 32]);
904
905        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
906
907        {
908            let guard = execution_cache.get_cache_for(hash);
909            assert!(guard.is_some(), "expected checkout to succeed");
910            // Guard dropped at end of scope
911        }
912
913        let retry = execution_cache.get_cache_for(hash);
914        assert!(retry.is_some(), "checkout should succeed after guard drop");
915    }
916
917    #[test]
918    fn execution_cache_mismatch_parent_returns_none() {
919        let execution_cache = ExecutionCache::default();
920        let hash = B256::from([3u8; 32]);
921
922        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
923
924        let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
925        assert!(miss.is_none(), "checkout should fail for different parent hash");
926    }
927
928    #[test]
929    fn execution_cache_update_after_release_succeeds() {
930        let execution_cache = ExecutionCache::default();
931        let initial = B256::from([5u8; 32]);
932
933        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
934
935        let guard =
936            execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
937
938        drop(guard);
939
940        let updated = B256::from([6u8; 32]);
941        execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
942
943        let new_checkout = execution_cache.get_cache_for(updated);
944        assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
945    }
946
947    #[test]
948    fn on_inserted_executed_block_populates_cache() {
949        let payload_processor = PayloadProcessor::new(
950            WorkloadExecutor::default(),
951            EthEvmConfig::new(Arc::new(ChainSpec::default())),
952            &TreeConfig::default(),
953            PrecompileCacheMap::default(),
954        );
955
956        let parent_hash = B256::from([1u8; 32]);
957        let block_hash = B256::from([10u8; 32]);
958        let block_with_parent = BlockWithParent {
959            block: BlockNumHash { hash: block_hash, number: 1 },
960            parent: parent_hash,
961        };
962        let bundle_state = BundleState::default();
963
964        // Cache should be empty initially
965        assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
966
967        // Update cache with inserted block
968        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
969
970        // Cache should now exist for the block hash
971        let cached = payload_processor.execution_cache.get_cache_for(block_hash);
972        assert!(cached.is_some());
973        assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
974    }
975
976    #[test]
977    fn on_inserted_executed_block_skips_on_parent_mismatch() {
978        let payload_processor = PayloadProcessor::new(
979            WorkloadExecutor::default(),
980            EthEvmConfig::new(Arc::new(ChainSpec::default())),
981            &TreeConfig::default(),
982            PrecompileCacheMap::default(),
983        );
984
985        // Setup: populate cache with block 1
986        let block1_hash = B256::from([1u8; 32]);
987        payload_processor
988            .execution_cache
989            .update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
990
991        // Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
992        let wrong_parent = B256::from([99u8; 32]);
993        let block3_hash = B256::from([3u8; 32]);
994        let block_with_parent = BlockWithParent {
995            block: BlockNumHash { hash: block3_hash, number: 3 },
996            parent: wrong_parent,
997        };
998        let bundle_state = BundleState::default();
999
1000        payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
1001
1002        // Cache should still be for block 1 (unchanged)
1003        let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
1004        assert!(cached.is_some(), "Original cache should be preserved");
1005
1006        // Cache for block 3 should not exist
1007        let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
1008        assert!(cached3.is_none(), "New block cache should not be created on mismatch");
1009    }
1010
1011    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
1012        let mut rng = generators::rng();
1013        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
1014        let mut updates = Vec::with_capacity(updates_per_account);
1015
1016        for _ in 0..updates_per_account {
1017            let num_accounts_in_update = rng.random_range(1..=num_accounts);
1018            let mut state_update = EvmState::default();
1019
1020            let selected_addresses = &all_addresses[0..num_accounts_in_update];
1021
1022            for &address in selected_addresses {
1023                let mut storage = HashMap::default();
1024                if rng.random_bool(0.7) {
1025                    for _ in 0..rng.random_range(1..10) {
1026                        let slot = U256::from(rng.random::<u64>());
1027                        storage.insert(
1028                            slot,
1029                            EvmStorageSlot::new_changed(
1030                                U256::ZERO,
1031                                U256::from(rng.random::<u64>()),
1032                                0,
1033                            ),
1034                        );
1035                    }
1036                }
1037
1038                let account = revm_state::Account {
1039                    info: AccountInfo {
1040                        balance: U256::from(rng.random::<u64>()),
1041                        nonce: rng.random::<u64>(),
1042                        code_hash: KECCAK_EMPTY,
1043                        code: Some(Default::default()),
1044                    },
1045                    storage,
1046                    status: AccountStatus::Touched,
1047                    transaction_id: 0,
1048                };
1049
1050                state_update.insert(address, account);
1051            }
1052
1053            updates.push(state_update);
1054        }
1055
1056        updates
1057    }
1058
1059    #[test]
1060    fn test_state_root() {
1061        reth_tracing::init_test_tracing();
1062
1063        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
1064        let genesis_hash = init_genesis(&factory).unwrap();
1065
1066        let state_updates = create_mock_state_updates(10, 10);
1067        let mut hashed_state = HashedPostState::default();
1068        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
1069            HashMap::default();
1070
1071        {
1072            let provider_rw = factory.provider_rw().expect("failed to get provider");
1073
1074            for update in &state_updates {
1075                let account_updates = update.iter().map(|(address, account)| {
1076                    (*address, Some(Account::from_revm_account(account)))
1077                });
1078                provider_rw
1079                    .insert_account_for_hashing(account_updates)
1080                    .expect("failed to insert accounts");
1081
1082                let storage_updates = update.iter().map(|(address, account)| {
1083                    let storage_entries = account.storage.iter().map(|(slot, value)| {
1084                        StorageEntry { key: B256::from(*slot), value: value.present_value }
1085                    });
1086                    (*address, storage_entries)
1087                });
1088                provider_rw
1089                    .insert_storage_for_hashing(storage_updates)
1090                    .expect("failed to insert storage");
1091            }
1092            provider_rw.commit().expect("failed to commit changes");
1093        }
1094
1095        for update in &state_updates {
1096            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
1097
1098            for (address, account) in update {
1099                let storage: HashMap<B256, U256> = account
1100                    .storage
1101                    .iter()
1102                    .map(|(k, v)| (B256::from(*k), v.present_value))
1103                    .collect();
1104
1105                let entry = accumulated_state.entry(*address).or_default();
1106                entry.0 = Account::from_revm_account(account);
1107                entry.1.extend(storage);
1108            }
1109        }
1110
1111        let mut payload_processor = PayloadProcessor::new(
1112            WorkloadExecutor::default(),
1113            EthEvmConfig::new(factory.chain_spec()),
1114            &TreeConfig::default(),
1115            PrecompileCacheMap::default(),
1116        );
1117
1118        let provider_factory = BlockchainProvider::new(factory).unwrap();
1119
1120        let mut handle = payload_processor.spawn(
1121            Default::default(),
1122            (
1123                Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
1124                std::convert::identity,
1125            ),
1126            StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1127            OverlayStateProviderFactory::new(provider_factory),
1128            &TreeConfig::default(),
1129            None, // No BAL for test
1130        );
1131
1132        let mut state_hook = handle.state_hook();
1133
1134        for (i, update) in state_updates.into_iter().enumerate() {
1135            state_hook.on_state(StateChangeSource::Transaction(i), &update);
1136        }
1137        drop(state_hook);
1138
1139        let root_from_task = handle.state_root().expect("task failed").state_root;
1140        let root_from_regular = state_root(accumulated_state);
1141
1142        assert_eq!(
1143            root_from_task, root_from_regular,
1144            "State root mismatch: task={root_from_task}, base={root_from_regular}"
1145        );
1146    }
1147}