reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5    payload_processor::{
6        executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7    },
8    StateProviderBuilder,
9};
10use alloy_consensus::transaction::Recovered;
11use alloy_primitives::{keccak256, map::B256Set, B256};
12use metrics::{Gauge, Histogram};
13use reth_evm::{ConfigureEvm, Evm};
14use reth_metrics::Metrics;
15use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
16use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
17use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
18use reth_trie::MultiProofTargets;
19use std::{
20    collections::VecDeque,
21    sync::mpsc::{channel, Receiver, Sender},
22    time::Instant,
23};
24use tracing::{debug, trace};
25
26/// A task that is responsible for caching and prewarming the cache by executing transactions
27/// individually in parallel.
28///
29/// Note: This task runs until cancelled externally.
30pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
31    /// The executor used to spawn execution tasks.
32    executor: WorkloadExecutor,
33    /// Shared execution cache.
34    execution_cache: ExecutionCache,
35    /// Transactions pending execution.
36    pending: VecDeque<Recovered<N::SignedTx>>,
37    /// Context provided to execution tasks
38    ctx: PrewarmContext<N, P, Evm>,
39    /// How many txs are currently in progress
40    in_progress: usize,
41    /// How many transactions should be executed in parallel
42    max_concurrency: usize,
43    /// Sender to emit evm state outcome messages, if any.
44    to_multi_proof: Option<Sender<MultiProofMessage>>,
45    /// Receiver for events produced by tx execution
46    actions_rx: Receiver<PrewarmTaskEvent>,
47    /// Sender the transactions use to send their result back
48    actions_tx: Sender<PrewarmTaskEvent>,
49}
50
51impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
52where
53    N: NodePrimitives,
54    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
55    Evm: ConfigureEvm<Primitives = N> + 'static,
56{
57    /// Initializes the task with the given transactions pending execution
58    pub(super) fn new(
59        executor: WorkloadExecutor,
60        execution_cache: ExecutionCache,
61        ctx: PrewarmContext<N, P, Evm>,
62        to_multi_proof: Option<Sender<MultiProofMessage>>,
63        pending: VecDeque<Recovered<N::SignedTx>>,
64    ) -> Self {
65        let (actions_tx, actions_rx) = channel();
66        Self {
67            executor,
68            execution_cache,
69            pending,
70            ctx,
71            in_progress: 0,
72            // TODO settings
73            max_concurrency: 4,
74            to_multi_proof,
75            actions_rx,
76            actions_tx,
77        }
78    }
79
80    /// Returns the sender that can communicate with this task.
81    pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
82        self.actions_tx.clone()
83    }
84
85    /// Spawns the next transactions
86    fn spawn_next(&mut self) {
87        while self.in_progress < self.max_concurrency {
88            if let Some(tx) = self.pending.pop_front() {
89                self.spawn_transaction(tx);
90            } else {
91                break
92            }
93        }
94    }
95
96    /// Spawns the given transaction as a blocking task.
97    fn spawn_transaction(&self, tx: Recovered<N::SignedTx>) {
98        let ctx = self.ctx.clone();
99        let metrics = self.ctx.metrics.clone();
100        let actions_tx = self.actions_tx.clone();
101        let prepare_proof_targets = self.should_prepare_multi_proof_targets();
102        self.executor.spawn_blocking(move || {
103            let start = Instant::now();
104            // depending on whether this task needs he proof targets we either just transact or
105            // transact and prepare the targets
106            let proof_targets = if prepare_proof_targets {
107                ctx.prepare_multiproof_targets(tx)
108            } else {
109                ctx.transact(tx);
110                None
111            };
112            let _ = actions_tx.send(PrewarmTaskEvent::Outcome { proof_targets });
113            metrics.total_runtime.record(start.elapsed());
114        });
115    }
116
117    /// Returns true if the tx prewarming tasks should prepare multiproof targets.
118    fn should_prepare_multi_proof_targets(&self) -> bool {
119        self.to_multi_proof.is_some()
120    }
121
122    /// If configured and the tx returned proof targets, emit the targets the transaction produced
123    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
124        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
125            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
126        }
127    }
128
129    /// Save the state to the shared cache for the given block.
130    fn save_cache(&self, state: BundleState) {
131        let start = Instant::now();
132        let cache = SavedCache::new(
133            self.ctx.header.hash(),
134            self.ctx.cache.clone(),
135            self.ctx.cache_metrics.clone(),
136        );
137        if cache.cache().insert_state(&state).is_err() {
138            return
139        }
140
141        cache.update_metrics();
142
143        debug!(target: "engine::caching", "Updated state caches");
144
145        // update the cache for the executed block
146        self.execution_cache.save_cache(cache);
147        self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
148    }
149
150    /// Executes the task.
151    ///
152    /// This will execute the transactions until all transactions have been processed or the task
153    /// was cancelled.
154    pub(super) fn run(mut self) {
155        self.ctx.metrics.transactions.set(self.pending.len() as f64);
156        self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
157
158        // spawn execution tasks.
159        self.spawn_next();
160
161        while let Ok(event) = self.actions_rx.recv() {
162            match event {
163                PrewarmTaskEvent::TerminateTransactionExecution => {
164                    // stop tx processing
165                    self.pending.clear();
166                }
167                PrewarmTaskEvent::Outcome { proof_targets } => {
168                    // completed a transaction, frees up one slot
169                    self.in_progress -= 1;
170                    self.send_multi_proof_targets(proof_targets);
171                }
172                PrewarmTaskEvent::Terminate { block_output } => {
173                    // terminate the task
174                    if let Some(state) = block_output {
175                        self.save_cache(state);
176                    }
177
178                    break
179                }
180            }
181
182            // schedule followup transactions
183            self.spawn_next();
184        }
185    }
186}
187
188/// Context required by tx execution tasks.
189#[derive(Debug, Clone)]
190pub(super) struct PrewarmContext<N: NodePrimitives, P, Evm> {
191    pub(super) header: SealedHeaderFor<N>,
192    pub(super) evm_config: Evm,
193    pub(super) cache: ProviderCaches,
194    pub(super) cache_metrics: CachedStateMetrics,
195    /// Provider to obtain the state
196    pub(super) provider: StateProviderBuilder<N, P>,
197    pub(super) metrics: PrewarmMetrics,
198}
199
200impl<N, P, Evm> PrewarmContext<N, P, Evm>
201where
202    N: NodePrimitives,
203    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
204    Evm: ConfigureEvm<Primitives = N> + 'static,
205{
206    /// Transacts the transactions and transform the state into [`MultiProofTargets`].
207    fn prepare_multiproof_targets(self, tx: Recovered<N::SignedTx>) -> Option<MultiProofTargets> {
208        let metrics = self.metrics.clone();
209        let state = self.transact(tx)?;
210
211        let mut targets = MultiProofTargets::with_capacity(state.len());
212        let mut storage_targets = 0;
213
214        for (addr, account) in state {
215            // if the account was not touched, or if the account was selfdestructed, do not
216            // fetch proofs for it
217            //
218            // Since selfdestruct can only happen in the same transaction, we can skip
219            // prefetching proofs for selfdestructed accounts
220            //
221            // See: https://eips.ethereum.org/EIPS/eip-6780
222            if !account.is_touched() || account.is_selfdestructed() {
223                continue
224            }
225
226            let mut storage_set =
227                B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
228            for (key, slot) in account.storage {
229                // do nothing if unchanged
230                if !slot.is_changed() {
231                    continue
232                }
233
234                storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
235            }
236
237            storage_targets += storage_set.len();
238            targets.insert(keccak256(addr), storage_set);
239        }
240
241        metrics.prefetch_storage_targets.record(storage_targets as f64);
242
243        Some(targets)
244    }
245
246    /// Transacts the transaction and returns the state outcome.
247    ///
248    /// Returns `None` if executing the transaction failed to a non Revert error.
249    /// Returns the touched+modified state of the transaction.
250    ///
251    /// Note: Since here are no ordering guarantees this won't the state the tx produces when
252    /// executed sequentially.
253    fn transact(self, tx: Recovered<N::SignedTx>) -> Option<EvmState> {
254        let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self;
255        // Create the state provider inside the thread
256        let state_provider = match provider.build() {
257            Ok(provider) => provider,
258            Err(err) => {
259                trace!(
260                    target: "engine::tree",
261                    %err,
262                    "Failed to build state provider in prewarm thread"
263                );
264                return None
265            }
266        };
267
268        // Use the caches to create a new provider with caching
269        let state_provider =
270            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
271
272        let state_provider = StateProviderDatabase::new(&state_provider);
273
274        let mut evm_env = evm_config.evm_env(&header);
275
276        // we must disable the nonce check so that we can execute the transaction even if the nonce
277        // doesn't match what's on chain.
278        evm_env.cfg_env.disable_nonce_check = true;
279
280        // create a new executor and disable nonce checks in the env
281        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
282
283        // create the tx env and reset nonce
284        let tx_env = evm_config.tx_env(&tx);
285        let start = Instant::now();
286        let res = match evm.transact(tx_env) {
287            Ok(res) => res,
288            Err(err) => {
289                trace!(
290                    target: "engine::tree",
291                    %err,
292                    tx_hash=%tx.tx_hash(),
293                    sender=%tx.signer(),
294                    "Error when executing prewarm transaction",
295                );
296                return None
297            }
298        };
299        metrics.execution_duration.record(start.elapsed());
300
301        Some(res.state)
302    }
303}
304
305/// The events the pre-warm task can handle.
306pub(super) enum PrewarmTaskEvent {
307    /// Forcefully terminate all remaining transaction execution.
308    TerminateTransactionExecution,
309    /// Forcefully terminate the task on demand and update the shared cache with the given output
310    /// before exiting.
311    Terminate {
312        /// The final block state output.
313        block_output: Option<BundleState>,
314    },
315    /// The outcome of a pre-warm task
316    Outcome {
317        /// The prepared proof targets based on the evm state outcome
318        proof_targets: Option<MultiProofTargets>,
319    },
320}
321
322/// Metrics for transactions prewarming.
323#[derive(Metrics, Clone)]
324#[metrics(scope = "sync.prewarm")]
325pub(crate) struct PrewarmMetrics {
326    /// The number of transactions to prewarm
327    pub(crate) transactions: Gauge,
328    /// A histogram of the number of transactions to prewarm
329    pub(crate) transactions_histogram: Histogram,
330    /// A histogram of duration per transaction prewarming
331    pub(crate) total_runtime: Histogram,
332    /// A histogram of EVM execution duration per transaction prewarming
333    pub(crate) execution_duration: Histogram,
334    /// A histogram for prefetch targets per transaction prewarming
335    pub(crate) prefetch_storage_targets: Histogram,
336    /// A histogram of duration for cache saving
337    pub(crate) cache_saving_duration: Gauge,
338}