reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5    payload_processor::{
6        executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7    },
8    StateProviderBuilder,
9};
10use alloy_consensus::transaction::Recovered;
11use alloy_evm::Database;
12use alloy_primitives::{keccak256, map::B256Set, B256};
13use itertools::Itertools;
14use metrics::{Gauge, Histogram};
15use reth_evm::{ConfigureEvm, Evm, EvmFor};
16use reth_metrics::Metrics;
17use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
18use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
19use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
20use reth_trie::MultiProofTargets;
21use std::{
22    collections::VecDeque,
23    sync::mpsc::{channel, Receiver, Sender},
24    time::Instant,
25};
26use tracing::{debug, trace};
27
28/// A task that is responsible for caching and prewarming the cache by executing transactions
29/// individually in parallel.
30///
31/// Note: This task runs until cancelled externally.
32pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
33    /// The executor used to spawn execution tasks.
34    executor: WorkloadExecutor,
35    /// Shared execution cache.
36    execution_cache: ExecutionCache,
37    /// Transactions pending execution.
38    pending: VecDeque<Recovered<N::SignedTx>>,
39    /// Context provided to execution tasks
40    ctx: PrewarmContext<N, P, Evm>,
41    /// How many transactions should be executed in parallel
42    max_concurrency: usize,
43    /// Sender to emit evm state outcome messages, if any.
44    to_multi_proof: Option<Sender<MultiProofMessage>>,
45    /// Receiver for events produced by tx execution
46    actions_rx: Receiver<PrewarmTaskEvent>,
47    /// Sender the transactions use to send their result back
48    actions_tx: Sender<PrewarmTaskEvent>,
49}
50
51impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
52where
53    N: NodePrimitives,
54    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
55    Evm: ConfigureEvm<Primitives = N> + 'static,
56{
57    /// Initializes the task with the given transactions pending execution
58    pub(super) fn new(
59        executor: WorkloadExecutor,
60        execution_cache: ExecutionCache,
61        ctx: PrewarmContext<N, P, Evm>,
62        to_multi_proof: Option<Sender<MultiProofMessage>>,
63        pending: VecDeque<Recovered<N::SignedTx>>,
64    ) -> Self {
65        let (actions_tx, actions_rx) = channel();
66        Self {
67            executor,
68            execution_cache,
69            pending,
70            ctx,
71            max_concurrency: 64,
72            to_multi_proof,
73            actions_rx,
74            actions_tx,
75        }
76    }
77
78    /// Returns the sender that can communicate with this task.
79    pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
80        self.actions_tx.clone()
81    }
82
83    /// Spawns all pending transactions as blocking tasks by first chunking them.
84    fn spawn_all(&mut self) {
85        let chunk_size = (self.pending.len() / self.max_concurrency).max(1);
86
87        for chunk in &self.pending.drain(..).chunks(chunk_size) {
88            let sender = self.actions_tx.clone();
89            let ctx = self.ctx.clone();
90            let pending_chunk = chunk.collect::<Vec<_>>();
91
92            self.executor.spawn_blocking(move || {
93                ctx.transact_batch(&pending_chunk, sender);
94            });
95        }
96    }
97
98    /// If configured and the tx returned proof targets, emit the targets the transaction produced
99    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
100        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
101            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
102        }
103    }
104
105    /// Save the state to the shared cache for the given block.
106    fn save_cache(&self, state: BundleState) {
107        let start = Instant::now();
108        let cache = SavedCache::new(
109            self.ctx.header.hash(),
110            self.ctx.cache.clone(),
111            self.ctx.cache_metrics.clone(),
112        );
113        if cache.cache().insert_state(&state).is_err() {
114            return
115        }
116
117        cache.update_metrics();
118
119        debug!(target: "engine::caching", "Updated state caches");
120
121        // update the cache for the executed block
122        self.execution_cache.save_cache(cache);
123        self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
124    }
125
126    /// Executes the task.
127    ///
128    /// This will execute the transactions until all transactions have been processed or the task
129    /// was cancelled.
130    pub(super) fn run(mut self) {
131        self.ctx.metrics.transactions.set(self.pending.len() as f64);
132        self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
133
134        // spawn execution tasks.
135        self.spawn_all();
136
137        while let Ok(event) = self.actions_rx.recv() {
138            match event {
139                PrewarmTaskEvent::TerminateTransactionExecution => {
140                    // stop tx processing
141                    self.pending.clear();
142                }
143                PrewarmTaskEvent::Outcome { proof_targets } => {
144                    // completed a transaction, frees up one slot
145                    self.send_multi_proof_targets(proof_targets);
146                }
147                PrewarmTaskEvent::Terminate { block_output } => {
148                    // terminate the task
149                    if let Some(state) = block_output {
150                        self.save_cache(state);
151                    }
152
153                    break
154                }
155            }
156        }
157    }
158}
159
160/// Context required by tx execution tasks.
161#[derive(Debug, Clone)]
162pub(super) struct PrewarmContext<N: NodePrimitives, P, Evm> {
163    pub(super) header: SealedHeaderFor<N>,
164    pub(super) evm_config: Evm,
165    pub(super) cache: ProviderCaches,
166    pub(super) cache_metrics: CachedStateMetrics,
167    /// Provider to obtain the state
168    pub(super) provider: StateProviderBuilder<N, P>,
169    pub(super) metrics: PrewarmMetrics,
170}
171
172impl<N, P, Evm> PrewarmContext<N, P, Evm>
173where
174    N: NodePrimitives,
175    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
176    Evm: ConfigureEvm<Primitives = N> + 'static,
177{
178    /// Splits this context into an evm, an evm config, and metrics.
179    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics)> {
180        let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self;
181
182        let state_provider = match provider.build() {
183            Ok(provider) => provider,
184            Err(err) => {
185                trace!(
186                    target: "engine::tree",
187                    %err,
188                    "Failed to build state provider in prewarm thread"
189                );
190                return None
191            }
192        };
193
194        // Use the caches to create a new provider with caching
195        let state_provider =
196            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
197
198        let state_provider = StateProviderDatabase::new(state_provider);
199
200        let mut evm_env = evm_config.evm_env(&header);
201
202        // we must disable the nonce check so that we can execute the transaction even if the nonce
203        // doesn't match what's on chain.
204        evm_env.cfg_env.disable_nonce_check = true;
205
206        // create a new executor and disable nonce checks in the env
207        let evm = evm_config.evm_with_env(state_provider, evm_env);
208
209        Some((evm, evm_config, metrics))
210    }
211
212    /// Transacts the vec of transactions and returns the state outcome.
213    ///
214    /// Returns `None` if executing the transactions failed to a non Revert error.
215    /// Returns the touched+modified state of the transaction.
216    ///
217    /// Note: Since here are no ordering guarantees this won't the state the txs produce when
218    /// executed sequentially.
219    fn transact_batch(self, txs: &[Recovered<N::SignedTx>], sender: Sender<PrewarmTaskEvent>) {
220        let Some((mut evm, evm_config, metrics)) = self.evm_for_ctx() else { return };
221
222        for tx in txs {
223            // create the tx env
224            let tx_env = evm_config.tx_env(tx);
225            let start = Instant::now();
226            let res = match evm.transact(tx_env) {
227                Ok(res) => res,
228                Err(err) => {
229                    trace!(
230                        target: "engine::tree",
231                        %err,
232                        tx_hash=%tx.tx_hash(),
233                        sender=%tx.signer(),
234                        "Error when executing prewarm transaction",
235                    );
236                    return
237                }
238            };
239            metrics.execution_duration.record(start.elapsed());
240
241            let (targets, storage_targets) = multiproof_targets_from_state(res.state);
242            metrics.prefetch_storage_targets.record(storage_targets as f64);
243            metrics.total_runtime.record(start.elapsed());
244
245            let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
246        }
247    }
248}
249
250/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
251/// given state.
252fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
253    let mut targets = MultiProofTargets::with_capacity(state.len());
254    let mut storage_targets = 0;
255    for (addr, account) in state {
256        // if the account was not touched, or if the account was selfdestructed, do not
257        // fetch proofs for it
258        //
259        // Since selfdestruct can only happen in the same transaction, we can skip
260        // prefetching proofs for selfdestructed accounts
261        //
262        // See: https://eips.ethereum.org/EIPS/eip-6780
263        if !account.is_touched() || account.is_selfdestructed() {
264            continue
265        }
266
267        let mut storage_set =
268            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
269        for (key, slot) in account.storage {
270            // do nothing if unchanged
271            if !slot.is_changed() {
272                continue
273            }
274
275            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
276        }
277
278        storage_targets += storage_set.len();
279        targets.insert(keccak256(addr), storage_set);
280    }
281
282    (targets, storage_targets)
283}
284
285/// The events the pre-warm task can handle.
286pub(super) enum PrewarmTaskEvent {
287    /// Forcefully terminate all remaining transaction execution.
288    TerminateTransactionExecution,
289    /// Forcefully terminate the task on demand and update the shared cache with the given output
290    /// before exiting.
291    Terminate {
292        /// The final block state output.
293        block_output: Option<BundleState>,
294    },
295    /// The outcome of a pre-warm task
296    Outcome {
297        /// The prepared proof targets based on the evm state outcome
298        proof_targets: Option<MultiProofTargets>,
299    },
300}
301
302/// Metrics for transactions prewarming.
303#[derive(Metrics, Clone)]
304#[metrics(scope = "sync.prewarm")]
305pub(crate) struct PrewarmMetrics {
306    /// The number of transactions to prewarm
307    pub(crate) transactions: Gauge,
308    /// A histogram of the number of transactions to prewarm
309    pub(crate) transactions_histogram: Histogram,
310    /// A histogram of duration per transaction prewarming
311    pub(crate) total_runtime: Histogram,
312    /// A histogram of EVM execution duration per transaction prewarming
313    pub(crate) execution_duration: Histogram,
314    /// A histogram for prefetch targets per transaction prewarming
315    pub(crate) prefetch_storage_targets: Histogram,
316    /// A histogram of duration for cache saving
317    pub(crate) cache_saving_duration: Gauge,
318}