reth_engine_tree/tree/payload_processor/prewarm.rs

//! Caching and prewarming related functionality.
//!
//! Prewarming executes transactions in parallel before the actual block execution
//! to populate the execution cache with state that will likely be accessed during
//! block processing.
//!
//! ## How Prewarming Works
//!
//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
//!    and one for actual execution (executed sequentially)
//! 2. Prewarming tasks execute transactions in parallel using shared caches
//! 3. When actual block execution happens, it benefits from the warmed cache
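//!
//! ## Example (illustrative)
//!
//! A minimal sketch of how the task is wired together, assuming already-constructed inputs
//! (`executor`, `execution_cache`, `ctx`, `to_multi_proof`, `bundle_state`, and the concurrency
//! settings); the real wiring lives in the payload processor:
//!
//! ```ignore
//! let (task, actions_tx) = PrewarmCacheTask::new(
//!     executor.clone(),
//!     execution_cache,
//!     ctx,
//!     Some(to_multi_proof),
//!     transaction_count_hint,
//!     max_concurrency,
//! );
//!
//! // Stream block transactions to the prewarm workers while the sequential executor runs.
//! let (pending_tx, pending_rx) = std::sync::mpsc::channel();
//! executor.spawn_blocking({
//!     let actions_tx = actions_tx.clone();
//!     move || task.run(pending_rx, actions_tx)
//! });
//!
//! // Once sequential execution finishes, stop prewarming and hand over the final state so the
//! // warmed caches can be saved.
//! let _ = actions_tx.send(PrewarmTaskEvent::TerminateTransactionExecution);
//! let _ = actions_tx.send(PrewarmTaskEvent::Terminate { block_output: Some(bundle_state) });
//! ```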

use crate::tree::{
    cached_state::{CachedStateProvider, SavedCache},
    payload_processor::{
        executor::WorkloadExecutor, multiproof::MultiProofMessage,
        ExecutionCache as PayloadExecutionCache,
    },
    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
    ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eips::Typed2718;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, StateProviderBox, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{self, channel, Receiver, Sender},
        Arc,
    },
    time::Instant,
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};

/// A wrapper for transactions that includes their index in the block.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// The transaction index in the block.
    index: usize,
    /// The wrapped transaction.
    tx: Tx,
}

/// Maximum standard Ethereum transaction type value.
///
/// Standard transaction types are:
/// - Type 0: Legacy transactions (original Ethereum)
/// - Type 1: EIP-2930 (access list transactions)
/// - Type 2: EIP-1559 (dynamic fee transactions)
/// - Type 3: EIP-4844 (blob transactions)
/// - Type 4: EIP-7702 (set code authorization transactions)
///
/// Any transaction with a type > 4 is considered a non-standard/system transaction,
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
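///
/// For example, an Optimism deposit transaction reports `ty() == 126`, which is greater than
/// this value, so the prewarm task treats it as a system transaction and broadcasts it to every
/// worker (see `spawn_all`).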
const MAX_STANDARD_TX_TYPE: u8 = 4;

/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
/// Note: This task runs until cancelled externally.
pub(super) struct PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// The executor used to spawn execution tasks.
    executor: WorkloadExecutor,
    /// Shared execution cache.
    execution_cache: PayloadExecutionCache,
    /// Context provided to execution tasks
    ctx: PrewarmContext<N, P, Evm>,
    /// How many transactions should be executed in parallel
    max_concurrency: usize,
    /// The number of transactions to be processed
    transaction_count_hint: usize,
    /// Sender to emit evm state outcome messages, if any.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Receiver for events produced by tx execution
    actions_rx: Receiver<PrewarmTaskEvent>,
    /// Parent span for tracing
    parent_span: Span,
}

impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Initializes the task with the given transactions pending execution
    pub(super) fn new(
        executor: WorkloadExecutor,
        execution_cache: PayloadExecutionCache,
        ctx: PrewarmContext<N, P, Evm>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        transaction_count_hint: usize,
        max_concurrency: usize,
    ) -> (Self, Sender<PrewarmTaskEvent>) {
        let (actions_tx, actions_rx) = channel();

        trace!(
            target: "engine::tree::payload_processor::prewarm",
            max_concurrency,
            transaction_count_hint,
            "Initialized prewarm task"
        );

        (
            Self {
                executor,
                execution_cache,
                ctx,
                max_concurrency,
                transaction_count_hint,
                to_multi_proof,
                actions_rx,
                parent_span: Span::current(),
            },
            actions_tx,
        )
    }

    /// Spawns worker tasks and distributes all pending transactions to them for execution.
    ///
    /// For Optimism chains, special handling is applied to the first transaction if it's a
    /// deposit transaction (type 0x7E/126), which sets critical metadata that affects all
    /// subsequent transactions in the block: it is broadcast to every worker.
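    ///
    /// An illustrative distribution with three workers, assuming the first transaction is an
    /// Optimism deposit and the rest are standard transactions:
    ///
    /// ```text
    /// tx0 (deposit, type 126) -> worker 0, worker 1, worker 2   (broadcast)
    /// tx1                     -> worker 1                       (index 1 % 3)
    /// tx2                     -> worker 2                       (index 2 % 3)
    /// tx3                     -> worker 0                       (index 3 % 3)
    /// ```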
    fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
    where
        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
    {
        let executor = self.executor.clone();
        let ctx = self.ctx.clone();
        let max_concurrency = self.max_concurrency;
        let transaction_count_hint = self.transaction_count_hint;
        let span = Span::current();

        self.executor.spawn_blocking(move || {
            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();

            let (done_tx, done_rx) = mpsc::channel();

            // When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
            // max workers to handle potentially many transactions in parallel rather
            // than bottlenecking on a single worker.
            let workers_needed = if transaction_count_hint == 0 {
                max_concurrency
            } else {
                transaction_count_hint.min(max_concurrency)
            };
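
            // Illustrative sizing: with `max_concurrency = 16`, a hint of 0 spawns 16 workers,
            // a hint of 3 spawns 3 workers, and a hint of 300 is capped at 16.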

            // Initialize worker handles container
            let mut handles = Vec::with_capacity(workers_needed);

            // Only spawn initial workers as needed
            for i in 0..workers_needed {
                handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
            }

            // Distribute transactions to workers
            let mut tx_index = 0usize;
            while let Ok(tx) = pending.recv() {
                // Stop distributing if termination was requested
                if ctx.terminate_execution.load(Ordering::Relaxed) {
                    trace!(
                        target: "engine::tree::payload_processor::prewarm",
                        "Termination requested, stopping transaction distribution"
                    );
                    break;
                }

                let indexed_tx = IndexedTransaction { index: tx_index, tx };
                let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;

                // System transactions (type > 4) in the first position set critical metadata
                // that affects all subsequent transactions (e.g., L1 block info on L2s).
                // Broadcast the first system transaction to all workers to ensure they have
                // the critical state. This is particularly important for L2s like Optimism
                // where the first deposit transaction (type 126) contains essential block metadata.
                if tx_index == 0 && is_system_tx {
                    for handle in &handles {
                        // Ignore send errors: workers listen to terminate_execution and may
                        // exit early when signaled. Sending to a disconnected worker is
                        // possible and harmless and should happen at most once due to
                        // the terminate_execution check above.
                        let _ = handle.send(indexed_tx.clone());
                    }
                } else {
                    // Round-robin distribution for all other transactions
                    let worker_idx = tx_index % workers_needed;
                    // Ignore send errors: workers listen to terminate_execution and may
                    // exit early when signaled. Sending to a disconnected worker is
                    // possible and harmless and should happen at most once due to
                    // the terminate_execution check above.
                    let _ = handles[worker_idx].send(indexed_tx);
                }

                tx_index += 1;
            }

            // drop our done_tx handle and wait for all workers to finish and drop theirs
            drop(done_tx);
            drop(handles);
            while done_rx.recv().is_ok() {}

            let _ = actions_tx
                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
        });
    }

    /// Returns true if prewarming was terminated and no more transactions should be prewarmed.
    fn is_execution_terminated(&self) -> bool {
        self.ctx.terminate_execution.load(Ordering::Relaxed)
    }

    /// If configured and the transaction produced proof targets, forwards them to the multiproof
    /// task.
    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
        if self.is_execution_terminated() {
            // if execution is already terminated then we don't need to send more proof fetch
            // messages
            return
        }

        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
        }
    }

    /// Saves the warmed caches back into the shared slot after prewarming completes.
    ///
    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
    /// the new, warmed cache to be inserted.
    ///
    /// This method calls `ExecutionCache::update_with_guard`, which requires exclusive access.
    /// It should only be called after ensuring that:
    /// 1. All prewarming tasks have completed execution
    /// 2. No other concurrent operations are accessing the cache
    ///
    /// This method is called from `run()` only after all execution tasks are complete.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn save_cache(self, state: BundleState) {
        let start = Instant::now();

        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
            self;
        let hash = env.hash;

        if let Some(saved_cache) = saved_cache {
            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
            // Perform all cache operations atomically under the lock
            execution_cache.update_with_guard(|cached| {
                // consumes the `SavedCache` held by the prewarming task, which releases its usage
                // guard
                let (caches, cache_metrics) = saved_cache.split();
                let new_cache = SavedCache::new(hash, caches, cache_metrics);

                // Insert state into cache while holding the lock
                if new_cache.cache().insert_state(&state).is_err() {
                    // Clear the cache on error to prevent having a polluted cache
                    *cached = None;
                    debug!(target: "engine::caching", "cleared execution cache on update error");
                    return;
                }

                new_cache.update_metrics();

                // Replace the shared cache with the new one; the previous cache (if any) is
                // dropped.
                *cached = Some(new_cache);
            });

            let elapsed = start.elapsed();
            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");

            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
        }
    }

    /// Executes the task.
    ///
    /// This will execute the transactions until all transactions have been processed or the task
    /// was cancelled.
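    ///
    /// In the normal shutdown path, the task exits once it has seen both a
    /// [`PrewarmTaskEvent::Terminate`] event carrying the final block output and a
    /// [`PrewarmTaskEvent::FinishedTxExecution`] event from the execution workers; the warmed
    /// caches are then saved.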
    #[instrument(
        parent = &self.parent_span,
        level = "debug",
        target = "engine::tree::payload_processor::prewarm",
        name = "prewarm and caching",
        skip_all
    )]
    pub(super) fn run(
        self,
        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
        actions_tx: Sender<PrewarmTaskEvent>,
    ) {
        // spawn execution tasks.
        self.spawn_all(pending, actions_tx);

        let mut final_block_output = None;
        let mut finished_execution = false;
        while let Ok(event) = self.actions_rx.recv() {
            match event {
                PrewarmTaskEvent::TerminateTransactionExecution => {
                    // stop tx processing
                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
                }
                PrewarmTaskEvent::Outcome { proof_targets } => {
                    // completed executing a set of transactions
                    self.send_multi_proof_targets(proof_targets);
                }
                PrewarmTaskEvent::Terminate { block_output } => {
                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
                    final_block_output = Some(block_output);

                    if finished_execution {
                        // all tasks are done; we can break out, save the caches, and exit
                        break
                    }
                }
                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
                    self.ctx.metrics.transactions.set(executed_transactions as f64);
                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);

                    finished_execution = true;

                    if final_block_output.is_some() {
                        // all tasks are done; we can break out, save the caches, and exit
                        break
                    }
                }
            }
        }

        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");

        // save caches and finish
        if let Some(Some(state)) = final_block_output {
            self.save_cache(state);
        }
    }
}

/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
pub(super) struct PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    pub(super) env: ExecutionEnv<Evm>,
    pub(super) evm_config: Evm,
    pub(super) saved_cache: Option<SavedCache>,
    /// Provider to obtain the state
    pub(super) provider: StateProviderBuilder<N, P>,
    pub(super) metrics: PrewarmMetrics,
    /// An atomic bool that tells prewarm tasks to not start any more execution.
    pub(super) terminate_execution: Arc<AtomicBool>,
    pub(super) precompile_cache_disabled: bool,
    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}

impl<N, P, Evm> PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Splits this context into an evm, metrics, and the atomic bool for terminating
    /// execution.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
        let Self {
            env,
            evm_config,
            saved_cache,
            provider,
            metrics,
            terminate_execution,
            precompile_cache_disabled,
            mut precompile_cache_map,
        } = self;

        let state_provider = match provider.build() {
            Ok(provider) => provider,
            Err(err) => {
                trace!(
                    target: "engine::tree::payload_processor::prewarm",
                    %err,
                    "Failed to build state provider in prewarm thread"
                );
                return None
            }
        };

        // Use the caches to create a new provider with caching
        let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache {
            let caches = saved_cache.cache().clone();
            let cache_metrics = saved_cache.metrics().clone();
            Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics))
        } else {
            state_provider
        };

        let state_provider = StateProviderDatabase::new(state_provider);

        let mut evm_env = env.evm_env;

        // we must disable the nonce check so that we can execute the transaction even if the nonce
        // doesn't match what's on chain.
        evm_env.cfg_env.disable_nonce_check = true;

        // create a new evm with the env configured above
        let spec_id = *evm_env.spec_id();
        let mut evm = evm_config.evm_with_env(state_provider, evm_env);

        if !precompile_cache_disabled {
            // Only cache pure precompiles to avoid issues with stateful precompiles
            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
                CachedPrecompile::wrap(
                    precompile,
                    precompile_cache_map.cache_for_address(*address),
                    spec_id,
                    None, // No metrics for prewarm
                )
            });
        }

        Some((evm, metrics, terminate_execution))
    }

    /// Accepts an [`mpsc::Receiver`] of transactions and a handle to the prewarm task. Executes
    /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
    ///
    /// This function processes transactions sequentially from the receiver and emits outcome events
    /// via the provided sender. Execution errors are logged and tracked but do not stop the batch
    /// processing unless the task is explicitly cancelled.
    ///
    /// Note: There are no ordering guarantees; this does not reflect the state produced by
    /// sequential execution.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn transact_batch<Tx>(
        self,
        txs: mpsc::Receiver<IndexedTransaction<Tx>>,
        sender: Sender<PrewarmTaskEvent>,
        done_tx: Sender<()>,
    ) where
        Tx: ExecutableTxFor<Evm>,
    {
        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };

        while let Ok(IndexedTransaction { index, tx }) = {
            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
                .entered();
            txs.recv()
        } {
            let enter =
                debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
                    .entered();

            // start the timer for this transaction
            let start = Instant::now();

            // If the task was cancelled, stop execution, send an empty result to notify the task,
            // and exit.
            if terminate_execution.load(Ordering::Relaxed) {
                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
                break
            }

            let res = match evm.transact(&tx) {
                Ok(res) => res,
                Err(err) => {
                    trace!(
                        target: "engine::tree::payload_processor::prewarm",
                        %err,
                        tx_hash=%tx.tx().tx_hash(),
                        sender=%tx.signer(),
                        "Error when executing prewarm transaction",
                    );
                    // Track transaction execution errors
                    metrics.transaction_errors.increment(1);
                    // skip error because we can ignore these errors and continue with the next tx
                    continue
                }
            };
            metrics.execution_duration.record(start.elapsed());

            // record some basic information about the transactions
            enter.record("gas_used", res.result.gas_used());
            enter.record("is_success", res.result.is_success());

            drop(enter);

            // If the task was cancelled, stop execution, send an empty result to notify the task,
            // and exit.
            if terminate_execution.load(Ordering::Relaxed) {
                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
                break
            }

            // Only send an outcome for transactions after the first one, since the main
            // execution reaches the first transaction just as quickly
            if index > 0 {
                let _enter =
                    debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
                        .entered();
                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
                metrics.prefetch_storage_targets.record(storage_targets as f64);
                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
                drop(_enter);
            }

            metrics.total_runtime.record(start.elapsed());
        }

        // send a message to the main task to flag that we're done
        let _ = done_tx.send(());
    }

    /// Spawns a worker task for transaction execution and returns its sender channel.
    fn spawn_worker<Tx>(
        &self,
        idx: usize,
        executor: &WorkloadExecutor,
        actions_tx: Sender<PrewarmTaskEvent>,
        done_tx: Sender<()>,
    ) -> mpsc::Sender<IndexedTransaction<Tx>>
    where
        Tx: ExecutableTxFor<Evm> + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let ctx = self.clone();
        let span =
            debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);

        executor.spawn_blocking(move || {
            let _enter = span.entered();
            ctx.transact_batch(rx, actions_tx, done_tx);
        });

        tx
    }
}

/// Returns a set of [`MultiProofTargets`] and the total number of storage targets, based on the
/// given state.
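///
/// For example (illustrative): an account `addr` with a single changed storage slot `key`
/// contributes one entry `keccak256(addr) => { keccak256(key) }` and counts as one storage target.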
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
    let mut targets = MultiProofTargets::with_capacity(state.len());
    let mut storage_targets = 0;
    for (addr, account) in state {
        // if the account was not touched, or if the account was selfdestructed, do not
        // fetch proofs for it
        //
        // Since, per EIP-6780, selfdestruct only takes effect when the account was created in the
        // same transaction, we can skip prefetching proofs for selfdestructed accounts
        //
        // See: https://eips.ethereum.org/EIPS/eip-6780
        if !account.is_touched() || account.is_selfdestructed() {
            continue
        }

        let mut storage_set =
            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
        for (key, slot) in account.storage {
            // do nothing if unchanged
            if !slot.is_changed() {
                continue
            }

            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
        }

        storage_targets += storage_set.len();
        targets.insert(keccak256(addr), storage_set);
    }

    (targets, storage_targets)
}

/// The events the pre-warm task can handle.
pub(super) enum PrewarmTaskEvent {
    /// Forcefully terminate all remaining transaction execution.
    TerminateTransactionExecution,
    /// Forcefully terminate the task on demand and update the shared cache with the given output
    /// before exiting.
    Terminate {
        /// The final block state output.
        block_output: Option<BundleState>,
    },
    /// The outcome of a pre-warm task
    Outcome {
        /// The prepared proof targets based on the evm state outcome
        proof_targets: Option<MultiProofTargets>,
    },
    /// Finished executing all transactions
    FinishedTxExecution {
        /// Number of transactions executed
        executed_transactions: usize,
    },
}

/// Metrics for transactions prewarming.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub(crate) struct PrewarmMetrics {
    /// The number of transactions to prewarm
    pub(crate) transactions: Gauge,
    /// A histogram of the number of transactions to prewarm
    pub(crate) transactions_histogram: Histogram,
    /// A histogram of duration per transaction prewarming
    pub(crate) total_runtime: Histogram,
    /// A histogram of EVM execution duration per transaction prewarming
    pub(crate) execution_duration: Histogram,
    /// A histogram for prefetch targets per transaction prewarming
    pub(crate) prefetch_storage_targets: Histogram,
    /// The duration of the last cache save
    pub(crate) cache_saving_duration: Gauge,
    /// Counter for transaction execution errors during prewarming
    pub(crate) transaction_errors: Counter,
}