1use crate::tree::{
15 cached_state::{CachedStateProvider, SavedCache},
16 payload_processor::{
17 executor::WorkloadExecutor, multiproof::MultiProofMessage,
18 ExecutionCache as PayloadExecutionCache,
19 },
20 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
21 ExecutionEnv, StateProviderBuilder,
22};
23use alloy_consensus::transaction::TxHashRef;
24use alloy_eips::Typed2718;
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use crossbeam_channel::Sender as CrossbeamSender;
28use metrics::{Counter, Gauge, Histogram};
29use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
30use reth_metrics::Metrics;
31use reth_primitives_traits::NodePrimitives;
32use reth_provider::{BlockReader, StateProviderBox, StateProviderFactory, StateReader};
33use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
34use reth_trie::MultiProofTargets;
35use std::{
36 sync::{
37 atomic::{AtomicBool, Ordering},
38 mpsc::{self, channel, Receiver, Sender},
39 Arc,
40 },
41 time::Instant,
42};
43use tracing::{debug, debug_span, instrument, trace, warn, Span};
44
/// A transaction paired with its position in the stream of received transactions.
///
/// `spawn_all` assigns the index in arrival order; the pair is then handed to a worker, which
/// uses the index for tracing and to decide whether an outcome is reported.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// Zero-based arrival position of the transaction.
    index: usize,
    /// The transaction to execute.
    tx: Tx,
}
53
/// Highest transaction type id that is treated as a standard transaction.
///
/// `spawn_all` classifies every transaction whose type id is strictly greater than this value
/// as a system transaction.
///
/// NOTE(review): `4` presumably corresponds to the newest standard Ethereum transaction type
/// (EIP-7702) — confirm and bump when a new standard type is introduced.
const MAX_STANDARD_TX_TYPE: u8 = 4;
66
67pub(super) struct PrewarmCacheTask<N, P, Evm>
72where
73 N: NodePrimitives,
74 Evm: ConfigureEvm<Primitives = N>,
75{
76 executor: WorkloadExecutor,
78 execution_cache: PayloadExecutionCache,
80 ctx: PrewarmContext<N, P, Evm>,
82 max_concurrency: usize,
84 transaction_count_hint: usize,
86 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
88 actions_rx: Receiver<PrewarmTaskEvent>,
90 parent_span: Span,
92}
93
94impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
95where
96 N: NodePrimitives,
97 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
98 Evm: ConfigureEvm<Primitives = N> + 'static,
99{
100 pub(super) fn new(
102 executor: WorkloadExecutor,
103 execution_cache: PayloadExecutionCache,
104 ctx: PrewarmContext<N, P, Evm>,
105 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
106 transaction_count_hint: usize,
107 max_concurrency: usize,
108 ) -> (Self, Sender<PrewarmTaskEvent>) {
109 let (actions_tx, actions_rx) = channel();
110
111 trace!(
112 target: "engine::tree::payload_processor::prewarm",
113 max_concurrency,
114 transaction_count_hint,
115 "Initialized prewarm task"
116 );
117
118 (
119 Self {
120 executor,
121 execution_cache,
122 ctx,
123 max_concurrency,
124 transaction_count_hint,
125 to_multi_proof,
126 actions_rx,
127 parent_span: Span::current(),
128 },
129 actions_tx,
130 )
131 }
132
133 fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
139 where
140 Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
141 {
142 let executor = self.executor.clone();
143 let ctx = self.ctx.clone();
144 let max_concurrency = self.max_concurrency;
145 let transaction_count_hint = self.transaction_count_hint;
146 let span = Span::current();
147
148 self.executor.spawn_blocking(move || {
149 let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
150
151 let (done_tx, done_rx) = mpsc::channel();
152
153 let workers_needed = if transaction_count_hint == 0 {
157 max_concurrency
158 } else {
159 transaction_count_hint.min(max_concurrency)
160 };
161
162 let mut handles = Vec::with_capacity(workers_needed);
164
165 for i in 0..workers_needed {
167 handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
168 }
169
170 let mut tx_index = 0usize;
172 while let Ok(tx) = pending.recv() {
173 if ctx.terminate_execution.load(Ordering::Relaxed) {
175 trace!(
176 target: "engine::tree::payload_processor::prewarm",
177 "Termination requested, stopping transaction distribution"
178 );
179 break;
180 }
181
182 let indexed_tx = IndexedTransaction { index: tx_index, tx };
183 let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
184
185 if tx_index == 0 && is_system_tx {
191 for handle in &handles {
192 let _ = handle.send(indexed_tx.clone());
197 }
198 } else {
199 let worker_idx = tx_index % workers_needed;
201 let _ = handles[worker_idx].send(indexed_tx);
206 }
207
208 tx_index += 1;
209 }
210
211 drop(done_tx);
213 drop(handles);
214 while done_rx.recv().is_ok() {}
215
216 let _ = actions_tx
217 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
218 });
219 }
220
221 fn is_execution_terminated(&self) -> bool {
223 self.ctx.terminate_execution.load(Ordering::Relaxed)
224 }
225
226 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
228 if self.is_execution_terminated() {
229 return
232 }
233
234 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
235 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
236 }
237 }
238
239 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
251 fn save_cache(self, state: BundleState) {
252 let start = Instant::now();
253
254 let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
255 self;
256 let hash = env.hash;
257
258 if let Some(saved_cache) = saved_cache {
259 debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
260 execution_cache.update_with_guard(|cached| {
262 let (caches, cache_metrics) = saved_cache.split();
265 let new_cache = SavedCache::new(hash, caches, cache_metrics);
266
267 if new_cache.cache().insert_state(&state).is_err() {
269 *cached = None;
271 debug!(target: "engine::caching", "cleared execution cache on update error");
272 return;
273 }
274
275 new_cache.update_metrics();
276
277 *cached = Some(new_cache);
280 });
281
282 let elapsed = start.elapsed();
283 debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
284
285 metrics.cache_saving_duration.set(elapsed.as_secs_f64());
286 }
287 }
288
289 #[instrument(
294 parent = &self.parent_span,
295 level = "debug",
296 target = "engine::tree::payload_processor::prewarm",
297 name = "prewarm and caching",
298 skip_all
299 )]
300 pub(super) fn run(
301 self,
302 pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
303 actions_tx: Sender<PrewarmTaskEvent>,
304 ) {
305 self.spawn_all(pending, actions_tx);
307
308 let mut final_block_output = None;
309 let mut finished_execution = false;
310 while let Ok(event) = self.actions_rx.recv() {
311 match event {
312 PrewarmTaskEvent::TerminateTransactionExecution => {
313 debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
315 self.ctx.terminate_execution.store(true, Ordering::Relaxed);
316 }
317 PrewarmTaskEvent::Outcome { proof_targets } => {
318 self.send_multi_proof_targets(proof_targets);
320 }
321 PrewarmTaskEvent::Terminate { block_output } => {
322 trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
323 final_block_output = Some(block_output);
324
325 if finished_execution {
326 break
328 }
329 }
330 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
331 trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
332 self.ctx.metrics.transactions.set(executed_transactions as f64);
333 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
334
335 finished_execution = true;
336
337 if final_block_output.is_some() {
338 break
340 }
341 }
342 }
343 }
344
345 debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
346
347 if let Some(Some(state)) = final_block_output {
349 self.save_cache(state);
350 }
351 }
352}
353
354#[derive(Debug, Clone)]
356pub(super) struct PrewarmContext<N, P, Evm>
357where
358 N: NodePrimitives,
359 Evm: ConfigureEvm<Primitives = N>,
360{
361 pub(super) env: ExecutionEnv<Evm>,
362 pub(super) evm_config: Evm,
363 pub(super) saved_cache: Option<SavedCache>,
364 pub(super) provider: StateProviderBuilder<N, P>,
366 pub(super) metrics: PrewarmMetrics,
367 pub(super) terminate_execution: Arc<AtomicBool>,
369 pub(super) precompile_cache_disabled: bool,
370 pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
371}
372
373impl<N, P, Evm> PrewarmContext<N, P, Evm>
374where
375 N: NodePrimitives,
376 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
377 Evm: ConfigureEvm<Primitives = N> + 'static,
378{
379 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
382 fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
383 let Self {
384 env,
385 evm_config,
386 saved_cache,
387 provider,
388 metrics,
389 terminate_execution,
390 precompile_cache_disabled,
391 mut precompile_cache_map,
392 } = self;
393
394 let state_provider = match provider.build() {
395 Ok(provider) => provider,
396 Err(err) => {
397 trace!(
398 target: "engine::tree::payload_processor::prewarm",
399 %err,
400 "Failed to build state provider in prewarm thread"
401 );
402 return None
403 }
404 };
405
406 let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache {
408 let caches = saved_cache.cache().clone();
409 let cache_metrics = saved_cache.metrics().clone();
410 Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics))
411 } else {
412 state_provider
413 };
414
415 let state_provider = StateProviderDatabase::new(state_provider);
416
417 let mut evm_env = env.evm_env;
418
419 evm_env.cfg_env.disable_nonce_check = true;
422
423 let spec_id = *evm_env.spec_id();
425 let mut evm = evm_config.evm_with_env(state_provider, evm_env);
426
427 if !precompile_cache_disabled {
428 evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
430 CachedPrecompile::wrap(
431 precompile,
432 precompile_cache_map.cache_for_address(*address),
433 spec_id,
434 None, )
436 });
437 }
438
439 Some((evm, metrics, terminate_execution))
440 }
441
442 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
452 fn transact_batch<Tx>(
453 self,
454 txs: mpsc::Receiver<IndexedTransaction<Tx>>,
455 sender: Sender<PrewarmTaskEvent>,
456 done_tx: Sender<()>,
457 ) where
458 Tx: ExecutableTxFor<Evm>,
459 {
460 let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
461
462 while let Ok(IndexedTransaction { index, tx }) = {
463 let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
464 .entered();
465 txs.recv()
466 } {
467 let enter =
468 debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
469 .entered();
470
471 let start = Instant::now();
473
474 if terminate_execution.load(Ordering::Relaxed) {
477 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
478 break
479 }
480
481 let res = match evm.transact(&tx) {
482 Ok(res) => res,
483 Err(err) => {
484 trace!(
485 target: "engine::tree::payload_processor::prewarm",
486 %err,
487 tx_hash=%tx.tx().tx_hash(),
488 sender=%tx.signer(),
489 "Error when executing prewarm transaction",
490 );
491 metrics.transaction_errors.increment(1);
493 continue
495 }
496 };
497 metrics.execution_duration.record(start.elapsed());
498
499 enter.record("gas_used", res.result.gas_used());
501 enter.record("is_success", res.result.is_success());
502
503 drop(enter);
504
505 if terminate_execution.load(Ordering::Relaxed) {
508 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
509 break
510 }
511
512 if index > 0 {
515 let _enter =
516 debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
517 .entered();
518 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
519 metrics.prefetch_storage_targets.record(storage_targets as f64);
520 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
521 drop(_enter);
522 }
523
524 metrics.total_runtime.record(start.elapsed());
525 }
526
527 let _ = done_tx.send(());
529 }
530
531 fn spawn_worker<Tx>(
533 &self,
534 idx: usize,
535 executor: &WorkloadExecutor,
536 actions_tx: Sender<PrewarmTaskEvent>,
537 done_tx: Sender<()>,
538 ) -> mpsc::Sender<IndexedTransaction<Tx>>
539 where
540 Tx: ExecutableTxFor<Evm> + Send + 'static,
541 {
542 let (tx, rx) = mpsc::channel();
543 let ctx = self.clone();
544 let span =
545 debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
546
547 executor.spawn_blocking(move || {
548 let _enter = span.entered();
549 ctx.transact_batch(rx, actions_tx, done_tx);
550 });
551
552 tx
553 }
554}
555
556fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
559 let mut targets = MultiProofTargets::with_capacity(state.len());
560 let mut storage_targets = 0;
561 for (addr, account) in state {
562 if !account.is_touched() || account.is_selfdestructed() {
570 continue
571 }
572
573 let mut storage_set =
574 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
575 for (key, slot) in account.storage {
576 if !slot.is_changed() {
578 continue
579 }
580
581 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
582 }
583
584 storage_targets += storage_set.len();
585 targets.insert(keccak256(addr), storage_set);
586 }
587
588 (targets, storage_targets)
589}
590
591pub(super) enum PrewarmTaskEvent {
593 TerminateTransactionExecution,
595 Terminate {
598 block_output: Option<BundleState>,
600 },
601 Outcome {
603 proof_targets: Option<MultiProofTargets>,
605 },
606 FinishedTxExecution {
608 executed_transactions: usize,
610 },
611}
612
613#[derive(Metrics, Clone)]
615#[metrics(scope = "sync.prewarm")]
616pub(crate) struct PrewarmMetrics {
617 pub(crate) transactions: Gauge,
619 pub(crate) transactions_histogram: Histogram,
621 pub(crate) total_runtime: Histogram,
623 pub(crate) execution_duration: Histogram,
625 pub(crate) prefetch_storage_targets: Histogram,
627 pub(crate) cache_saving_duration: Gauge,
629 pub(crate) transaction_errors: Counter,
631}