reth_engine_tree/tree/payload_processor/
prewarm.rs1use crate::tree::{
15 cached_state::{
16 CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache, SavedCache,
17 },
18 payload_processor::{
19 executor::WorkloadExecutor, multiproof::MultiProofMessage,
20 ExecutionCache as PayloadExecutionCache,
21 },
22 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
23 ExecutionEnv, StateProviderBuilder,
24};
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use metrics::{Counter, Gauge, Histogram};
28use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
29use reth_metrics::Metrics;
30use reth_primitives_traits::{NodePrimitives, SignedTransaction};
31use reth_provider::{BlockReader, StateProviderFactory, StateReader};
32use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
33use reth_trie::MultiProofTargets;
34use std::{
35 sync::{
36 atomic::{AtomicBool, Ordering},
37 mpsc::{self, channel, Receiver, Sender},
38 Arc,
39 },
40 time::Instant,
41};
42use tracing::{debug, trace};
43
44pub(super) struct PrewarmCacheTask<N, P, Evm>
49where
50 N: NodePrimitives,
51 Evm: ConfigureEvm<Primitives = N>,
52{
53 executor: WorkloadExecutor,
55 execution_cache: PayloadExecutionCache,
57 ctx: PrewarmContext<N, P, Evm>,
59 max_concurrency: usize,
61 to_multi_proof: Option<Sender<MultiProofMessage>>,
63 actions_rx: Receiver<PrewarmTaskEvent>,
65}
66
67impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
68where
69 N: NodePrimitives,
70 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
71 Evm: ConfigureEvm<Primitives = N> + 'static,
72{
73 pub(super) fn new(
75 executor: WorkloadExecutor,
76 execution_cache: PayloadExecutionCache,
77 ctx: PrewarmContext<N, P, Evm>,
78 to_multi_proof: Option<Sender<MultiProofMessage>>,
79 ) -> (Self, Sender<PrewarmTaskEvent>) {
80 let (actions_tx, actions_rx) = channel();
81 (
82 Self {
83 executor,
84 execution_cache,
85 ctx,
86 max_concurrency: 64,
87 to_multi_proof,
88 actions_rx,
89 },
90 actions_tx,
91 )
92 }
93
94 fn spawn_all(
96 &self,
97 pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
98 actions_tx: Sender<PrewarmTaskEvent>,
99 ) {
100 let executor = self.executor.clone();
101 let ctx = self.ctx.clone();
102 let max_concurrency = self.max_concurrency;
103
104 self.executor.spawn_blocking(move || {
105 let mut handles = Vec::with_capacity(max_concurrency);
106 let (done_tx, done_rx) = mpsc::channel();
107 let mut executing = 0;
108 while let Ok(executable) = pending.recv() {
109 let task_idx = executing % max_concurrency;
110
111 if handles.len() <= task_idx {
112 let (tx, rx) = mpsc::channel();
113 let sender = actions_tx.clone();
114 let ctx = ctx.clone();
115 let done_tx = done_tx.clone();
116
117 executor.spawn_blocking(move || {
118 ctx.transact_batch(rx, sender, done_tx);
119 });
120
121 handles.push(tx);
122 }
123
124 let _ = handles[task_idx].send(executable);
125
126 executing += 1;
127 }
128
129 drop(done_tx);
131 drop(handles);
132 while done_rx.recv().is_ok() {}
133
134 let _ = actions_tx
135 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: executing });
136 });
137 }
138
139 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
141 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
142 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
143 }
144 }
145
146 fn save_cache(self, state: BundleState) {
154 let start = Instant::now();
155
156 let hash = self.ctx.env.hash;
158 let caches = self.ctx.cache.clone();
159 let metrics = self.ctx.cache_metrics.clone();
160
161 self.execution_cache.update_with_guard(|cached| {
163 let cache = SavedCache::new(hash, caches, metrics);
164
165 if cache.cache().insert_state(&state).is_err() {
167 *cached = None;
169 debug!(target: "engine::caching", "cleared execution cache on update error");
170 return;
171 }
172
173 cache.update_metrics();
174 debug!(target: "engine::caching", parent_hash=?cache.executed_block_hash(), "Updated execution cache");
175
176 *cached = Some(cache);
178 });
179
180 self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
181 }
182
183 pub(super) fn run(
188 self,
189 pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
190 actions_tx: Sender<PrewarmTaskEvent>,
191 ) {
192 self.spawn_all(pending, actions_tx);
194
195 let mut final_block_output = None;
196 let mut finished_execution = false;
197 while let Ok(event) = self.actions_rx.recv() {
198 match event {
199 PrewarmTaskEvent::TerminateTransactionExecution => {
200 self.ctx.terminate_execution.store(true, Ordering::Relaxed);
202 }
203 PrewarmTaskEvent::Outcome { proof_targets } => {
204 self.send_multi_proof_targets(proof_targets);
206 }
207 PrewarmTaskEvent::Terminate { block_output } => {
208 trace!(target: "engine::tree::prewarm", "Received termination signal");
209 final_block_output = Some(block_output);
210
211 if finished_execution {
212 break
214 }
215 }
216 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
217 trace!(target: "engine::tree::prewarm", "Finished prewarm execution signal");
218 self.ctx.metrics.transactions.set(executed_transactions as f64);
219 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
220
221 finished_execution = true;
222
223 if final_block_output.is_some() {
224 break
226 }
227 }
228 }
229 }
230
231 trace!(target: "engine::tree::prewarm", "Completed prewarm execution");
232
233 if let Some(Some(state)) = final_block_output {
235 self.save_cache(state);
236 }
237 }
238}
239
240#[derive(Debug, Clone)]
242pub(super) struct PrewarmContext<N, P, Evm>
243where
244 N: NodePrimitives,
245 Evm: ConfigureEvm<Primitives = N>,
246{
247 pub(super) env: ExecutionEnv<Evm>,
248 pub(super) evm_config: Evm,
249 pub(super) cache: StateExecutionCache,
250 pub(super) cache_metrics: CachedStateMetrics,
251 pub(super) provider: StateProviderBuilder<N, P>,
253 pub(super) metrics: PrewarmMetrics,
254 pub(super) terminate_execution: Arc<AtomicBool>,
256 pub(super) precompile_cache_disabled: bool,
257 pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
258}
259
260impl<N, P, Evm> PrewarmContext<N, P, Evm>
261where
262 N: NodePrimitives,
263 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
264 Evm: ConfigureEvm<Primitives = N> + 'static,
265{
266 fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
269 let Self {
270 env,
271 evm_config,
272 cache: caches,
273 cache_metrics,
274 provider,
275 metrics,
276 terminate_execution,
277 precompile_cache_disabled,
278 mut precompile_cache_map,
279 } = self;
280
281 let state_provider = match provider.build() {
282 Ok(provider) => provider,
283 Err(err) => {
284 trace!(
285 target: "engine::tree",
286 %err,
287 "Failed to build state provider in prewarm thread"
288 );
289 return None
290 }
291 };
292
293 let state_provider =
295 CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
296
297 let state_provider = StateProviderDatabase::new(state_provider);
298
299 let mut evm_env = env.evm_env;
300
301 evm_env.cfg_env.disable_nonce_check = true;
304
305 let spec_id = *evm_env.spec_id();
307 let mut evm = evm_config.evm_with_env(state_provider, evm_env);
308
309 if !precompile_cache_disabled {
310 evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
312 CachedPrecompile::wrap(
313 precompile,
314 precompile_cache_map.cache_for_address(*address),
315 spec_id,
316 None, )
318 });
319 }
320
321 Some((evm, metrics, terminate_execution))
322 }
323
324 fn transact_batch(
333 self,
334 txs: mpsc::Receiver<impl ExecutableTxFor<Evm>>,
335 sender: Sender<PrewarmTaskEvent>,
336 done_tx: Sender<()>,
337 ) {
338 let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
339
340 while let Ok(tx) = txs.recv() {
341 if terminate_execution.load(Ordering::Relaxed) {
344 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
345 break
346 }
347
348 let start = Instant::now();
350 let res = match evm.transact(&tx) {
351 Ok(res) => res,
352 Err(err) => {
353 trace!(
354 target: "engine::tree::prewarm",
355 %err,
356 tx_hash=%tx.tx().tx_hash(),
357 sender=%tx.signer(),
358 "Error when executing prewarm transaction",
359 );
360 metrics.transaction_errors.increment(1);
362 continue
364 }
365 };
366 metrics.execution_duration.record(start.elapsed());
367
368 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
369 metrics.prefetch_storage_targets.record(storage_targets as f64);
370 metrics.total_runtime.record(start.elapsed());
371
372 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
373 }
374
375 let _ = done_tx.send(());
377 }
378}
379
380fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
383 let mut targets = MultiProofTargets::with_capacity(state.len());
384 let mut storage_targets = 0;
385 for (addr, account) in state {
386 if !account.is_touched() || account.is_selfdestructed() {
394 continue
395 }
396
397 let mut storage_set =
398 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
399 for (key, slot) in account.storage {
400 if !slot.is_changed() {
402 continue
403 }
404
405 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
406 }
407
408 storage_targets += storage_set.len();
409 targets.insert(keccak256(addr), storage_set);
410 }
411
412 (targets, storage_targets)
413}
414
415pub(super) enum PrewarmTaskEvent {
417 TerminateTransactionExecution,
419 Terminate {
422 block_output: Option<BundleState>,
424 },
425 Outcome {
427 proof_targets: Option<MultiProofTargets>,
429 },
430 FinishedTxExecution {
432 executed_transactions: usize,
434 },
435}
436
437#[derive(Metrics, Clone)]
439#[metrics(scope = "sync.prewarm")]
440pub(crate) struct PrewarmMetrics {
441 pub(crate) transactions: Gauge,
443 pub(crate) transactions_histogram: Histogram,
445 pub(crate) total_runtime: Histogram,
447 pub(crate) execution_duration: Histogram,
449 pub(crate) prefetch_storage_targets: Histogram,
451 pub(crate) cache_saving_duration: Gauge,
453 pub(crate) transaction_errors: Counter,
455}