reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5    payload_processor::{
6        executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7    },
8    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
9    ExecutionEnv, StateProviderBuilder,
10};
11use alloy_evm::Database;
12use alloy_primitives::{keccak256, map::B256Set, B256};
13use metrics::{Gauge, Histogram};
14use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
15use reth_metrics::Metrics;
16use reth_primitives_traits::{NodePrimitives, SignedTransaction};
17use reth_provider::{BlockReader, StateProviderFactory, StateReader};
18use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
19use reth_trie::MultiProofTargets;
20use std::{
21    sync::{
22        atomic::{AtomicBool, Ordering},
23        mpsc::{self, channel, Receiver, Sender},
24        Arc,
25    },
26    time::Instant,
27};
28use tracing::{debug, trace};
29
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
/// The task owns the receiving half of the [`PrewarmTaskEvent`] channel created in
/// [`Self::new`]; worker tasks and external callers drive it by sending events on the matching
/// sender.
///
/// Note: This task runs until cancelled externally. Concretely, [`Self::run`] returns once it has
/// received both [`PrewarmTaskEvent::Terminate`] and [`PrewarmTaskEvent::FinishedTxExecution`],
/// or once every sender of the event channel has been dropped.
pub(super) struct PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// The executor used to spawn execution tasks (both the dispatcher and the per-worker
    /// tasks are started through it).
    executor: WorkloadExecutor,
    /// Shared execution cache. The cache built for the executed block is stored here when the
    /// task finishes with a block output.
    execution_cache: ExecutionCache,
    /// Context provided to execution tasks. It is cloned once per spawned worker.
    ctx: PrewarmContext<N, P, Evm>,
    /// How many transactions should be executed in parallel, i.e. the upper bound on the number
    /// of worker tasks that are spawned. Must be non-zero: it is used as a modulus when
    /// assigning transactions to workers.
    max_concurrency: usize,
    /// Sender to emit evm state outcome messages, if any. When `None`, proof targets produced
    /// by workers are dropped.
    to_multi_proof: Option<Sender<MultiProofMessage>>,
    /// Receiver for events produced by tx execution and for external control events.
    actions_rx: Receiver<PrewarmTaskEvent>,
}
52
53impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
54where
55    N: NodePrimitives,
56    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
57    Evm: ConfigureEvm<Primitives = N> + 'static,
58{
59    /// Initializes the task with the given transactions pending execution
60    pub(super) fn new(
61        executor: WorkloadExecutor,
62        execution_cache: ExecutionCache,
63        ctx: PrewarmContext<N, P, Evm>,
64        to_multi_proof: Option<Sender<MultiProofMessage>>,
65    ) -> (Self, Sender<PrewarmTaskEvent>) {
66        let (actions_tx, actions_rx) = channel();
67        (
68            Self {
69                executor,
70                execution_cache,
71                ctx,
72                max_concurrency: 64,
73                to_multi_proof,
74                actions_rx,
75            },
76            actions_tx,
77        )
78    }
79
80    /// Spawns all pending transactions as blocking tasks by first chunking them.
81    fn spawn_all(
82        &self,
83        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
84        actions_tx: Sender<PrewarmTaskEvent>,
85    ) {
86        let executor = self.executor.clone();
87        let ctx = self.ctx.clone();
88        let max_concurrency = self.max_concurrency;
89
90        self.executor.spawn_blocking(move || {
91            let mut handles = Vec::with_capacity(max_concurrency);
92            let (done_tx, done_rx) = mpsc::channel();
93            let mut executing = 0;
94            while let Ok(executable) = pending.recv() {
95                let task_idx = executing % max_concurrency;
96
97                if handles.len() <= task_idx {
98                    let (tx, rx) = mpsc::channel();
99                    let sender = actions_tx.clone();
100                    let ctx = ctx.clone();
101                    let done_tx = done_tx.clone();
102
103                    executor.spawn_blocking(move || {
104                        ctx.transact_batch(rx, sender, done_tx);
105                    });
106
107                    handles.push(tx);
108                }
109
110                let _ = handles[task_idx].send(executable);
111
112                executing += 1;
113            }
114
115            // drop handle and wait for all tasks to finish and drop theirs
116            drop(done_tx);
117            drop(handles);
118            while done_rx.recv().is_ok() {}
119
120            let _ = actions_tx
121                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: executing });
122        });
123    }
124
125    /// If configured and the tx returned proof targets, emit the targets the transaction produced
126    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
127        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
128            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
129        }
130    }
131
132    /// Save the state to the shared cache for the given block.
133    fn save_cache(self, state: BundleState) {
134        let start = Instant::now();
135        let cache = SavedCache::new(
136            self.ctx.env.hash,
137            self.ctx.cache.clone(),
138            self.ctx.cache_metrics.clone(),
139        );
140        if cache.cache().insert_state(&state).is_err() {
141            return
142        }
143
144        cache.update_metrics();
145
146        debug!(target: "engine::caching", "Updated state caches");
147
148        // update the cache for the executed block
149        self.execution_cache.save_cache(cache);
150        self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
151    }
152
153    /// Executes the task.
154    ///
155    /// This will execute the transactions until all transactions have been processed or the task
156    /// was cancelled.
157    pub(super) fn run(
158        self,
159        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
160        actions_tx: Sender<PrewarmTaskEvent>,
161    ) {
162        // spawn execution tasks.
163        self.spawn_all(pending, actions_tx);
164
165        let mut final_block_output = None;
166        let mut finished_execution = false;
167        while let Ok(event) = self.actions_rx.recv() {
168            match event {
169                PrewarmTaskEvent::TerminateTransactionExecution => {
170                    // stop tx processing
171                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
172                }
173                PrewarmTaskEvent::Outcome { proof_targets } => {
174                    // completed executing a set of transactions
175                    self.send_multi_proof_targets(proof_targets);
176                }
177                PrewarmTaskEvent::Terminate { block_output } => {
178                    trace!(target: "engine::tree::prewarm", "Received termination signal");
179                    final_block_output = Some(block_output);
180
181                    if finished_execution {
182                        // all tasks are done, we can exit, which will save caches and exit
183                        break
184                    }
185                }
186                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
187                    trace!(target: "engine::tree::prewarm", "Finished prewarm execution signal");
188                    self.ctx.metrics.transactions.set(executed_transactions as f64);
189                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
190
191                    finished_execution = true;
192
193                    if final_block_output.is_some() {
194                        // all tasks are done, we can exit, which will save caches and exit
195                        break
196                    }
197                }
198            }
199        }
200
201        trace!(target: "engine::tree::prewarm", "Completed prewarm execution");
202
203        // save caches and finish
204        if let Some(Some(state)) = final_block_output {
205            self.save_cache(state);
206        }
207    }
208}
209
/// Context required by tx execution tasks.
///
/// The context is cloned for every worker task; each worker consumes its clone to build its own
/// EVM instance on top of a cached state provider.
#[derive(Debug, Clone)]
pub(super) struct PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Execution environment of the block being prewarmed. Its `evm_env` configures the worker
    /// EVMs and its `hash` keys the cache that is saved after execution.
    pub(super) env: ExecutionEnv<Evm>,
    /// EVM configuration used to instantiate the worker EVMs.
    pub(super) evm_config: Evm,
    /// Caches that back the workers' state providers and that are saved once the block is done.
    pub(super) cache: ProviderCaches,
    /// Metrics handle passed along with `cache` wherever a cached provider or saved cache is
    /// created.
    pub(super) cache_metrics: CachedStateMetrics,
    /// Provider to obtain the state
    pub(super) provider: StateProviderBuilder<N, P>,
    /// Prewarm metrics recorded by the task and its workers.
    pub(super) metrics: PrewarmMetrics,
    /// An atomic bool that tells prewarm tasks to not start any more execution.
    pub(super) terminate_execution: Arc<AtomicBool>,
    /// When `true`, precompiles of the worker EVMs are left unwrapped (no precompile caching).
    pub(super) precompile_cache_disabled: bool,
    /// Source of the per-address precompile caches used when precompile caching is enabled.
    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
