1use crate::tree::{
15 payload_processor::{bal, multiproof::MultiProofMessage},
16 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
17 CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache, StateProviderBuilder,
18};
19use alloy_consensus::transaction::TxHashRef;
20use alloy_eip7928::BlockAccessList;
21use alloy_eips::eip4895::Withdrawal;
22use alloy_primitives::{keccak256, StorageKey, B256};
23use crossbeam_channel::Sender as CrossbeamSender;
24use metrics::{Counter, Gauge, Histogram};
25use rayon::prelude::*;
26use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
27use reth_metrics::Metrics;
28use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
29use reth_provider::{
30 AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
31 StateReader,
32};
33use reth_revm::{database::StateProviderDatabase, state::EvmState};
34use reth_tasks::{pool::WorkerPool, Runtime};
35use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
36use std::sync::{
37 atomic::{AtomicBool, AtomicUsize, Ordering},
38 mpsc::{self, channel, Receiver, Sender},
39 Arc,
40};
41use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
42
43#[derive(Debug)]
45pub enum PrewarmMode<Tx> {
46 Transactions(Receiver<(usize, Tx)>),
48 BlockAccessList(Arc<BlockAccessList>),
50 Skipped,
53}
54
55#[derive(Debug)]
60pub struct PrewarmCacheTask<N, P, Evm>
61where
62 N: NodePrimitives,
63 Evm: ConfigureEvm<Primitives = N>,
64{
65 executor: Runtime,
67 execution_cache: PayloadExecutionCache,
69 ctx: PrewarmContext<N, P, Evm>,
71 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
73 actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
75 parent_span: Span,
77}
78
79impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
80where
81 N: NodePrimitives,
82 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
83 Evm: ConfigureEvm<Primitives = N> + 'static,
84{
85 pub fn new(
87 executor: Runtime,
88 execution_cache: PayloadExecutionCache,
89 ctx: PrewarmContext<N, P, Evm>,
90 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
91 ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
92 let (actions_tx, actions_rx) = channel();
93
94 trace!(
95 target: "engine::tree::payload_processor::prewarm",
96 prewarming_threads = executor.prewarming_pool().current_num_threads(),
97 transaction_count = ctx.env.transaction_count,
98 "Initialized prewarm task"
99 );
100
101 (
102 Self {
103 executor,
104 execution_cache,
105 ctx,
106 to_multi_proof,
107 actions_rx,
108 parent_span: Span::current(),
109 },
110 actions_tx,
111 )
112 }
113
114 fn spawn_txs_prewarm<Tx>(
121 &self,
122 pending: mpsc::Receiver<(usize, Tx)>,
123 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
124 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
125 ) where
126 Tx: ExecutableTxFor<Evm> + Send + 'static,
127 {
128 let executor = self.executor.clone();
129 let ctx = self.ctx.clone();
130 let span = Span::current();
131
132 self.executor.spawn_blocking_named("prewarm-txs", move || {
133 let _enter = debug_span!(
134 target: "engine::tree::payload_processor::prewarm",
135 parent: span,
136 "prewarm_txs"
137 )
138 .entered();
139
140 let ctx = &ctx;
141 let pool = executor.prewarming_pool();
142
143 let mut tx_count = 0usize;
144 let to_multi_proof = to_multi_proof.as_ref();
145 pool.in_place_scope(|s| {
146 s.spawn(|_| {
147 pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
148 });
149
150 while let Ok((index, tx)) = pending.recv() {
151 if ctx.should_stop() {
152 trace!(
153 target: "engine::tree::payload_processor::prewarm",
154 "Termination requested, stopping transaction distribution"
155 );
156 break;
157 }
158
159 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
161 continue;
162 }
163
164 tx_count += 1;
165 let parent_span = Span::current();
166 s.spawn(move |_| {
167 let _enter = trace_span!(
168 target: "engine::tree::payload_processor::prewarm",
169 parent: parent_span,
170 "prewarm_tx",
171 i = index,
172 )
173 .entered();
174 Self::transact_worker(ctx, index, tx, to_multi_proof);
175 });
176 }
177
178 if let Some(to_multi_proof) = to_multi_proof &&
180 let Some(withdrawals) = &ctx.env.withdrawals &&
181 !withdrawals.is_empty()
182 {
183 let targets = multiproof_targets_from_withdrawals(withdrawals);
184 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
185 }
186 });
187
188 pool.clear();
190
191 let _ = actions_tx
192 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
193 });
194 }
195
196 fn transact_worker<Tx>(
201 ctx: &PrewarmContext<N, P, Evm>,
202 index: usize,
203 tx: Tx,
204 to_multi_proof: Option<&CrossbeamSender<MultiProofMessage>>,
205 ) where
206 Tx: ExecutableTxFor<Evm>,
207 {
208 WorkerPool::with_worker_mut(|worker| {
209 let Some(evm) =
210 worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
211 else {
212 return;
213 };
214
215 if ctx.should_stop() {
216 return;
217 }
218
219 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
221 return;
222 }
223
224 let start = Instant::now();
225
226 let (tx_env, tx) = tx.into_parts();
227 let res = match evm.transact(tx_env) {
228 Ok(res) => res,
229 Err(err) => {
230 trace!(
231 target: "engine::tree::payload_processor::prewarm",
232 %err,
233 tx_hash=%tx.tx().tx_hash(),
234 sender=%tx.signer(),
235 "Error when executing prewarm transaction",
236 );
237 ctx.metrics.transaction_errors.increment(1);
238 return;
239 }
240 };
241 ctx.metrics.execution_duration.record(start.elapsed());
242
243 if ctx.should_stop() {
244 return;
245 }
246
247 if index > 0 {
248 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
249 ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
250 if let Some(to_multi_proof) = to_multi_proof {
251 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
252 }
253 }
254
255 ctx.metrics.total_runtime.record(start.elapsed());
256 });
257 }
258
259 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
271 fn save_cache(
272 self,
273 execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
274 valid_block_rx: mpsc::Receiver<()>,
275 ) {
276 let start = Instant::now();
277
278 let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
279 self;
280 let hash = env.hash;
281
282 if let Some(saved_cache) = saved_cache {
283 debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
284 execution_cache.update_with_guard(|cached| {
286 let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
289 let new_cache = SavedCache::new(hash, caches, cache_metrics)
290 .with_disable_cache_metrics(disable_cache_metrics);
291
292 if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
295 *cached = None;
297 debug!(target: "engine::caching", "cleared execution cache on update error");
298 return;
299 }
300
301 new_cache.update_metrics();
302
303 if valid_block_rx.recv().is_ok() {
304 *cached = Some(new_cache);
307 } else {
308 *cached = None;
311 debug!(target: "engine::caching", "cleared execution cache on invalid block");
312 }
313 });
314
315 let elapsed = start.elapsed();
316 debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
317
318 metrics.cache_saving_duration.set(elapsed.as_secs_f64());
319 }
320 }
321
322 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
325 fn run_bal_prewarm(
326 &self,
327 bal: Arc<BlockAccessList>,
328 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
329 ) {
330 if self.ctx.saved_cache.is_none() {
332 trace!(
333 target: "engine::tree::payload_processor::prewarm",
334 "Skipping BAL prewarm - no cache available"
335 );
336 self.send_bal_hashed_state(&bal);
337 let _ =
338 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
339 return;
340 }
341
342 if bal.is_empty() {
343 self.send_bal_hashed_state(&bal);
344 let _ =
345 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
346 return;
347 }
348
349 trace!(
350 target: "engine::tree::payload_processor::prewarm",
351 accounts = bal.len(),
352 "Starting BAL prewarm"
353 );
354
355 let ctx = self.ctx.clone();
356 self.executor.prewarming_pool().install_fn(|| {
357 bal.par_iter().for_each_init(
358 || (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
359 |(ctx, provider), account| {
360 if ctx.should_stop() {
361 return;
362 }
363 ctx.prefetch_bal_account(provider, account);
364 },
365 );
366 });
367
368 trace!(
369 target: "engine::tree::payload_processor::prewarm",
370 "All BAL prewarm accounts completed"
371 );
372
373 self.send_bal_hashed_state(&bal);
375
376 let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
378 }
379
380 fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
383 let Some(to_multi_proof) = &self.to_multi_proof else { return };
384
385 let provider = match self.ctx.provider.build() {
386 Ok(provider) => provider,
387 Err(err) => {
388 warn!(
389 target: "engine::tree::payload_processor::prewarm",
390 ?err,
391 "Failed to build provider for BAL hashed state conversion"
392 );
393 return;
394 }
395 };
396
397 match bal::bal_to_hashed_post_state(bal, &provider) {
398 Ok(hashed_state) => {
399 debug!(
400 target: "engine::tree::payload_processor::prewarm",
401 accounts = hashed_state.accounts.len(),
402 storages = hashed_state.storages.len(),
403 "Converted BAL to hashed post state"
404 );
405 let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
406 let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
407 }
408 Err(err) => {
409 warn!(
410 target: "engine::tree::payload_processor::prewarm",
411 ?err,
412 "Failed to convert BAL to hashed state"
413 );
414 }
415 }
416 }
417
418 #[instrument(
423 parent = &self.parent_span,
424 level = "debug",
425 target = "engine::tree::payload_processor::prewarm",
426 name = "prewarm and caching",
427 skip_all
428 )]
429 pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
430 where
431 Tx: ExecutableTxFor<Evm> + Send + 'static,
432 {
433 match mode {
435 PrewarmMode::Transactions(pending) => {
436 self.spawn_txs_prewarm(pending, actions_tx, self.to_multi_proof.clone());
437 }
438 PrewarmMode::BlockAccessList(bal) => {
439 self.run_bal_prewarm(bal, actions_tx);
440 }
441 PrewarmMode::Skipped => {
442 let _ = actions_tx
443 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
444 }
445 }
446
447 let mut final_execution_outcome = None;
448 let mut finished_execution = false;
449 while let Ok(event) = self.actions_rx.recv() {
450 match event {
451 PrewarmTaskEvent::TerminateTransactionExecution => {
452 debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
454 self.ctx.stop();
455 }
456 PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
457 trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
458 final_execution_outcome =
459 Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
460
461 if finished_execution {
462 break
464 }
465 }
466 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
467 trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
468 self.ctx.metrics.transactions.set(executed_transactions as f64);
469 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
470
471 finished_execution = true;
472
473 if final_execution_outcome.is_some() {
474 break
476 }
477 }
478 }
479 }
480
481 debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
482
483 if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
485 self.save_cache(execution_outcome, valid_block_rx);
486 }
487 }
488}
489
490#[derive(Debug, Clone)]
492pub struct PrewarmContext<N, P, Evm>
493where
494 N: NodePrimitives,
495 Evm: ConfigureEvm<Primitives = N>,
496{
497 pub env: ExecutionEnv<Evm>,
499 pub evm_config: Evm,
501 pub saved_cache: Option<SavedCache>,
503 pub provider: StateProviderBuilder<N, P>,
505 pub metrics: PrewarmMetrics,
507 pub terminate_execution: Arc<AtomicBool>,
509 pub executed_tx_index: Arc<AtomicUsize>,
513 pub precompile_cache_disabled: bool,
515 pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
517}
518
519type PrewarmEvmState<Evm> =
522 Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
523
524impl<N, P, Evm> PrewarmContext<N, P, Evm>
525where
526 N: NodePrimitives,
527 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
528 Evm: ConfigureEvm<Primitives = N> + 'static,
529{
530 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
532 fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
533 let mut state_provider = match self.provider.build() {
534 Ok(provider) => provider,
535 Err(err) => {
536 trace!(
537 target: "engine::tree::payload_processor::prewarm",
538 %err,
539 "Failed to build state provider in prewarm thread"
540 );
541 return None
542 }
543 };
544
545 if let Some(saved_cache) = &self.saved_cache {
547 let caches = saved_cache.cache().clone();
548 let cache_metrics = saved_cache.metrics().clone();
549 state_provider =
550 Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
551 }
552
553 let state_provider = StateProviderDatabase::new(state_provider);
554
555 let mut evm_env = self.env.evm_env.clone();
556
557 evm_env.cfg_env.disable_nonce_check = true;
560
561 evm_env.cfg_env.disable_balance_check = true;
564
565 let spec_id = *evm_env.spec_id();
567 let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
568
569 if !self.precompile_cache_disabled {
570 evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
572 CachedPrecompile::wrap(
573 precompile,
574 self.precompile_cache_map.cache_for_address(*address),
575 spec_id,
576 None, )
578 });
579 }
580
581 Some(evm)
582 }
583
584 #[inline]
586 pub fn should_stop(&self) -> bool {
587 self.terminate_execution.load(Ordering::Relaxed)
588 }
589
590 #[inline]
592 pub fn stop(&self) {
593 self.terminate_execution.store(true, Ordering::Relaxed);
594 }
595
596 fn prefetch_bal_account(
601 &self,
602 provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
603 account: &alloy_eip7928::AccountChanges,
604 ) {
605 let state_provider = match provider {
606 Some(p) => p,
607 slot @ None => {
608 let built = match self.provider.build() {
609 Ok(p) => p,
610 Err(err) => {
611 trace!(
612 target: "engine::tree::payload_processor::prewarm",
613 %err,
614 "Failed to build state provider in BAL prewarm thread"
615 );
616 return;
617 }
618 };
619 let saved_cache =
620 self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
621 let caches = saved_cache.cache().clone();
622 let cache_metrics = saved_cache.metrics().clone();
623 slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
624 }
625 };
626
627 let start = Instant::now();
628
629 let _ = state_provider.basic_account(&account.address);
630
631 for slot in &account.storage_changes {
632 let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
633 }
634 for &slot in &account.storage_reads {
635 let _ = state_provider.storage(account.address, StorageKey::from(slot));
636 }
637
638 self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
639 }
640}
641
642fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
645 let mut targets = MultiProofTargetsV2::default();
646 let mut storage_target_count = 0;
647 for (addr, account) in state {
648 if !account.is_touched() || account.is_selfdestructed() {
656 continue
657 }
658
659 let hashed_address = keccak256(addr);
660 targets.account_targets.push(hashed_address.into());
661
662 let mut storage_slots = Vec::with_capacity(account.storage.len());
663 for (key, slot) in account.storage {
664 if !slot.is_changed() {
666 continue
667 }
668
669 let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
670 storage_slots.push(ProofV2Target::from(hashed_slot));
671 }
672
673 storage_target_count += storage_slots.len();
674 if !storage_slots.is_empty() {
675 targets.storage_targets.insert(hashed_address, storage_slots);
676 }
677 }
678
679 (targets, storage_target_count)
680}
681
682fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
687 MultiProofTargetsV2 {
688 account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
689 ..Default::default()
690 }
691}
692
693#[derive(Debug)]
698pub enum PrewarmTaskEvent<R> {
699 TerminateTransactionExecution,
701 Terminate {
704 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
707 valid_block_rx: mpsc::Receiver<()>,
712 },
713 FinishedTxExecution {
715 executed_transactions: usize,
717 },
718}
719
720#[derive(Metrics, Clone)]
722#[metrics(scope = "sync.prewarm")]
723pub struct PrewarmMetrics {
724 pub(crate) transactions: Gauge,
726 pub(crate) transactions_histogram: Histogram,
728 pub(crate) total_runtime: Histogram,
730 pub(crate) execution_duration: Histogram,
732 pub(crate) prefetch_storage_targets: Histogram,
734 pub(crate) cache_saving_duration: Gauge,
736 pub(crate) transaction_errors: Counter,
738 pub(crate) bal_slot_iteration_duration: Histogram,
740}