reth_engine_tree/tree/payload_processor/
prewarm.rs1use crate::tree::{
4 cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5 payload_processor::{
6 executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7 },
8 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
9 ExecutionEnv, StateProviderBuilder,
10};
11use alloy_evm::Database;
12use alloy_primitives::{keccak256, map::B256Set, B256};
13use metrics::{Gauge, Histogram};
14use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
15use reth_metrics::Metrics;
16use reth_primitives_traits::{NodePrimitives, SignedTransaction};
17use reth_provider::{BlockReader, StateProviderFactory, StateReader};
18use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
19use reth_trie::MultiProofTargets;
20use std::{
21 sync::{
22 atomic::{AtomicBool, Ordering},
23 mpsc::{self, channel, Receiver, Sender},
24 Arc,
25 },
26 time::Instant,
27};
28use tracing::{debug, trace};
29
30pub(super) struct PrewarmCacheTask<N, P, Evm>
35where
36 N: NodePrimitives,
37 Evm: ConfigureEvm<Primitives = N>,
38{
39 executor: WorkloadExecutor,
41 execution_cache: ExecutionCache,
43 ctx: PrewarmContext<N, P, Evm>,
45 max_concurrency: usize,
47 to_multi_proof: Option<Sender<MultiProofMessage>>,
49 actions_rx: Receiver<PrewarmTaskEvent>,
51}
52
53impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
54where
55 N: NodePrimitives,
56 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
57 Evm: ConfigureEvm<Primitives = N> + 'static,
58{
59 pub(super) fn new(
61 executor: WorkloadExecutor,
62 execution_cache: ExecutionCache,
63 ctx: PrewarmContext<N, P, Evm>,
64 to_multi_proof: Option<Sender<MultiProofMessage>>,
65 ) -> (Self, Sender<PrewarmTaskEvent>) {
66 let (actions_tx, actions_rx) = channel();
67 (
68 Self {
69 executor,
70 execution_cache,
71 ctx,
72 max_concurrency: 64,
73 to_multi_proof,
74 actions_rx,
75 },
76 actions_tx,
77 )
78 }
79
80 fn spawn_all(
82 &self,
83 pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
84 actions_tx: Sender<PrewarmTaskEvent>,
85 ) {
86 let executor = self.executor.clone();
87 let ctx = self.ctx.clone();
88 let max_concurrency = self.max_concurrency;
89
90 self.executor.spawn_blocking(move || {
91 let mut handles = Vec::with_capacity(max_concurrency);
92 let (done_tx, done_rx) = mpsc::channel();
93 let mut executing = 0;
94 while let Ok(executable) = pending.recv() {
95 let task_idx = executing % max_concurrency;
96
97 if handles.len() <= task_idx {
98 let (tx, rx) = mpsc::channel();
99 let sender = actions_tx.clone();
100 let ctx = ctx.clone();
101 let done_tx = done_tx.clone();
102
103 executor.spawn_blocking(move || {
104 ctx.transact_batch(rx, sender, done_tx);
105 });
106
107 handles.push(tx);
108 }
109
110 let _ = handles[task_idx].send(executable);
111
112 executing += 1;
113 }
114
115 drop(done_tx);
117 drop(handles);
118 while done_rx.recv().is_ok() {}
119
120 let _ = actions_tx
121 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: executing });
122 });
123 }
124
125 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
127 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
128 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
129 }
130 }
131
132 fn save_cache(self, state: BundleState) {
134 let start = Instant::now();
135 let cache = SavedCache::new(
136 self.ctx.env.hash,
137 self.ctx.cache.clone(),
138 self.ctx.cache_metrics.clone(),
139 );
140 if cache.cache().insert_state(&state).is_err() {
141 return
142 }
143
144 cache.update_metrics();
145
146 debug!(target: "engine::caching", "Updated state caches");
147
148 self.execution_cache.save_cache(cache);
150 self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
151 }
152
153 pub(super) fn run(
158 self,
159 pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
160 actions_tx: Sender<PrewarmTaskEvent>,
161 ) {
162 self.spawn_all(pending, actions_tx);
164
165 let mut final_block_output = None;
166 let mut finished_execution = false;
167 while let Ok(event) = self.actions_rx.recv() {
168 match event {
169 PrewarmTaskEvent::TerminateTransactionExecution => {
170 self.ctx.terminate_execution.store(true, Ordering::Relaxed);
172 }
173 PrewarmTaskEvent::Outcome { proof_targets } => {
174 self.send_multi_proof_targets(proof_targets);
176 }
177 PrewarmTaskEvent::Terminate { block_output } => {
178 trace!(target: "engine::tree::prewarm", "Received termination signal");
179 final_block_output = Some(block_output);
180
181 if finished_execution {
182 break
184 }
185 }
186 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
187 trace!(target: "engine::tree::prewarm", "Finished prewarm execution signal");
188 self.ctx.metrics.transactions.set(executed_transactions as f64);
189 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
190
191 finished_execution = true;
192
193 if final_block_output.is_some() {
194 break
196 }
197 }
198 }
199 }
200
201 trace!(target: "engine::tree::prewarm", "Completed prewarm execution");
202
203 if let Some(Some(state)) = final_block_output {
205 self.save_cache(state);
206 }
207 }
208}
209
210#[derive(Debug, Clone)]
212pub(super) struct PrewarmContext<N, P, Evm>
213where
214 N: NodePrimitives,
215 Evm: ConfigureEvm<Primitives = N>,
216{
217 pub(super) env: ExecutionEnv<Evm>,
218 pub(super) evm_config: Evm,
219 pub(super) cache: ProviderCaches,
220 pub(super) cache_metrics: CachedStateMetrics,
221 pub(super) provider: StateProviderBuilder<N, P>,
223 pub(super) metrics: PrewarmMetrics,
224 pub(super) terminate_execution: Arc<AtomicBool>,
226 pub(super) precompile_cache_disabled: bool,
227 pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
228}
229
230impl<N, P, Evm> PrewarmContext<N, P, Evm>
231where
232 N: NodePrimitives,
233 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
234 Evm: ConfigureEvm<Primitives = N> + 'static,
235{
236 fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
239 let Self {
240 env,
241 evm_config,
242 cache: caches,
243 cache_metrics,
244 provider,
245 metrics,
246 terminate_execution,
247 precompile_cache_disabled,
248 mut precompile_cache_map,
249 } = self;
250
251 let state_provider = match provider.build() {
252 Ok(provider) => provider,
253 Err(err) => {
254 trace!(
255 target: "engine::tree",
256 %err,
257 "Failed to build state provider in prewarm thread"
258 );
259 return None
260 }
261 };
262
263 let state_provider =
265 CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
266
267 let state_provider = StateProviderDatabase::new(state_provider);
268
269 let mut evm_env = env.evm_env;
270
271 evm_env.cfg_env.disable_nonce_check = true;
274
275 let spec_id = *evm_env.spec_id();
277 let mut evm = evm_config.evm_with_env(state_provider, evm_env);
278
279 if !precompile_cache_disabled {
280 evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
282 CachedPrecompile::wrap(
283 precompile,
284 precompile_cache_map.cache_for_address(*address),
285 spec_id,
286 None, )
288 });
289 }
290
291 Some((evm, metrics, terminate_execution))
292 }
293
294 fn transact_batch(
303 self,
304 txs: mpsc::Receiver<impl ExecutableTxFor<Evm>>,
305 sender: Sender<PrewarmTaskEvent>,
306 done_tx: Sender<()>,
307 ) {
308 let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
309
310 while let Ok(tx) = txs.recv() {
311 if terminate_execution.load(Ordering::Relaxed) {
314 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
315 break
316 }
317
318 let start = Instant::now();
320 let res = match evm.transact(&tx) {
321 Ok(res) => res,
322 Err(err) => {
323 trace!(
324 target: "engine::tree",
325 %err,
326 tx_hash=%tx.tx().tx_hash(),
327 sender=%tx.signer(),
328 "Error when executing prewarm transaction",
329 );
330 return
331 }
332 };
333 metrics.execution_duration.record(start.elapsed());
334
335 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
336 metrics.prefetch_storage_targets.record(storage_targets as f64);
337 metrics.total_runtime.record(start.elapsed());
338
339 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
340 }
341
342 let _ = done_tx.send(());
344 }
345}
346
347fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
350 let mut targets = MultiProofTargets::with_capacity(state.len());
351 let mut storage_targets = 0;
352 for (addr, account) in state {
353 if !account.is_touched() || account.is_selfdestructed() {
361 continue
362 }
363
364 let mut storage_set =
365 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
366 for (key, slot) in account.storage {
367 if !slot.is_changed() {
369 continue
370 }
371
372 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
373 }
374
375 storage_targets += storage_set.len();
376 targets.insert(keccak256(addr), storage_set);
377 }
378
379 (targets, storage_targets)
380}
381
382pub(super) enum PrewarmTaskEvent {
384 TerminateTransactionExecution,
386 Terminate {
389 block_output: Option<BundleState>,
391 },
392 Outcome {
394 proof_targets: Option<MultiProofTargets>,
396 },
397 FinishedTxExecution {
399 executed_transactions: usize,
401 },
402}
403
404#[derive(Metrics, Clone)]
406#[metrics(scope = "sync.prewarm")]
407pub(crate) struct PrewarmMetrics {
408 pub(crate) transactions: Gauge,
410 pub(crate) transactions_histogram: Histogram,
412 pub(crate) total_runtime: Histogram,
414 pub(crate) execution_duration: Histogram,
416 pub(crate) prefetch_storage_targets: Histogram,
418 pub(crate) cache_saving_duration: Gauge,
420}