reth_engine_tree/tree/payload_processor/
prewarm.rs

//! Caching and prewarming related functionality.
//!
//! Prewarming executes transactions in parallel before the actual block execution
//! to populate the execution cache with state that will likely be accessed during
//! block processing.
//!
//! ## How Prewarming Works
//!
//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
//!    and one for actual execution (executed sequentially)
//! 2. Prewarming tasks execute transactions in parallel using shared caches
//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    cached_state::{
16        CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache, SavedCache,
17    },
18    payload_processor::{
19        executor::WorkloadExecutor, multiproof::MultiProofMessage,
20        ExecutionCache as PayloadExecutionCache,
21    },
22    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
23    ExecutionEnv, StateProviderBuilder,
24};
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use metrics::{Counter, Gauge, Histogram};
28use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
29use reth_metrics::Metrics;
30use reth_primitives_traits::{NodePrimitives, SignedTransaction};
31use reth_provider::{BlockReader, StateProviderFactory, StateReader};
32use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
33use reth_trie::MultiProofTargets;
34use std::{
35    sync::{
36        atomic::{AtomicBool, Ordering},
37        mpsc::{self, channel, Receiver, Sender},
38        Arc,
39    },
40    time::Instant,
41};
42use tracing::{debug, trace};
43
44/// A task that is responsible for caching and prewarming the cache by executing transactions
45/// individually in parallel.
46///
47/// Note: This task runs until cancelled externally.
48pub(super) struct PrewarmCacheTask<N, P, Evm>
49where
50    N: NodePrimitives,
51    Evm: ConfigureEvm<Primitives = N>,
52{
53    /// The executor used to spawn execution tasks.
54    executor: WorkloadExecutor,
55    /// Shared execution cache.
56    execution_cache: PayloadExecutionCache,
57    /// Context provided to execution tasks
58    ctx: PrewarmContext<N, P, Evm>,
59    /// How many transactions should be executed in parallel
60    max_concurrency: usize,
61    /// Sender to emit evm state outcome messages, if any.
62    to_multi_proof: Option<Sender<MultiProofMessage>>,
63    /// Receiver for events produced by tx execution
64    actions_rx: Receiver<PrewarmTaskEvent>,
65}
66
67impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
68where
69    N: NodePrimitives,
70    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
71    Evm: ConfigureEvm<Primitives = N> + 'static,
72{
73    /// Initializes the task with the given transactions pending execution
74    pub(super) fn new(
75        executor: WorkloadExecutor,
76        execution_cache: PayloadExecutionCache,
77        ctx: PrewarmContext<N, P, Evm>,
78        to_multi_proof: Option<Sender<MultiProofMessage>>,
79    ) -> (Self, Sender<PrewarmTaskEvent>) {
80        let (actions_tx, actions_rx) = channel();
81        (
82            Self {
83                executor,
84                execution_cache,
85                ctx,
86                max_concurrency: 64,
87                to_multi_proof,
88                actions_rx,
89            },
90            actions_tx,
91        )
92    }
93
94    /// Spawns all pending transactions as blocking tasks by first chunking them.
95    fn spawn_all(
96        &self,
97        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
98        actions_tx: Sender<PrewarmTaskEvent>,
99    ) {
100        let executor = self.executor.clone();
101        let ctx = self.ctx.clone();
102        let max_concurrency = self.max_concurrency;
103
104        self.executor.spawn_blocking(move || {
105            let mut handles = Vec::with_capacity(max_concurrency);
106            let (done_tx, done_rx) = mpsc::channel();
107            let mut executing = 0;
108            while let Ok(executable) = pending.recv() {
109                let task_idx = executing % max_concurrency;
110
111                if handles.len() <= task_idx {
112                    let (tx, rx) = mpsc::channel();
113                    let sender = actions_tx.clone();
114                    let ctx = ctx.clone();
115                    let done_tx = done_tx.clone();
116
117                    executor.spawn_blocking(move || {
118                        ctx.transact_batch(rx, sender, done_tx);
119                    });
120
121                    handles.push(tx);
122                }
123
124                let _ = handles[task_idx].send(executable);
125
126                executing += 1;
127            }
128
129            // drop handle and wait for all tasks to finish and drop theirs
130            drop(done_tx);
131            drop(handles);
132            while done_rx.recv().is_ok() {}
133
134            let _ = actions_tx
135                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: executing });
136        });
137    }
138
139    /// If configured and the tx returned proof targets, emit the targets the transaction produced
140    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
141        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
142            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
143        }
144    }
145
146    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
147    /// It should only be called after ensuring that:
148    /// 1. All prewarming tasks have completed execution
149    /// 2. No other concurrent operations are accessing the cache
150    /// 3. The prewarming phase has finished (typically signaled by `FinishedTxExecution`)
151    ///
152    /// This method is called from `run()` only after all execution tasks are complete,
153    fn save_cache(self, state: BundleState) {
154        let start = Instant::now();
155
156        // Precompute outside the lock
157        let hash = self.ctx.env.hash;
158        let caches = self.ctx.cache.clone();
159        let metrics = self.ctx.cache_metrics.clone();
160
161        // Perform all cache operations atomically under the lock
162        self.execution_cache.update_with_guard(|cached| {
163            let cache = SavedCache::new(hash, caches, metrics);
164
165            // Insert state into cache while holding the lock
166            if cache.cache().insert_state(&state).is_err() {
167                // Clear the cache on error to prevent having a polluted cache
168                *cached = None;
169                debug!(target: "engine::caching", "cleared execution cache on update error");
170                return;
171            }
172
173            cache.update_metrics();
174            debug!(target: "engine::caching", parent_hash=?cache.executed_block_hash(), "Updated execution cache");
175
176            // Replace the shared cache with the new one; the previous cache (if any) is dropped.
177            *cached = Some(cache);
178        });
179
180        self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
181    }
182
183    /// Executes the task.
184    ///
185    /// This will execute the transactions until all transactions have been processed or the task
186    /// was cancelled.
187    pub(super) fn run(
188        self,
189        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
190        actions_tx: Sender<PrewarmTaskEvent>,
191    ) {
192        // spawn execution tasks.
193        self.spawn_all(pending, actions_tx);
194
195        let mut final_block_output = None;
196        let mut finished_execution = false;
197        while let Ok(event) = self.actions_rx.recv() {
198            match event {
199                PrewarmTaskEvent::TerminateTransactionExecution => {
200                    // stop tx processing
201                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
202                }
203                PrewarmTaskEvent::Outcome { proof_targets } => {
204                    // completed executing a set of transactions
205                    self.send_multi_proof_targets(proof_targets);
206                }
207                PrewarmTaskEvent::Terminate { block_output } => {
208                    trace!(target: "engine::tree::prewarm", "Received termination signal");
209                    final_block_output = Some(block_output);
210
211                    if finished_execution {
212                        // all tasks are done, we can exit, which will save caches and exit
213                        break
214                    }
215                }
216                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
217                    trace!(target: "engine::tree::prewarm", "Finished prewarm execution signal");
218                    self.ctx.metrics.transactions.set(executed_transactions as f64);
219                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
220
221                    finished_execution = true;
222
223                    if final_block_output.is_some() {
224                        // all tasks are done, we can exit, which will save caches and exit
225                        break
226                    }
227                }
228            }
229        }
230
231        trace!(target: "engine::tree::prewarm", "Completed prewarm execution");
232
233        // save caches and finish
234        if let Some(Some(state)) = final_block_output {
235            self.save_cache(state);
236        }
237    }
238}
239
240/// Context required by tx execution tasks.
241#[derive(Debug, Clone)]
242pub(super) struct PrewarmContext<N, P, Evm>
243where
244    N: NodePrimitives,
245    Evm: ConfigureEvm<Primitives = N>,
246{
247    pub(super) env: ExecutionEnv<Evm>,
248    pub(super) evm_config: Evm,
249    pub(super) cache: StateExecutionCache,
250    pub(super) cache_metrics: CachedStateMetrics,
251    /// Provider to obtain the state
252    pub(super) provider: StateProviderBuilder<N, P>,
253    pub(super) metrics: PrewarmMetrics,
254    /// An atomic bool that tells prewarm tasks to not start any more execution.
255    pub(super) terminate_execution: Arc<AtomicBool>,
256    pub(super) precompile_cache_disabled: bool,
257    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
258}
259
260impl<N, P, Evm> PrewarmContext<N, P, Evm>
261where
262    N: NodePrimitives,
263    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
264    Evm: ConfigureEvm<Primitives = N> + 'static,
265{
266    /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
267    /// execution.
268    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
269        let Self {
270            env,
271            evm_config,
272            cache: caches,
273            cache_metrics,
274            provider,
275            metrics,
276            terminate_execution,
277            precompile_cache_disabled,
278            mut precompile_cache_map,
279        } = self;
280
281        let state_provider = match provider.build() {
282            Ok(provider) => provider,
283            Err(err) => {
284                trace!(
285                    target: "engine::tree",
286                    %err,
287                    "Failed to build state provider in prewarm thread"
288                );
289                return None
290            }
291        };
292
293        // Use the caches to create a new provider with caching
294        let state_provider =
295            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
296
297        let state_provider = StateProviderDatabase::new(state_provider);
298
299        let mut evm_env = env.evm_env;
300
301        // we must disable the nonce check so that we can execute the transaction even if the nonce
302        // doesn't match what's on chain.
303        evm_env.cfg_env.disable_nonce_check = true;
304
305        // create a new executor and disable nonce checks in the env
306        let spec_id = *evm_env.spec_id();
307        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
308
309        if !precompile_cache_disabled {
310            // Only cache pure precompiles to avoid issues with stateful precompiles
311            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
312                CachedPrecompile::wrap(
313                    precompile,
314                    precompile_cache_map.cache_for_address(*address),
315                    spec_id,
316                    None, // No metrics for prewarm
317                )
318            });
319        }
320
321        Some((evm, metrics, terminate_execution))
322    }
323
324    /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
325    /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
326    ///
327    /// Returns `None` if executing the transactions failed to a non Revert error.
328    /// Returns the touched+modified state of the transaction.
329    ///
330    /// Note: There are no ordering guarantees; this does not reflect the state produced by
331    /// sequential execution.
332    fn transact_batch(
333        self,
334        txs: mpsc::Receiver<impl ExecutableTxFor<Evm>>,
335        sender: Sender<PrewarmTaskEvent>,
336        done_tx: Sender<()>,
337    ) {
338        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
339
340        while let Ok(tx) = txs.recv() {
341            // If the task was cancelled, stop execution, send an empty result to notify the task,
342            // and exit.
343            if terminate_execution.load(Ordering::Relaxed) {
344                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
345                break
346            }
347
348            // create the tx env
349            let start = Instant::now();
350            let res = match evm.transact(&tx) {
351                Ok(res) => res,
352                Err(err) => {
353                    trace!(
354                        target: "engine::tree::prewarm",
355                        %err,
356                        tx_hash=%tx.tx().tx_hash(),
357                        sender=%tx.signer(),
358                        "Error when executing prewarm transaction",
359                    );
360                    // Track transaction execution errors
361                    metrics.transaction_errors.increment(1);
362                    // skip error because we can ignore these errors and continue with the next tx
363                    continue
364                }
365            };
366            metrics.execution_duration.record(start.elapsed());
367
368            let (targets, storage_targets) = multiproof_targets_from_state(res.state);
369            metrics.prefetch_storage_targets.record(storage_targets as f64);
370            metrics.total_runtime.record(start.elapsed());
371
372            let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
373        }
374
375        // send a message to the main task to flag that we're done
376        let _ = done_tx.send(());
377    }
378}
379
380/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
381/// given state.
382fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
383    let mut targets = MultiProofTargets::with_capacity(state.len());
384    let mut storage_targets = 0;
385    for (addr, account) in state {
386        // if the account was not touched, or if the account was selfdestructed, do not
387        // fetch proofs for it
388        //
389        // Since selfdestruct can only happen in the same transaction, we can skip
390        // prefetching proofs for selfdestructed accounts
391        //
392        // See: https://eips.ethereum.org/EIPS/eip-6780
393        if !account.is_touched() || account.is_selfdestructed() {
394            continue
395        }
396
397        let mut storage_set =
398            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
399        for (key, slot) in account.storage {
400            // do nothing if unchanged
401            if !slot.is_changed() {
402                continue
403            }
404
405            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
406        }
407
408        storage_targets += storage_set.len();
409        targets.insert(keccak256(addr), storage_set);
410    }
411
412    (targets, storage_targets)
413}
414
415/// The events the pre-warm task can handle.
416pub(super) enum PrewarmTaskEvent {
417    /// Forcefully terminate all remaining transaction execution.
418    TerminateTransactionExecution,
419    /// Forcefully terminate the task on demand and update the shared cache with the given output
420    /// before exiting.
421    Terminate {
422        /// The final block state output.
423        block_output: Option<BundleState>,
424    },
425    /// The outcome of a pre-warm task
426    Outcome {
427        /// The prepared proof targets based on the evm state outcome
428        proof_targets: Option<MultiProofTargets>,
429    },
430    /// Finished executing all transactions
431    FinishedTxExecution {
432        /// Number of transactions executed
433        executed_transactions: usize,
434    },
435}
436
437/// Metrics for transactions prewarming.
438#[derive(Metrics, Clone)]
439#[metrics(scope = "sync.prewarm")]
440pub(crate) struct PrewarmMetrics {
441    /// The number of transactions to prewarm
442    pub(crate) transactions: Gauge,
443    /// A histogram of the number of transactions to prewarm
444    pub(crate) transactions_histogram: Histogram,
445    /// A histogram of duration per transaction prewarming
446    pub(crate) total_runtime: Histogram,
447    /// A histogram of EVM execution duration per transaction prewarming
448    pub(crate) execution_duration: Histogram,
449    /// A histogram for prefetch targets per transaction prewarming
450    pub(crate) prefetch_storage_targets: Histogram,
451    /// A histogram of duration for cache saving
452    pub(crate) cache_saving_duration: Gauge,
453    /// Counter for transaction execution errors during prewarming
454    pub(crate) transaction_errors: Counter,
455}