229
230impl<N, P, Evm> PrewarmContext<N, P, Evm>
231where
232    N: NodePrimitives,
233    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
234    Evm: ConfigureEvm<Primitives = N> + 'static,
235{
236    /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
237    /// execution.
238    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
239        let Self {
240            env,
241            evm_config,
242            cache: caches,
243            cache_metrics,
244            provider,
245            metrics,
246            terminate_execution,
247            precompile_cache_disabled,
248            mut precompile_cache_map,
249        } = self;
250
251        let state_provider = match provider.build() {
252            Ok(provider) => provider,
253            Err(err) => {
254                trace!(
255                    target: "engine::tree",
256                    %err,
257                    "Failed to build state provider in prewarm thread"
258                );
259                return None
260            }
261        };
262
263        // Use the caches to create a new provider with caching
264        let state_provider =
265            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
266
267        let state_provider = StateProviderDatabase::new(state_provider);
268
269        let mut evm_env = env.evm_env;
270
271        // we must disable the nonce check so that we can execute the transaction even if the nonce
272        // doesn't match what's on chain.
273        evm_env.cfg_env.disable_nonce_check = true;
274
275        // create a new executor and disable nonce checks in the env
276        let spec_id = *evm_env.spec_id();
277        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
278
279        if !precompile_cache_disabled {
280            // Only cache pure precompiles to avoid issues with stateful precompiles
281            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
282                CachedPrecompile::wrap(
283                    precompile,
284                    precompile_cache_map.cache_for_address(*address),
285                    spec_id,
286                    None, // No metrics for prewarm
287                )
288            });
289        }
290
291        Some((evm, metrics, terminate_execution))
292    }
293
294    /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
295    /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
296    ///
297    /// Returns `None` if executing the transactions failed to a non Revert error.
298    /// Returns the touched+modified state of the transaction.
299    ///
300    /// Note: Since here are no ordering guarantees this won't the state the txs produce when
301    /// executed sequentially.
302    fn transact_batch(
303        self,
304        txs: mpsc::Receiver<impl ExecutableTxFor<Evm>>,
305        sender: Sender<PrewarmTaskEvent>,
306        done_tx: Sender<()>,
307    ) {
308        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
309
310        while let Ok(tx) = txs.recv() {
311            // If the task was cancelled, stop execution, send an empty result to notify the task,
312            // and exit.
313            if terminate_execution.load(Ordering::Relaxed) {
314                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
315                break
316            }
317
318            // create the tx env
319            let start = Instant::now();
320            let res = match evm.transact(&tx) {
321                Ok(res) => res,
322                Err(err) => {
323                    trace!(
324                        target: "engine::tree",
325                        %err,
326                        tx_hash=%tx.tx().tx_hash(),
327                        sender=%tx.signer(),
328                        "Error when executing prewarm transaction",
329                    );
330                    return
331                }
332            };
333            metrics.execution_duration.record(start.elapsed());
334
335            let (targets, storage_targets) = multiproof_targets_from_state(res.state);
336            metrics.prefetch_storage_targets.record(storage_targets as f64);
337            metrics.total_runtime.record(start.elapsed());
338
339            let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
340        }
341
342        // send a message to the main task to flag that we're done
343        let _ = done_tx.send(());
344    }
345}
346
347/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
348/// given state.
349fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
350    let mut targets = MultiProofTargets::with_capacity(state.len());
351    let mut storage_targets = 0;
352    for (addr, account) in state {
353        // if the account was not touched, or if the account was selfdestructed, do not
354        // fetch proofs for it
355        //
356        // Since selfdestruct can only happen in the same transaction, we can skip
357        // prefetching proofs for selfdestructed accounts
358        //
359        // See: https://eips.ethereum.org/EIPS/eip-6780
360        if !account.is_touched() || account.is_selfdestructed() {
361            continue
362        }
363
364        let mut storage_set =
365            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
366        for (key, slot) in account.storage {
367            // do nothing if unchanged
368            if !slot.is_changed() {
369                continue
370            }
371
372            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
373        }
374
375        storage_targets += storage_set.len();
376        targets.insert(keccak256(addr), storage_set);
377    }
378
379    (targets, storage_targets)
380}
381
/// The events the pre-warm task can handle.
///
/// Events arrive on the channel created together with the task; some are sent by the worker
/// tasks themselves and some by the owner of the task's sender.
pub(super) enum PrewarmTaskEvent {
    /// Forcefully terminate all remaining transaction execution.
    ///
    /// Sets the shared termination flag; workers check it before starting their next
    /// transaction.
    TerminateTransactionExecution,
    /// Forcefully terminate the task on demand and update the shared cache with the given output
    /// before exiting.
    ///
    /// The task still waits for [`Self::FinishedTxExecution`] before it exits, so that all
    /// workers are done when the cache is saved.
    Terminate {
        /// The final block state output. When `None`, no cache is saved.
        block_output: Option<BundleState>,
    },
    /// The outcome of a pre-warm task
    Outcome {
        /// The prepared proof targets based on the evm state outcome. `None` is sent by a worker
        /// that stopped because termination was requested.
        proof_targets: Option<MultiProofTargets>,
    },
    /// Finished executing all transactions
    FinishedTxExecution {
        /// Number of transactions executed. The visible dispatcher counts every transaction it
        /// handed to a worker, regardless of whether its execution succeeded.
        executed_transactions: usize,
    },
}
403
// NOTE(review): the field doc comments below are presumably consumed by the `Metrics` derive as
// the exported metric descriptions — confirm before rewording them, since that would change what
// the program emits at runtime. Additional notes are therefore kept in plain `//` comments.
/// Metrics for transactions prewarming.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub(crate) struct PrewarmMetrics {
    // Set once per block from the `FinishedTxExecution` count.
    /// The number of transactions to prewarm
    pub(crate) transactions: Gauge,
    // Records the same per-block count as `transactions`.
    /// A histogram of the number of transactions to prewarm
    pub(crate) transactions_histogram: Histogram,
    // Measured from just before EVM execution until the proof targets have been computed.
    /// A histogram of duration per transaction prewarming
    pub(crate) total_runtime: Histogram,
    // Covers only the `transact` call itself.
    /// A histogram of EVM execution duration per transaction prewarming
    pub(crate) execution_duration: Histogram,
    // Number of storage slots included in the proof targets of a transaction.
    /// A histogram for prefetch targets per transaction prewarming
    pub(crate) prefetch_storage_targets: Histogram,
    // NOTE(review): the description says "histogram" but the field is a `Gauge`; it is set to
    // the duration of the last cache save in seconds.
    /// A histogram of duration for cache saving
    pub(crate) cache_saving_duration: Gauge,
}