reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    cached_state::{CachedStateProvider, SavedCache},
16    payload_processor::{
17        executor::WorkloadExecutor, multiproof::MultiProofMessage,
18        ExecutionCache as PayloadExecutionCache,
19    },
20    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
21    ExecutionEnv, StateProviderBuilder,
22};
23use alloy_consensus::transaction::TxHashRef;
24use alloy_eips::Typed2718;
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use metrics::{Counter, Gauge, Histogram};
28use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
29use reth_metrics::Metrics;
30use reth_primitives_traits::NodePrimitives;
31use reth_provider::{BlockReader, StateProviderFactory, StateReader};
32use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
33use reth_trie::MultiProofTargets;
34use std::{
35    sync::{
36        atomic::{AtomicBool, Ordering},
37        mpsc::{self, channel, Receiver, Sender},
38        Arc,
39    },
40    time::Instant,
41};
42use tracing::{debug, trace, warn};
43
44/// A wrapper for transactions that includes their index in the block.
45#[derive(Clone)]
46struct IndexedTransaction<Tx> {
47    /// The transaction index in the block.
48    index: usize,
49    /// The wrapped transaction.
50    tx: Tx,
51}
52
53/// Maximum standard Ethereum transaction type value.
54///
55/// Standard transaction types are:
56/// - Type 0: Legacy transactions (original Ethereum)
57/// - Type 1: EIP-2930 (access list transactions)
58/// - Type 2: EIP-1559 (dynamic fee transactions)
59/// - Type 3: EIP-4844 (blob transactions)
60/// - Type 4: EIP-7702 (set code authorization transactions)
61///
62/// Any transaction with a type > 4 is considered a non-standard/system transaction,
63/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
64const MAX_STANDARD_TX_TYPE: u8 = 4;
65
66/// A task that is responsible for caching and prewarming the cache by executing transactions
67/// individually in parallel.
68///
69/// Note: This task runs until cancelled externally.
70pub(super) struct PrewarmCacheTask<N, P, Evm>
71where
72    N: NodePrimitives,
73    Evm: ConfigureEvm<Primitives = N>,
74{
75    /// The executor used to spawn execution tasks.
76    executor: WorkloadExecutor,
77    /// Shared execution cache.
78    execution_cache: PayloadExecutionCache,
79    /// Context provided to execution tasks
80    ctx: PrewarmContext<N, P, Evm>,
81    /// How many transactions should be executed in parallel
82    max_concurrency: usize,
83    /// The number of transactions to be processed
84    transaction_count_hint: usize,
85    /// Sender to emit evm state outcome messages, if any.
86    to_multi_proof: Option<Sender<MultiProofMessage>>,
87    /// Receiver for events produced by tx execution
88    actions_rx: Receiver<PrewarmTaskEvent>,
89}
90
91impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
92where
93    N: NodePrimitives,
94    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
95    Evm: ConfigureEvm<Primitives = N> + 'static,
96{
97    /// Initializes the task with the given transactions pending execution
98    pub(super) fn new(
99        executor: WorkloadExecutor,
100        execution_cache: PayloadExecutionCache,
101        ctx: PrewarmContext<N, P, Evm>,
102        to_multi_proof: Option<Sender<MultiProofMessage>>,
103        transaction_count_hint: usize,
104        max_concurrency: usize,
105    ) -> (Self, Sender<PrewarmTaskEvent>) {
106        let (actions_tx, actions_rx) = channel();
107
108        trace!(
109            target: "engine::tree::prewarm",
110            max_concurrency,
111            transaction_count_hint,
112            "Initialized prewarm task"
113        );
114
115        (
116            Self {
117                executor,
118                execution_cache,
119                ctx,
120                max_concurrency,
121                transaction_count_hint,
122                to_multi_proof,
123                actions_rx,
124            },
125            actions_tx,
126        )
127    }
128
129    /// Spawns all pending transactions as blocking tasks by first chunking them.
130    ///
131    /// For Optimism chains, special handling is applied to the first transaction if it's a
132    /// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
133    /// subsequent transactions in the block.
134    fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
135    where
136        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
137    {
138        let executor = self.executor.clone();
139        let ctx = self.ctx.clone();
140        let max_concurrency = self.max_concurrency;
141        let transaction_count_hint = self.transaction_count_hint;
142
143        self.executor.spawn_blocking(move || {
144            let (done_tx, done_rx) = mpsc::channel();
145            let mut executing = 0usize;
146
147            // Initialize worker handles container
148            let mut handles = Vec::with_capacity(max_concurrency);
149
150            // When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
151            // max workers to handle potentially many transactions in parallel rather
152            // than bottlenecking on a single worker.
153            let workers_needed = if transaction_count_hint == 0 {
154                max_concurrency
155            } else {
156                transaction_count_hint.min(max_concurrency)
157            };
158
159            // Only spawn initial workers as needed
160            for _ in 0..workers_needed {
161                handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
162            }
163
164            let mut tx_index = 0usize;
165
166            // Handle first transaction - special case for system transactions
167            if let Ok(first_tx) = pending.recv() {
168                // Move the transaction into the indexed wrapper to avoid an extra clone
169                let indexed_tx = IndexedTransaction { index: tx_index, tx: first_tx };
170                // Compute metadata from the moved value
171                let tx_ref = indexed_tx.tx.tx();
172                let is_system_tx = tx_ref.ty() > MAX_STANDARD_TX_TYPE;
173                let first_tx_hash = tx_ref.tx_hash();
174
175                // Check if this is a system transaction (type > 4)
176                // System transactions in the first position typically set critical metadata
177                // that affects all subsequent transactions (e.g., L1 block info, fees on L2s).
178                if is_system_tx {
179                    // Broadcast system transaction to all workers to ensure they have the
180                    // critical state. This is particularly important for L2s like Optimism
181                    // where the first deposit transaction contains essential block metadata.
182                    for handle in &handles {
183                        if let Err(err) = handle.send(indexed_tx.clone()) {
184                            warn!(
185                                target: "engine::tree::prewarm",
186                                tx_hash = %first_tx_hash,
187                                error = %err,
188                                "Failed to send deposit transaction to worker"
189                            );
190                        }
191                    }
192                } else {
193                    // Not a deposit, send to first worker via round-robin
194                    if let Err(err) = handles[0].send(indexed_tx) {
195                        warn!(
196                            target: "engine::tree::prewarm",
197                            task_idx = 0,
198                            error = %err,
199                            "Failed to send transaction to worker"
200                        );
201                    }
202                }
203                executing += 1;
204                tx_index += 1;
205            }
206
207            // Process remaining transactions with round-robin distribution
208            while let Ok(executable) = pending.recv() {
209                let indexed_tx = IndexedTransaction { index: tx_index, tx: executable };
210                let task_idx = executing % workers_needed;
211                if let Err(err) = handles[task_idx].send(indexed_tx) {
212                    warn!(
213                        target: "engine::tree::prewarm",
214                        task_idx,
215                        error = %err,
216                        "Failed to send transaction to worker"
217                    );
218                }
219                executing += 1;
220                tx_index += 1;
221            }
222
223            // drop handle and wait for all tasks to finish and drop theirs
224            drop(done_tx);
225            drop(handles);
226            while done_rx.recv().is_ok() {}
227
228            let _ = actions_tx
229                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: executing });
230        });
231    }
232
233    /// If configured and the tx returned proof targets, emit the targets the transaction produced
234    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
235        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
236            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
237        }
238    }
239
240    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
241    /// It should only be called after ensuring that:
242    /// 1. All prewarming tasks have completed execution
243    /// 2. No other concurrent operations are accessing the cache
244    ///
245    /// Saves the warmed caches back into the shared slot after prewarming completes.
246    ///
247    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
248    /// the new, warmed cache to be inserted.
249    ///
250    /// This method is called from `run()` only after all execution tasks are complete.
251    fn save_cache(self, state: BundleState) {
252        let start = Instant::now();
253
254        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
255            self;
256        let hash = env.hash;
257
258        // Perform all cache operations atomically under the lock
259        execution_cache.update_with_guard(|cached| {
260
261            // consumes the `SavedCache` held by the prewarming task, which releases its usage guard
262            let (caches, cache_metrics) = saved_cache.split();
263            let new_cache = SavedCache::new(hash, caches, cache_metrics);
264
265            // Insert state into cache while holding the lock
266            if new_cache.cache().insert_state(&state).is_err() {
267                // Clear the cache on error to prevent having a polluted cache
268                *cached = None;
269                debug!(target: "engine::caching", "cleared execution cache on update error");
270                return;
271            }
272
273            new_cache.update_metrics();
274            debug!(target: "engine::caching", parent_hash=?new_cache.executed_block_hash(), "Updated execution cache");
275
276            // Replace the shared cache with the new one; the previous cache (if any) is dropped.
277            *cached = Some(new_cache);
278        });
279
280        metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
281    }
282
283    /// Executes the task.
284    ///
285    /// This will execute the transactions until all transactions have been processed or the task
286    /// was cancelled.
287    pub(super) fn run(
288        self,
289        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
290        actions_tx: Sender<PrewarmTaskEvent>,
291    ) {
292        // spawn execution tasks.
293        self.spawn_all(pending, actions_tx);
294
295        let mut final_block_output = None;
296        let mut finished_execution = false;
297        while let Ok(event) = self.actions_rx.recv() {
298            match event {
299                PrewarmTaskEvent::TerminateTransactionExecution => {
300                    // stop tx processing
301                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
302                }
303                PrewarmTaskEvent::Outcome { proof_targets } => {
304                    // completed executing a set of transactions
305                    self.send_multi_proof_targets(proof_targets);
306                }
307                PrewarmTaskEvent::Terminate { block_output } => {
308                    trace!(target: "engine::tree::prewarm", "Received termination signal");
309                    final_block_output = Some(block_output);
310
311                    if finished_execution {
312                        // all tasks are done, we can exit, which will save caches and exit
313                        break
314                    }
315                }
316                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
317                    trace!(target: "engine::tree::prewarm", "Finished prewarm execution signal");
318                    self.ctx.metrics.transactions.set(executed_transactions as f64);
319                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
320
321                    finished_execution = true;
322
323                    if final_block_output.is_some() {
324                        // all tasks are done, we can exit, which will save caches and exit
325                        break
326                    }
327                }
328            }
329        }
330
331        trace!(target: "engine::tree::prewarm", "Completed prewarm execution");
332
333        // save caches and finish
334        if let Some(Some(state)) = final_block_output {
335            self.save_cache(state);
336        }
337    }
338}
339
340/// Context required by tx execution tasks.
341#[derive(Debug, Clone)]
342pub(super) struct PrewarmContext<N, P, Evm>
343where
344    N: NodePrimitives,
345    Evm: ConfigureEvm<Primitives = N>,
346{
347    pub(super) env: ExecutionEnv<Evm>,
348    pub(super) evm_config: Evm,
349    pub(super) saved_cache: SavedCache,
350    /// Provider to obtain the state
351    pub(super) provider: StateProviderBuilder<N, P>,
352    pub(super) metrics: PrewarmMetrics,
353    /// An atomic bool that tells prewarm tasks to not start any more execution.
354    pub(super) terminate_execution: Arc<AtomicBool>,
355    pub(super) precompile_cache_disabled: bool,
356    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
357}
358
359impl<N, P, Evm> PrewarmContext<N, P, Evm>
360where
361    N: NodePrimitives,
362    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
363    Evm: ConfigureEvm<Primitives = N> + 'static,
364{
365    /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
366    /// execution.
367    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
368        let Self {
369            env,
370            evm_config,
371            saved_cache,
372            provider,
373            metrics,
374            terminate_execution,
375            precompile_cache_disabled,
376            mut precompile_cache_map,
377        } = self;
378
379        let state_provider = match provider.build() {
380            Ok(provider) => provider,
381            Err(err) => {
382                trace!(
383                    target: "engine::tree",
384                    %err,
385                    "Failed to build state provider in prewarm thread"
386                );
387                return None
388            }
389        };
390
391        // Use the caches to create a new provider with caching
392        let caches = saved_cache.cache().clone();
393        let cache_metrics = saved_cache.metrics().clone();
394        let state_provider =
395            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
396
397        let state_provider = StateProviderDatabase::new(state_provider);
398
399        let mut evm_env = env.evm_env;
400
401        // we must disable the nonce check so that we can execute the transaction even if the nonce
402        // doesn't match what's on chain.
403        evm_env.cfg_env.disable_nonce_check = true;
404
405        // create a new executor and disable nonce checks in the env
406        let spec_id = *evm_env.spec_id();
407        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
408
409        if !precompile_cache_disabled {
410            // Only cache pure precompiles to avoid issues with stateful precompiles
411            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
412                CachedPrecompile::wrap(
413                    precompile,
414                    precompile_cache_map.cache_for_address(*address),
415                    spec_id,
416                    None, // No metrics for prewarm
417                )
418            });
419        }
420
421        Some((evm, metrics, terminate_execution))
422    }
423
424    /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
425    /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
426    ///
427    /// Returns `None` if executing the transactions failed to a non Revert error.
428    /// Returns the touched+modified state of the transaction.
429    ///
430    /// Note: There are no ordering guarantees; this does not reflect the state produced by
431    /// sequential execution.
432    fn transact_batch<Tx>(
433        self,
434        txs: mpsc::Receiver<IndexedTransaction<Tx>>,
435        sender: Sender<PrewarmTaskEvent>,
436        done_tx: Sender<()>,
437    ) where
438        Tx: ExecutableTxFor<Evm>,
439    {
440        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
441
442        while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
443            // If the task was cancelled, stop execution, send an empty result to notify the task,
444            // and exit.
445            if terminate_execution.load(Ordering::Relaxed) {
446                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
447                break
448            }
449
450            // create the tx env
451            let start = Instant::now();
452            let res = match evm.transact(&tx) {
453                Ok(res) => res,
454                Err(err) => {
455                    trace!(
456                        target: "engine::tree::prewarm",
457                        %err,
458                        tx_hash=%tx.tx().tx_hash(),
459                        sender=%tx.signer(),
460                        "Error when executing prewarm transaction",
461                    );
462                    // Track transaction execution errors
463                    metrics.transaction_errors.increment(1);
464                    // skip error because we can ignore these errors and continue with the next tx
465                    continue
466                }
467            };
468            metrics.execution_duration.record(start.elapsed());
469
470            // Only send outcome for transactions after the first txn
471            // as the main execution will be just as fast
472            if index > 0 {
473                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
474                metrics.prefetch_storage_targets.record(storage_targets as f64);
475                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
476            }
477
478            metrics.total_runtime.record(start.elapsed());
479        }
480
481        // send a message to the main task to flag that we're done
482        let _ = done_tx.send(());
483    }
484
485    /// Spawns a worker task for transaction execution and returns its sender channel.
486    fn spawn_worker<Tx>(
487        &self,
488        executor: &WorkloadExecutor,
489        actions_tx: Sender<PrewarmTaskEvent>,
490        done_tx: Sender<()>,
491    ) -> mpsc::Sender<IndexedTransaction<Tx>>
492    where
493        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
494    {
495        let (tx, rx) = mpsc::channel();
496        let ctx = self.clone();
497
498        executor.spawn_blocking(move || {
499            ctx.transact_batch(rx, actions_tx, done_tx);
500        });
501
502        tx
503    }
504}
505
506/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
507/// given state.
508fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
509    let mut targets = MultiProofTargets::with_capacity(state.len());
510    let mut storage_targets = 0;
511    for (addr, account) in state {
512        // if the account was not touched, or if the account was selfdestructed, do not
513        // fetch proofs for it
514        //
515        // Since selfdestruct can only happen in the same transaction, we can skip
516        // prefetching proofs for selfdestructed accounts
517        //
518        // See: https://eips.ethereum.org/EIPS/eip-6780
519        if !account.is_touched() || account.is_selfdestructed() {
520            continue
521        }
522
523        let mut storage_set =
524            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
525        for (key, slot) in account.storage {
526            // do nothing if unchanged
527            if !slot.is_changed() {
528                continue
529            }
530
531            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
532        }
533
534        storage_targets += storage_set.len();
535        targets.insert(keccak256(addr), storage_set);
536    }
537
538    (targets, storage_targets)
539}
540
541/// The events the pre-warm task can handle.
542pub(super) enum PrewarmTaskEvent {
543    /// Forcefully terminate all remaining transaction execution.
544    TerminateTransactionExecution,
545    /// Forcefully terminate the task on demand and update the shared cache with the given output
546    /// before exiting.
547    Terminate {
548        /// The final block state output.
549        block_output: Option<BundleState>,
550    },
551    /// The outcome of a pre-warm task
552    Outcome {
553        /// The prepared proof targets based on the evm state outcome
554        proof_targets: Option<MultiProofTargets>,
555    },
556    /// Finished executing all transactions
557    FinishedTxExecution {
558        /// Number of transactions executed
559        executed_transactions: usize,
560    },
561}
562
563/// Metrics for transactions prewarming.
564#[derive(Metrics, Clone)]
565#[metrics(scope = "sync.prewarm")]
566pub(crate) struct PrewarmMetrics {
567    /// The number of transactions to prewarm
568    pub(crate) transactions: Gauge,
569    /// A histogram of the number of transactions to prewarm
570    pub(crate) transactions_histogram: Histogram,
571    /// A histogram of duration per transaction prewarming
572    pub(crate) total_runtime: Histogram,
573    /// A histogram of EVM execution duration per transaction prewarming
574    pub(crate) execution_duration: Histogram,
575    /// A histogram for prefetch targets per transaction prewarming
576    pub(crate) prefetch_storage_targets: Histogram,
577    /// A histogram of duration for cache saving
578    pub(crate) cache_saving_duration: Gauge,
579    /// Counter for transaction execution errors during prewarming
580    pub(crate) transaction_errors: Counter,
581}