1use crate::tree::{
15 cached_state::{CachedStateProvider, SavedCache},
16 payload_processor::{bal, multiproof::MultiProofMessage, PayloadExecutionCache},
17 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
18 ExecutionEnv, StateProviderBuilder,
19};
20use alloy_consensus::transaction::TxHashRef;
21use alloy_eip7928::BlockAccessList;
22use alloy_eips::eip4895::Withdrawal;
23use alloy_primitives::{keccak256, StorageKey, B256};
24use crossbeam_channel::Sender as CrossbeamSender;
25use metrics::{Counter, Gauge, Histogram};
26use rayon::prelude::*;
27use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
28use reth_metrics::Metrics;
29use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
30use reth_provider::{
31 AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
32 StateReader,
33};
34use reth_revm::{database::StateProviderDatabase, state::EvmState};
35use reth_tasks::{pool::WorkerPool, Runtime};
36use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
37use std::sync::{
38 atomic::{AtomicBool, AtomicUsize, Ordering},
39 mpsc::{self, channel, Receiver, Sender},
40 Arc,
41};
42use tracing::{debug, debug_span, instrument, trace, warn, Span};
43
44#[derive(Debug)]
46pub enum PrewarmMode<Tx> {
47 Transactions(Receiver<(usize, Tx)>),
49 BlockAccessList(Arc<BlockAccessList>),
51 Skipped,
54}
55
56#[derive(Debug)]
61pub struct PrewarmCacheTask<N, P, Evm>
62where
63 N: NodePrimitives,
64 Evm: ConfigureEvm<Primitives = N>,
65{
66 executor: Runtime,
68 execution_cache: PayloadExecutionCache,
70 ctx: PrewarmContext<N, P, Evm>,
72 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
74 actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
76 parent_span: Span,
78}
79
80impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
81where
82 N: NodePrimitives,
83 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
84 Evm: ConfigureEvm<Primitives = N> + 'static,
85{
86 pub fn new(
88 executor: Runtime,
89 execution_cache: PayloadExecutionCache,
90 ctx: PrewarmContext<N, P, Evm>,
91 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
92 ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
93 let (actions_tx, actions_rx) = channel();
94
95 trace!(
96 target: "engine::tree::payload_processor::prewarm",
97 prewarming_threads = executor.prewarming_pool().current_num_threads(),
98 transaction_count = ctx.env.transaction_count,
99 "Initialized prewarm task"
100 );
101
102 (
103 Self {
104 executor,
105 execution_cache,
106 ctx,
107 to_multi_proof,
108 actions_rx,
109 parent_span: Span::current(),
110 },
111 actions_tx,
112 )
113 }
114
115 fn spawn_txs_prewarm<Tx>(
122 &self,
123 pending: mpsc::Receiver<(usize, Tx)>,
124 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
125 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
126 ) where
127 Tx: ExecutableTxFor<Evm> + Send + 'static,
128 {
129 let executor = self.executor.clone();
130 let ctx = self.ctx.clone();
131 let span = Span::current();
132
133 self.executor.spawn_blocking_named("prewarm-txs", move || {
134 let _enter = debug_span!(
135 target: "engine::tree::payload_processor::prewarm",
136 parent: span,
137 "prewarm_txs"
138 )
139 .entered();
140
141 let ctx = &ctx;
142 let pool = executor.prewarming_pool();
143
144 let mut tx_count = 0usize;
145 let to_multi_proof = to_multi_proof.as_ref();
146 pool.in_place_scope(|s| {
147 s.spawn(|_| {
148 pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
149 });
150
151 while let Ok((index, tx)) = pending.recv() {
152 if ctx.should_stop() {
153 trace!(
154 target: "engine::tree::payload_processor::prewarm",
155 "Termination requested, stopping transaction distribution"
156 );
157 break;
158 }
159
160 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
162 continue;
163 }
164
165 tx_count += 1;
166 let parent_span = Span::current();
167 s.spawn(move |_| {
168 let _enter = debug_span!(
169 target: "engine::tree::payload_processor::prewarm",
170 parent: parent_span,
171 "prewarm_tx",
172 i = index,
173 )
174 .entered();
175 Self::transact_worker(ctx, index, tx, to_multi_proof);
176 });
177 }
178
179 if let Some(to_multi_proof) = to_multi_proof &&
181 let Some(withdrawals) = &ctx.env.withdrawals &&
182 !withdrawals.is_empty()
183 {
184 let targets = multiproof_targets_from_withdrawals(withdrawals);
185 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
186 }
187 });
188
189 pool.clear();
191
192 let _ = actions_tx
193 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
194 });
195 }
196
197 fn transact_worker<Tx>(
202 ctx: &PrewarmContext<N, P, Evm>,
203 index: usize,
204 tx: Tx,
205 to_multi_proof: Option<&CrossbeamSender<MultiProofMessage>>,
206 ) where
207 Tx: ExecutableTxFor<Evm>,
208 {
209 WorkerPool::with_worker_mut(|worker| {
210 let Some(evm) =
211 worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
212 else {
213 return;
214 };
215
216 if ctx.should_stop() {
217 return;
218 }
219
220 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
222 return;
223 }
224
225 let start = Instant::now();
226
227 let (tx_env, tx) = tx.into_parts();
228 let res = match evm.transact(tx_env) {
229 Ok(res) => res,
230 Err(err) => {
231 trace!(
232 target: "engine::tree::payload_processor::prewarm",
233 %err,
234 tx_hash=%tx.tx().tx_hash(),
235 sender=%tx.signer(),
236 "Error when executing prewarm transaction",
237 );
238 ctx.metrics.transaction_errors.increment(1);
239 return;
240 }
241 };
242 ctx.metrics.execution_duration.record(start.elapsed());
243
244 if ctx.should_stop() {
245 return;
246 }
247
248 if index > 0 {
249 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
250 ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
251 if let Some(to_multi_proof) = to_multi_proof {
252 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
253 }
254 }
255
256 ctx.metrics.total_runtime.record(start.elapsed());
257 });
258 }
259
260 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
272 fn save_cache(
273 self,
274 execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
275 valid_block_rx: mpsc::Receiver<()>,
276 ) {
277 let start = Instant::now();
278
279 let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
280 self;
281 let hash = env.hash;
282
283 if let Some(saved_cache) = saved_cache {
284 debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
285 execution_cache.update_with_guard(|cached| {
287 let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
290 let new_cache = SavedCache::new(hash, caches, cache_metrics)
291 .with_disable_cache_metrics(disable_cache_metrics);
292
293 if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
296 *cached = None;
298 debug!(target: "engine::caching", "cleared execution cache on update error");
299 return;
300 }
301
302 new_cache.update_metrics();
303
304 if valid_block_rx.recv().is_ok() {
305 *cached = Some(new_cache);
308 } else {
309 *cached = None;
312 debug!(target: "engine::caching", "cleared execution cache on invalid block");
313 }
314 });
315
316 let elapsed = start.elapsed();
317 debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
318
319 metrics.cache_saving_duration.set(elapsed.as_secs_f64());
320 }
321 }
322
323 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
326 fn run_bal_prewarm(
327 &self,
328 bal: Arc<BlockAccessList>,
329 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
330 ) {
331 if self.ctx.saved_cache.is_none() {
333 trace!(
334 target: "engine::tree::payload_processor::prewarm",
335 "Skipping BAL prewarm - no cache available"
336 );
337 self.send_bal_hashed_state(&bal);
338 let _ =
339 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
340 return;
341 }
342
343 if bal.is_empty() {
344 self.send_bal_hashed_state(&bal);
345 let _ =
346 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
347 return;
348 }
349
350 trace!(
351 target: "engine::tree::payload_processor::prewarm",
352 accounts = bal.len(),
353 "Starting BAL prewarm"
354 );
355
356 let ctx = self.ctx.clone();
357 self.executor.prewarming_pool().install_fn(|| {
358 bal.par_iter().for_each_init(
359 || (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
360 |(ctx, provider), account| {
361 if ctx.should_stop() {
362 return;
363 }
364 ctx.prefetch_bal_account(provider, account);
365 },
366 );
367 });
368
369 trace!(
370 target: "engine::tree::payload_processor::prewarm",
371 "All BAL prewarm accounts completed"
372 );
373
374 self.send_bal_hashed_state(&bal);
376
377 let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
379 }
380
381 fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
384 let Some(to_multi_proof) = &self.to_multi_proof else { return };
385
386 let provider = match self.ctx.provider.build() {
387 Ok(provider) => provider,
388 Err(err) => {
389 warn!(
390 target: "engine::tree::payload_processor::prewarm",
391 ?err,
392 "Failed to build provider for BAL hashed state conversion"
393 );
394 return;
395 }
396 };
397
398 match bal::bal_to_hashed_post_state(bal, &provider) {
399 Ok(hashed_state) => {
400 debug!(
401 target: "engine::tree::payload_processor::prewarm",
402 accounts = hashed_state.accounts.len(),
403 storages = hashed_state.storages.len(),
404 "Converted BAL to hashed post state"
405 );
406 let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
407 let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
408 }
409 Err(err) => {
410 warn!(
411 target: "engine::tree::payload_processor::prewarm",
412 ?err,
413 "Failed to convert BAL to hashed state"
414 );
415 }
416 }
417 }
418
419 #[instrument(
424 parent = &self.parent_span,
425 level = "debug",
426 target = "engine::tree::payload_processor::prewarm",
427 name = "prewarm and caching",
428 skip_all
429 )]
430 pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
431 where
432 Tx: ExecutableTxFor<Evm> + Send + 'static,
433 {
434 match mode {
436 PrewarmMode::Transactions(pending) => {
437 self.spawn_txs_prewarm(pending, actions_tx, self.to_multi_proof.clone());
438 }
439 PrewarmMode::BlockAccessList(bal) => {
440 self.run_bal_prewarm(bal, actions_tx);
441 }
442 PrewarmMode::Skipped => {
443 let _ = actions_tx
444 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
445 }
446 }
447
448 let mut final_execution_outcome = None;
449 let mut finished_execution = false;
450 while let Ok(event) = self.actions_rx.recv() {
451 match event {
452 PrewarmTaskEvent::TerminateTransactionExecution => {
453 debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
455 self.ctx.stop();
456 }
457 PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
458 trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
459 final_execution_outcome =
460 Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
461
462 if finished_execution {
463 break
465 }
466 }
467 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
468 trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
469 self.ctx.metrics.transactions.set(executed_transactions as f64);
470 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
471
472 finished_execution = true;
473
474 if final_execution_outcome.is_some() {
475 break
477 }
478 }
479 }
480 }
481
482 debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
483
484 if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
486 self.save_cache(execution_outcome, valid_block_rx);
487 }
488 }
489}
490
491#[derive(Debug, Clone)]
493pub struct PrewarmContext<N, P, Evm>
494where
495 N: NodePrimitives,
496 Evm: ConfigureEvm<Primitives = N>,
497{
498 pub env: ExecutionEnv<Evm>,
500 pub evm_config: Evm,
502 pub saved_cache: Option<SavedCache>,
504 pub provider: StateProviderBuilder<N, P>,
506 pub metrics: PrewarmMetrics,
508 pub terminate_execution: Arc<AtomicBool>,
510 pub executed_tx_index: Arc<AtomicUsize>,
514 pub precompile_cache_disabled: bool,
516 pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
518}
519
520type PrewarmEvmState<Evm> =
523 Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
524
525impl<N, P, Evm> PrewarmContext<N, P, Evm>
526where
527 N: NodePrimitives,
528 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
529 Evm: ConfigureEvm<Primitives = N> + 'static,
530{
531 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
533 fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
534 let mut state_provider = match self.provider.build() {
535 Ok(provider) => provider,
536 Err(err) => {
537 trace!(
538 target: "engine::tree::payload_processor::prewarm",
539 %err,
540 "Failed to build state provider in prewarm thread"
541 );
542 return None
543 }
544 };
545
546 if let Some(saved_cache) = &self.saved_cache {
548 let caches = saved_cache.cache().clone();
549 let cache_metrics = saved_cache.metrics().clone();
550 state_provider =
551 Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
552 }
553
554 let state_provider = StateProviderDatabase::new(state_provider);
555
556 let mut evm_env = self.env.evm_env.clone();
557
558 evm_env.cfg_env.disable_nonce_check = true;
561
562 evm_env.cfg_env.disable_balance_check = true;
565
566 let spec_id = *evm_env.spec_id();
568 let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
569
570 if !self.precompile_cache_disabled {
571 evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
573 CachedPrecompile::wrap(
574 precompile,
575 self.precompile_cache_map.cache_for_address(*address),
576 spec_id,
577 None, )
579 });
580 }
581
582 Some(evm)
583 }
584
585 #[inline]
587 pub fn should_stop(&self) -> bool {
588 self.terminate_execution.load(Ordering::Relaxed)
589 }
590
591 #[inline]
593 pub fn stop(&self) {
594 self.terminate_execution.store(true, Ordering::Relaxed);
595 }
596
597 fn prefetch_bal_account(
602 &self,
603 provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
604 account: &alloy_eip7928::AccountChanges,
605 ) {
606 let state_provider = match provider {
607 Some(p) => p,
608 slot @ None => {
609 let built = match self.provider.build() {
610 Ok(p) => p,
611 Err(err) => {
612 trace!(
613 target: "engine::tree::payload_processor::prewarm",
614 %err,
615 "Failed to build state provider in BAL prewarm thread"
616 );
617 return;
618 }
619 };
620 let saved_cache =
621 self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
622 let caches = saved_cache.cache().clone();
623 let cache_metrics = saved_cache.metrics().clone();
624 slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
625 }
626 };
627
628 let start = Instant::now();
629
630 let _ = state_provider.basic_account(&account.address);
631
632 for slot in &account.storage_changes {
633 let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
634 }
635 for &slot in &account.storage_reads {
636 let _ = state_provider.storage(account.address, StorageKey::from(slot));
637 }
638
639 self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
640 }
641}
642
643fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
646 let mut targets = MultiProofTargetsV2::default();
647 let mut storage_target_count = 0;
648 for (addr, account) in state {
649 if !account.is_touched() || account.is_selfdestructed() {
657 continue
658 }
659
660 let hashed_address = keccak256(addr);
661 targets.account_targets.push(hashed_address.into());
662
663 let mut storage_slots = Vec::with_capacity(account.storage.len());
664 for (key, slot) in account.storage {
665 if !slot.is_changed() {
667 continue
668 }
669
670 let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
671 storage_slots.push(ProofV2Target::from(hashed_slot));
672 }
673
674 storage_target_count += storage_slots.len();
675 if !storage_slots.is_empty() {
676 targets.storage_targets.insert(hashed_address, storage_slots);
677 }
678 }
679
680 (targets, storage_target_count)
681}
682
683fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
688 MultiProofTargetsV2 {
689 account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
690 ..Default::default()
691 }
692}
693
694#[derive(Debug)]
699pub enum PrewarmTaskEvent<R> {
700 TerminateTransactionExecution,
702 Terminate {
705 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
708 valid_block_rx: mpsc::Receiver<()>,
713 },
714 FinishedTxExecution {
716 executed_transactions: usize,
718 },
719}
720
721#[derive(Metrics, Clone)]
723#[metrics(scope = "sync.prewarm")]
724pub struct PrewarmMetrics {
725 pub(crate) transactions: Gauge,
727 pub(crate) transactions_histogram: Histogram,
729 pub(crate) total_runtime: Histogram,
731 pub(crate) execution_duration: Histogram,
733 pub(crate) prefetch_storage_targets: Histogram,
735 pub(crate) cache_saving_duration: Gauge,
737 pub(crate) transaction_errors: Counter,
739 pub(crate) bal_slot_iteration_duration: Histogram,
741}