reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    cached_state::{CachedStateProvider, SavedCache},
16    payload_processor::{
17        executor::WorkloadExecutor, multiproof::MultiProofMessage,
18        ExecutionCache as PayloadExecutionCache,
19    },
20    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
21    ExecutionEnv, StateProviderBuilder,
22};
23use alloy_consensus::transaction::TxHashRef;
24use alloy_eips::Typed2718;
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use crossbeam_channel::Sender as CrossbeamSender;
28use metrics::{Counter, Gauge, Histogram};
29use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
30use reth_metrics::Metrics;
31use reth_primitives_traits::NodePrimitives;
32use reth_provider::{BlockReader, StateProviderFactory, StateReader};
33use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
34use reth_trie::MultiProofTargets;
35use std::{
36    sync::{
37        atomic::{AtomicBool, Ordering},
38        mpsc::{self, channel, Receiver, Sender},
39        Arc,
40    },
41    time::Instant,
42};
43use tracing::{debug, debug_span, instrument, trace, warn};
44
45/// A wrapper for transactions that includes their index in the block.
46#[derive(Clone)]
47struct IndexedTransaction<Tx> {
48    /// The transaction index in the block.
49    index: usize,
50    /// The wrapped transaction.
51    tx: Tx,
52}
53
54/// Maximum standard Ethereum transaction type value.
55///
56/// Standard transaction types are:
57/// - Type 0: Legacy transactions (original Ethereum)
58/// - Type 1: EIP-2930 (access list transactions)
59/// - Type 2: EIP-1559 (dynamic fee transactions)
60/// - Type 3: EIP-4844 (blob transactions)
61/// - Type 4: EIP-7702 (set code authorization transactions)
62///
63/// Any transaction with a type > 4 is considered a non-standard/system transaction,
64/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
65const MAX_STANDARD_TX_TYPE: u8 = 4;
66
67/// A task that is responsible for caching and prewarming the cache by executing transactions
68/// individually in parallel.
69///
70/// Note: This task runs until cancelled externally.
71pub(super) struct PrewarmCacheTask<N, P, Evm>
72where
73    N: NodePrimitives,
74    Evm: ConfigureEvm<Primitives = N>,
75{
76    /// The executor used to spawn execution tasks.
77    executor: WorkloadExecutor,
78    /// Shared execution cache.
79    execution_cache: PayloadExecutionCache,
80    /// Context provided to execution tasks
81    ctx: PrewarmContext<N, P, Evm>,
82    /// How many transactions should be executed in parallel
83    max_concurrency: usize,
84    /// The number of transactions to be processed
85    transaction_count_hint: usize,
86    /// Sender to emit evm state outcome messages, if any.
87    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
88    /// Receiver for events produced by tx execution
89    actions_rx: Receiver<PrewarmTaskEvent>,
90}
91
92impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
93where
94    N: NodePrimitives,
95    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
96    Evm: ConfigureEvm<Primitives = N> + 'static,
97{
98    /// Initializes the task with the given transactions pending execution
99    pub(super) fn new(
100        executor: WorkloadExecutor,
101        execution_cache: PayloadExecutionCache,
102        ctx: PrewarmContext<N, P, Evm>,
103        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
104        transaction_count_hint: usize,
105        max_concurrency: usize,
106    ) -> (Self, Sender<PrewarmTaskEvent>) {
107        let (actions_tx, actions_rx) = channel();
108
109        trace!(
110            target: "engine::tree::payload_processor::prewarm",
111            max_concurrency,
112            transaction_count_hint,
113            "Initialized prewarm task"
114        );
115
116        (
117            Self {
118                executor,
119                execution_cache,
120                ctx,
121                max_concurrency,
122                transaction_count_hint,
123                to_multi_proof,
124                actions_rx,
125            },
126            actions_tx,
127        )
128    }
129
130    /// Spawns all pending transactions as blocking tasks by first chunking them.
131    ///
132    /// For Optimism chains, special handling is applied to the first transaction if it's a
133    /// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
134    /// subsequent transactions in the block.
135    fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
136    where
137        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
138    {
139        let executor = self.executor.clone();
140        let ctx = self.ctx.clone();
141        let max_concurrency = self.max_concurrency;
142        let transaction_count_hint = self.transaction_count_hint;
143        let span = tracing::Span::current();
144
145        self.executor.spawn_blocking(move || {
146            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
147
148            let (done_tx, done_rx) = mpsc::channel();
149
150            // When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
151            // max workers to handle potentially many transactions in parallel rather
152            // than bottlenecking on a single worker.
153            let workers_needed = if transaction_count_hint == 0 {
154                max_concurrency
155            } else {
156                transaction_count_hint.min(max_concurrency)
157            };
158
159            // Initialize worker handles container
160            let mut handles = Vec::with_capacity(workers_needed);
161
162            // Only spawn initial workers as needed
163            for i in 0..workers_needed {
164                handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
165            }
166
167            // Distribute transactions to workers
168            let mut tx_index = 0usize;
169            while let Ok(tx) = pending.recv() {
170                // Stop distributing if termination was requested
171                if ctx.terminate_execution.load(Ordering::Relaxed) {
172                    trace!(
173                        target: "engine::tree::payload_processor::prewarm",
174                        "Termination requested, stopping transaction distribution"
175                    );
176                    break;
177                }
178
179                let indexed_tx = IndexedTransaction { index: tx_index, tx };
180                let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
181
182                // System transactions (type > 4) in the first position set critical metadata
183                // that affects all subsequent transactions (e.g., L1 block info on L2s).
184                // Broadcast the first system transaction to all workers to ensure they have
185                // the critical state. This is particularly important for L2s like Optimism
186                // where the first deposit transaction (type 126) contains essential block metadata.
187                if tx_index == 0 && is_system_tx {
188                    for handle in &handles {
189                        // Ignore send errors: workers listen to terminate_execution and may
190                        // exit early when signaled. Sending to a disconnected worker is
191                        // possible and harmless and should happen at most once due to
192                        //  the terminate_execution check above.
193                        let _ = handle.send(indexed_tx.clone());
194                    }
195                } else {
196                    // Round-robin distribution for all other transactions
197                    let worker_idx = tx_index % workers_needed;
198                    // Ignore send errors: workers listen to terminate_execution and may
199                    // exit early when signaled. Sending to a disconnected worker is
200                    // possible and harmless and should happen at most once due to
201                    //  the terminate_execution check above.
202                    let _ = handles[worker_idx].send(indexed_tx);
203                }
204
205                tx_index += 1;
206            }
207
208            // drop handle and wait for all tasks to finish and drop theirs
209            drop(done_tx);
210            drop(handles);
211            while done_rx.recv().is_ok() {}
212
213            let _ = actions_tx
214                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
215        });
216    }
217
218    /// Returns true if prewarming was terminated and no more transactions should be prewarmed.
219    fn is_execution_terminated(&self) -> bool {
220        self.ctx.terminate_execution.load(Ordering::Relaxed)
221    }
222
223    /// If configured and the tx returned proof targets, emit the targets the transaction produced
224    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
225        if self.is_execution_terminated() {
226            // if execution is already terminated then we dont need to send more proof fetch
227            // messages
228            return
229        }
230
231        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
232            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
233        }
234    }
235
236    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
237    /// It should only be called after ensuring that:
238    /// 1. All prewarming tasks have completed execution
239    /// 2. No other concurrent operations are accessing the cache
240    ///
241    /// Saves the warmed caches back into the shared slot after prewarming completes.
242    ///
243    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
244    /// the new, warmed cache to be inserted.
245    ///
246    /// This method is called from `run()` only after all execution tasks are complete.
247    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
248    fn save_cache(self, state: BundleState) {
249        let start = Instant::now();
250
251        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
252            self;
253        let hash = env.hash;
254
255        debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
256        // Perform all cache operations atomically under the lock
257        execution_cache.update_with_guard(|cached| {
258            // consumes the `SavedCache` held by the prewarming task, which releases its usage guard
259            let (caches, cache_metrics) = saved_cache.split();
260            let new_cache = SavedCache::new(hash, caches, cache_metrics);
261
262            // Insert state into cache while holding the lock
263            if new_cache.cache().insert_state(&state).is_err() {
264                // Clear the cache on error to prevent having a polluted cache
265                *cached = None;
266                debug!(target: "engine::caching", "cleared execution cache on update error");
267                return;
268            }
269
270            new_cache.update_metrics();
271
272            // Replace the shared cache with the new one; the previous cache (if any) is dropped.
273            *cached = Some(new_cache);
274        });
275
276        let elapsed = start.elapsed();
277        debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
278
279        metrics.cache_saving_duration.set(elapsed.as_secs_f64());
280    }
281
282    /// Executes the task.
283    ///
284    /// This will execute the transactions until all transactions have been processed or the task
285    /// was cancelled.
286    #[instrument(
287        level = "debug",
288        target = "engine::tree::payload_processor::prewarm",
289        name = "prewarm",
290        skip_all
291    )]
292    pub(super) fn run(
293        self,
294        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
295        actions_tx: Sender<PrewarmTaskEvent>,
296    ) {
297        // spawn execution tasks.
298        self.spawn_all(pending, actions_tx);
299
300        let mut final_block_output = None;
301        let mut finished_execution = false;
302        while let Ok(event) = self.actions_rx.recv() {
303            match event {
304                PrewarmTaskEvent::TerminateTransactionExecution => {
305                    // stop tx processing
306                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
307                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
308                }
309                PrewarmTaskEvent::Outcome { proof_targets } => {
310                    // completed executing a set of transactions
311                    self.send_multi_proof_targets(proof_targets);
312                }
313                PrewarmTaskEvent::Terminate { block_output } => {
314                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
315                    final_block_output = Some(block_output);
316
317                    if finished_execution {
318                        // all tasks are done, we can exit, which will save caches and exit
319                        break
320                    }
321                }
322                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
323                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
324                    self.ctx.metrics.transactions.set(executed_transactions as f64);
325                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
326
327                    finished_execution = true;
328
329                    if final_block_output.is_some() {
330                        // all tasks are done, we can exit, which will save caches and exit
331                        break
332                    }
333                }
334            }
335        }
336
337        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
338
339        // save caches and finish
340        if let Some(Some(state)) = final_block_output {
341            self.save_cache(state);
342        }
343    }
344}
345
346/// Context required by tx execution tasks.
347#[derive(Debug, Clone)]
348pub(super) struct PrewarmContext<N, P, Evm>
349where
350    N: NodePrimitives,
351    Evm: ConfigureEvm<Primitives = N>,
352{
353    pub(super) env: ExecutionEnv<Evm>,
354    pub(super) evm_config: Evm,
355    pub(super) saved_cache: SavedCache,
356    /// Provider to obtain the state
357    pub(super) provider: StateProviderBuilder<N, P>,
358    pub(super) metrics: PrewarmMetrics,
359    /// An atomic bool that tells prewarm tasks to not start any more execution.
360    pub(super) terminate_execution: Arc<AtomicBool>,
361    pub(super) precompile_cache_disabled: bool,
362    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
363}
364
365impl<N, P, Evm> PrewarmContext<N, P, Evm>
366where
367    N: NodePrimitives,
368    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
369    Evm: ConfigureEvm<Primitives = N> + 'static,
370{
371    /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
372    /// execution.
373    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
374    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
375        let Self {
376            env,
377            evm_config,
378            saved_cache,
379            provider,
380            metrics,
381            terminate_execution,
382            precompile_cache_disabled,
383            mut precompile_cache_map,
384        } = self;
385
386        let state_provider = match provider.build() {
387            Ok(provider) => provider,
388            Err(err) => {
389                trace!(
390                    target: "engine::tree::payload_processor::prewarm",
391                    %err,
392                    "Failed to build state provider in prewarm thread"
393                );
394                return None
395            }
396        };
397
398        // Use the caches to create a new provider with caching
399        let caches = saved_cache.cache().clone();
400        let cache_metrics = saved_cache.metrics().clone();
401        let state_provider =
402            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
403
404        let state_provider = StateProviderDatabase::new(state_provider);
405
406        let mut evm_env = env.evm_env;
407
408        // we must disable the nonce check so that we can execute the transaction even if the nonce
409        // doesn't match what's on chain.
410        evm_env.cfg_env.disable_nonce_check = true;
411
412        // create a new executor and disable nonce checks in the env
413        let spec_id = *evm_env.spec_id();
414        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
415
416        if !precompile_cache_disabled {
417            // Only cache pure precompiles to avoid issues with stateful precompiles
418            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
419                CachedPrecompile::wrap(
420                    precompile,
421                    precompile_cache_map.cache_for_address(*address),
422                    spec_id,
423                    None, // No metrics for prewarm
424                )
425            });
426        }
427
428        Some((evm, metrics, terminate_execution))
429    }
430
431    /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
432    /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
433    ///
434    /// Returns `None` if executing the transactions failed to a non Revert error.
435    /// Returns the touched+modified state of the transaction.
436    ///
437    /// Note: There are no ordering guarantees; this does not reflect the state produced by
438    /// sequential execution.
439    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
440    fn transact_batch<Tx>(
441        self,
442        txs: mpsc::Receiver<IndexedTransaction<Tx>>,
443        sender: Sender<PrewarmTaskEvent>,
444        done_tx: Sender<()>,
445    ) where
446        Tx: ExecutableTxFor<Evm>,
447    {
448        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
449
450        while let Ok(IndexedTransaction { index, tx }) = {
451            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
452                .entered();
453            txs.recv()
454        } {
455            let _enter =
456                debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
457                    .entered();
458
459            // create the tx env
460            let start = Instant::now();
461
462            // If the task was cancelled, stop execution, send an empty result to notify the task,
463            // and exit.
464            if terminate_execution.load(Ordering::Relaxed) {
465                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
466                break
467            }
468
469            let res = match evm.transact(&tx) {
470                Ok(res) => res,
471                Err(err) => {
472                    trace!(
473                        target: "engine::tree::payload_processor::prewarm",
474                        %err,
475                        tx_hash=%tx.tx().tx_hash(),
476                        sender=%tx.signer(),
477                        "Error when executing prewarm transaction",
478                    );
479                    // Track transaction execution errors
480                    metrics.transaction_errors.increment(1);
481                    // skip error because we can ignore these errors and continue with the next tx
482                    continue
483                }
484            };
485            metrics.execution_duration.record(start.elapsed());
486
487            drop(_enter);
488
489            // If the task was cancelled, stop execution, send an empty result to notify the task,
490            // and exit.
491            if terminate_execution.load(Ordering::Relaxed) {
492                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
493                break
494            }
495
496            // Only send outcome for transactions after the first txn
497            // as the main execution will be just as fast
498            if index > 0 {
499                let _enter =
500                    debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
501                        .entered();
502                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
503                metrics.prefetch_storage_targets.record(storage_targets as f64);
504                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
505                drop(_enter);
506            }
507
508            metrics.total_runtime.record(start.elapsed());
509        }
510
511        // send a message to the main task to flag that we're done
512        let _ = done_tx.send(());
513    }
514
515    /// Spawns a worker task for transaction execution and returns its sender channel.
516    fn spawn_worker<Tx>(
517        &self,
518        idx: usize,
519        executor: &WorkloadExecutor,
520        actions_tx: Sender<PrewarmTaskEvent>,
521        done_tx: Sender<()>,
522    ) -> mpsc::Sender<IndexedTransaction<Tx>>
523    where
524        Tx: ExecutableTxFor<Evm> + Send + 'static,
525    {
526        let (tx, rx) = mpsc::channel();
527        let ctx = self.clone();
528        let span =
529            debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
530
531        executor.spawn_blocking(move || {
532            let _enter = span.entered();
533            ctx.transact_batch(rx, actions_tx, done_tx);
534        });
535
536        tx
537    }
538}
539
540/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
541/// given state.
542fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
543    let mut targets = MultiProofTargets::with_capacity(state.len());
544    let mut storage_targets = 0;
545    for (addr, account) in state {
546        // if the account was not touched, or if the account was selfdestructed, do not
547        // fetch proofs for it
548        //
549        // Since selfdestruct can only happen in the same transaction, we can skip
550        // prefetching proofs for selfdestructed accounts
551        //
552        // See: https://eips.ethereum.org/EIPS/eip-6780
553        if !account.is_touched() || account.is_selfdestructed() {
554            continue
555        }
556
557        let mut storage_set =
558            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
559        for (key, slot) in account.storage {
560            // do nothing if unchanged
561            if !slot.is_changed() {
562                continue
563            }
564
565            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
566        }
567
568        storage_targets += storage_set.len();
569        targets.insert(keccak256(addr), storage_set);
570    }
571
572    (targets, storage_targets)
573}
574
575/// The events the pre-warm task can handle.
576pub(super) enum PrewarmTaskEvent {
577    /// Forcefully terminate all remaining transaction execution.
578    TerminateTransactionExecution,
579    /// Forcefully terminate the task on demand and update the shared cache with the given output
580    /// before exiting.
581    Terminate {
582        /// The final block state output.
583        block_output: Option<BundleState>,
584    },
585    /// The outcome of a pre-warm task
586    Outcome {
587        /// The prepared proof targets based on the evm state outcome
588        proof_targets: Option<MultiProofTargets>,
589    },
590    /// Finished executing all transactions
591    FinishedTxExecution {
592        /// Number of transactions executed
593        executed_transactions: usize,
594    },
595}
596
597/// Metrics for transactions prewarming.
598#[derive(Metrics, Clone)]
599#[metrics(scope = "sync.prewarm")]
600pub(crate) struct PrewarmMetrics {
601    /// The number of transactions to prewarm
602    pub(crate) transactions: Gauge,
603    /// A histogram of the number of transactions to prewarm
604    pub(crate) transactions_histogram: Histogram,
605    /// A histogram of duration per transaction prewarming
606    pub(crate) total_runtime: Histogram,
607    /// A histogram of EVM execution duration per transaction prewarming
608    pub(crate) execution_duration: Histogram,
609    /// A histogram for prefetch targets per transaction prewarming
610    pub(crate) prefetch_storage_targets: Histogram,
611    /// A histogram of duration for cache saving
612    pub(crate) cache_saving_duration: Gauge,
613    /// Counter for transaction execution errors during prewarming
614    pub(crate) transaction_errors: Counter,
615}