1use crate::tree::{
15 cached_state::{CachedStateProvider, SavedCache},
16 payload_processor::{
17 bal::{total_slots, BALSlotIter},
18 executor::WorkloadExecutor,
19 multiproof::MultiProofMessage,
20 ExecutionCache as PayloadExecutionCache,
21 },
22 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
23 ExecutionEnv, StateProviderBuilder,
24};
25use alloy_consensus::transaction::TxHashRef;
26use alloy_eip7928::BlockAccessList;
27use alloy_eips::Typed2718;
28use alloy_evm::Database;
29use alloy_primitives::{keccak256, map::B256Set, B256};
30use crossbeam_channel::Sender as CrossbeamSender;
31use metrics::{Counter, Gauge, Histogram};
32use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
33use reth_execution_types::ExecutionOutcome;
34use reth_metrics::Metrics;
35use reth_primitives_traits::NodePrimitives;
36use reth_provider::{AccountReader, BlockReader, StateProvider, StateProviderFactory, StateReader};
37use reth_revm::{database::StateProviderDatabase, state::EvmState};
38use reth_trie::MultiProofTargets;
39use std::{
40 ops::Range,
41 sync::{
42 atomic::{AtomicBool, Ordering},
43 mpsc::{self, channel, Receiver, Sender},
44 Arc,
45 },
46 time::Instant,
47};
48use tracing::{debug, debug_span, instrument, trace, warn, Span};
49
50pub(super) enum PrewarmMode<Tx> {
52 Transactions(Receiver<Tx>),
54 BlockAccessList(Arc<BlockAccessList>),
56}
57
/// A transaction paired with the position at which it was received.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// Zero-based position assigned as the transaction is pulled off the incoming channel.
    index: usize,
    /// The wrapped transaction.
    tx: Tx,
}
66
/// Highest transaction type byte that is still considered a standard transaction.
///
/// Any transaction whose type is strictly greater than this value is classified as a system
/// transaction when transactions are distributed to prewarm workers.
const MAX_STANDARD_TX_TYPE: u8 = 4;
79
80pub(super) struct PrewarmCacheTask<N, P, Evm>
85where
86 N: NodePrimitives,
87 Evm: ConfigureEvm<Primitives = N>,
88{
89 executor: WorkloadExecutor,
91 execution_cache: PayloadExecutionCache,
93 ctx: PrewarmContext<N, P, Evm>,
95 max_concurrency: usize,
97 transaction_count_hint: usize,
99 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
101 actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
103 parent_span: Span,
105}
106
107impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
108where
109 N: NodePrimitives,
110 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
111 Evm: ConfigureEvm<Primitives = N> + 'static,
112{
113 pub(super) fn new(
115 executor: WorkloadExecutor,
116 execution_cache: PayloadExecutionCache,
117 ctx: PrewarmContext<N, P, Evm>,
118 to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
119 transaction_count_hint: usize,
120 max_concurrency: usize,
121 ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
122 let (actions_tx, actions_rx) = channel();
123
124 trace!(
125 target: "engine::tree::payload_processor::prewarm",
126 max_concurrency,
127 transaction_count_hint,
128 "Initialized prewarm task"
129 );
130
131 (
132 Self {
133 executor,
134 execution_cache,
135 ctx,
136 max_concurrency,
137 transaction_count_hint,
138 to_multi_proof,
139 actions_rx,
140 parent_span: Span::current(),
141 },
142 actions_tx,
143 )
144 }
145
146 fn spawn_all<Tx>(
152 &self,
153 pending: mpsc::Receiver<Tx>,
154 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
155 ) where
156 Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
157 {
158 let executor = self.executor.clone();
159 let ctx = self.ctx.clone();
160 let max_concurrency = self.max_concurrency;
161 let transaction_count_hint = self.transaction_count_hint;
162 let span = Span::current();
163
164 self.executor.spawn_blocking(move || {
165 let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
166
167 let (done_tx, done_rx) = mpsc::channel();
168
169 let workers_needed = if transaction_count_hint == 0 {
173 max_concurrency
174 } else {
175 transaction_count_hint.min(max_concurrency)
176 };
177
178 let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
180
181 let mut tx_index = 0usize;
183 while let Ok(tx) = pending.recv() {
184 if ctx.terminate_execution.load(Ordering::Relaxed) {
186 trace!(
187 target: "engine::tree::payload_processor::prewarm",
188 "Termination requested, stopping transaction distribution"
189 );
190 break;
191 }
192
193 let indexed_tx = IndexedTransaction { index: tx_index, tx };
194 let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
195
196 if tx_index == 0 && is_system_tx {
202 for handle in &handles {
203 let _ = handle.send(indexed_tx.clone());
208 }
209 } else {
210 let worker_idx = tx_index % workers_needed;
212 let _ = handles[worker_idx].send(indexed_tx);
217 }
218
219 tx_index += 1;
220 }
221
222 drop(done_tx);
224 drop(handles);
225 while done_rx.recv().is_ok() {}
226
227 let _ = actions_tx
228 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
229 });
230 }
231
232 fn is_execution_terminated(&self) -> bool {
234 self.ctx.terminate_execution.load(Ordering::Relaxed)
235 }
236
237 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
239 if self.is_execution_terminated() {
240 return
243 }
244
245 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
246 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
247 }
248 }
249
250 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
262 fn save_cache(self, execution_outcome: Arc<ExecutionOutcome<N::Receipt>>) {
263 let start = Instant::now();
264
265 let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
266 self;
267 let hash = env.hash;
268
269 if let Some(saved_cache) = saved_cache {
270 debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
271 execution_cache.update_with_guard(|cached| {
273 let (caches, cache_metrics) = saved_cache.split();
276 let new_cache = SavedCache::new(hash, caches, cache_metrics);
277
278 if new_cache.cache().insert_state(execution_outcome.state()).is_err() {
281 *cached = None;
283 debug!(target: "engine::caching", "cleared execution cache on update error");
284 return;
285 }
286
287 new_cache.update_metrics();
288
289 *cached = Some(new_cache);
292 });
293
294 let elapsed = start.elapsed();
295 debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
296
297 metrics.cache_saving_duration.set(elapsed.as_secs_f64());
298 }
299 }
300
301 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
306 fn run_bal_prewarm(
307 &self,
308 bal: Arc<BlockAccessList>,
309 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
310 ) {
311 if self.ctx.saved_cache.is_none() {
313 trace!(
314 target: "engine::tree::payload_processor::prewarm",
315 "Skipping BAL prewarm - no cache available"
316 );
317 let _ =
318 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
319 return;
320 }
321
322 let total_slots = total_slots(&bal);
323
324 trace!(
325 target: "engine::tree::payload_processor::prewarm",
326 total_slots,
327 max_concurrency = self.max_concurrency,
328 "Starting BAL prewarm"
329 );
330
331 if total_slots == 0 {
332 let _ =
334 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
335 return;
336 }
337
338 let (done_tx, done_rx) = mpsc::channel();
339
340 let workers_needed = total_slots.min(self.max_concurrency);
342
343 let slots_per_worker = total_slots / workers_needed;
345 let remainder = total_slots % workers_needed;
346
347 for i in 0..workers_needed {
349 let start = i * slots_per_worker + i.min(remainder);
350 let extra = if i < remainder { 1 } else { 0 };
351 let end = start + slots_per_worker + extra;
352
353 self.ctx.spawn_bal_worker(
354 i,
355 &self.executor,
356 Arc::clone(&bal),
357 start..end,
358 done_tx.clone(),
359 );
360 }
361
362 drop(done_tx);
364
365 let mut completed_workers = 0;
367 while done_rx.recv().is_ok() {
368 completed_workers += 1;
369 }
370
371 trace!(
372 target: "engine::tree::payload_processor::prewarm",
373 completed_workers,
374 "All BAL prewarm workers completed"
375 );
376
377 let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
379 }
380
381 #[instrument(
386 parent = &self.parent_span,
387 level = "debug",
388 target = "engine::tree::payload_processor::prewarm",
389 name = "prewarm and caching",
390 skip_all
391 )]
392 pub(super) fn run<Tx>(
393 self,
394 mode: PrewarmMode<Tx>,
395 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
396 ) where
397 Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
398 {
399 match mode {
401 PrewarmMode::Transactions(pending) => {
402 self.spawn_all(pending, actions_tx);
403 }
404 PrewarmMode::BlockAccessList(bal) => {
405 self.run_bal_prewarm(bal, actions_tx);
406 }
407 }
408
409 let mut final_execution_outcome = None;
410 let mut finished_execution = false;
411 while let Ok(event) = self.actions_rx.recv() {
412 match event {
413 PrewarmTaskEvent::TerminateTransactionExecution => {
414 debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
416 self.ctx.terminate_execution.store(true, Ordering::Relaxed);
417 }
418 PrewarmTaskEvent::Outcome { proof_targets } => {
419 self.send_multi_proof_targets(proof_targets);
421 }
422 PrewarmTaskEvent::Terminate { execution_outcome } => {
423 trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
424 final_execution_outcome = Some(execution_outcome);
425
426 if finished_execution {
427 break
429 }
430 }
431 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
432 trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
433 self.ctx.metrics.transactions.set(executed_transactions as f64);
434 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
435
436 finished_execution = true;
437
438 if final_execution_outcome.is_some() {
439 break
441 }
442 }
443 }
444 }
445
446 debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
447
448 if let Some(Some(execution_outcome)) = final_execution_outcome {
450 self.save_cache(execution_outcome);
451 }
452 }
453}
454
455#[derive(Debug, Clone)]
457pub(super) struct PrewarmContext<N, P, Evm>
458where
459 N: NodePrimitives,
460 Evm: ConfigureEvm<Primitives = N>,
461{
462 pub(super) env: ExecutionEnv<Evm>,
463 pub(super) evm_config: Evm,
464 pub(super) saved_cache: Option<SavedCache>,
465 pub(super) provider: StateProviderBuilder<N, P>,
467 pub(super) metrics: PrewarmMetrics,
468 pub(super) terminate_execution: Arc<AtomicBool>,
470 pub(super) precompile_cache_disabled: bool,
471 pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
472}
473
474impl<N, P, Evm> PrewarmContext<N, P, Evm>
475where
476 N: NodePrimitives,
477 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
478 Evm: ConfigureEvm<Primitives = N> + 'static,
479{
480 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
483 fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
484 let Self {
485 env,
486 evm_config,
487 saved_cache,
488 provider,
489 metrics,
490 terminate_execution,
491 precompile_cache_disabled,
492 precompile_cache_map,
493 } = self;
494
495 let mut state_provider = match provider.build() {
496 Ok(provider) => provider,
497 Err(err) => {
498 trace!(
499 target: "engine::tree::payload_processor::prewarm",
500 %err,
501 "Failed to build state provider in prewarm thread"
502 );
503 return None
504 }
505 };
506
507 if let Some(saved_cache) = saved_cache {
509 let caches = saved_cache.cache().clone();
510 let cache_metrics = saved_cache.metrics().clone();
511 state_provider = Box::new(
512 CachedStateProvider::new(state_provider, caches, cache_metrics)
513 .prewarm(),
515 );
516 }
517
518 let state_provider = StateProviderDatabase::new(state_provider);
519
520 let mut evm_env = env.evm_env;
521
522 evm_env.cfg_env.disable_nonce_check = true;
525
526 let spec_id = *evm_env.spec_id();
528 let mut evm = evm_config.evm_with_env(state_provider, evm_env);
529
530 if !precompile_cache_disabled {
531 evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
533 CachedPrecompile::wrap(
534 precompile,
535 precompile_cache_map.cache_for_address(*address),
536 spec_id,
537 None, )
539 });
540 }
541
542 Some((evm, metrics, terminate_execution))
543 }
544
545 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
555 fn transact_batch<Tx>(
556 self,
557 txs: mpsc::Receiver<IndexedTransaction<Tx>>,
558 sender: Sender<PrewarmTaskEvent<N::Receipt>>,
559 done_tx: Sender<()>,
560 ) where
561 Tx: ExecutableTxFor<Evm>,
562 {
563 let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
564
565 while let Ok(IndexedTransaction { index, tx }) = {
566 let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
567 .entered();
568 txs.recv()
569 } {
570 let enter =
571 debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
572 .entered();
573
574 let start = Instant::now();
576
577 if terminate_execution.load(Ordering::Relaxed) {
580 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
581 break
582 }
583
584 let res = match evm.transact(&tx) {
585 Ok(res) => res,
586 Err(err) => {
587 trace!(
588 target: "engine::tree::payload_processor::prewarm",
589 %err,
590 tx_hash=%tx.tx().tx_hash(),
591 sender=%tx.signer(),
592 "Error when executing prewarm transaction",
593 );
594 metrics.transaction_errors.increment(1);
596 continue
598 }
599 };
600 metrics.execution_duration.record(start.elapsed());
601
602 enter.record("gas_used", res.result.gas_used());
604 enter.record("is_success", res.result.is_success());
605
606 drop(enter);
607
608 if terminate_execution.load(Ordering::Relaxed) {
611 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
612 break
613 }
614
615 if index > 0 {
618 let _enter =
619 debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
620 .entered();
621 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
622 metrics.prefetch_storage_targets.record(storage_targets as f64);
623 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
624 drop(_enter);
625 }
626
627 metrics.total_runtime.record(start.elapsed());
628 }
629
630 let _ = done_tx.send(());
632 }
633
634 fn spawn_workers<Tx>(
636 self,
637 workers_needed: usize,
638 task_executor: &WorkloadExecutor,
639 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
640 done_tx: Sender<()>,
641 ) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
642 where
643 Tx: ExecutableTxFor<Evm> + Send + 'static,
644 {
645 let mut handles = Vec::with_capacity(workers_needed);
646 let mut receivers = Vec::with_capacity(workers_needed);
647
648 for _ in 0..workers_needed {
649 let (tx, rx) = mpsc::channel();
650 handles.push(tx);
651 receivers.push(rx);
652 }
653
654 let executor = task_executor.clone();
656 let span = Span::current();
657 task_executor.spawn_blocking(move || {
658 let _enter = span.entered();
659 for (idx, rx) in receivers.into_iter().enumerate() {
660 let ctx = self.clone();
661 let actions_tx = actions_tx.clone();
662 let done_tx = done_tx.clone();
663 let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
664 executor.spawn_blocking(move || {
665 let _enter = span.entered();
666 ctx.transact_batch(rx, actions_tx, done_tx);
667 });
668 }
669 });
670
671 handles
672 }
673
674 fn spawn_bal_worker(
679 &self,
680 idx: usize,
681 executor: &WorkloadExecutor,
682 bal: Arc<BlockAccessList>,
683 range: Range<usize>,
684 done_tx: Sender<()>,
685 ) {
686 let ctx = self.clone();
687 let span = debug_span!(
688 target: "engine::tree::payload_processor::prewarm",
689 "bal prewarm worker",
690 idx,
691 range_start = range.start,
692 range_end = range.end
693 );
694
695 executor.spawn_blocking(move || {
696 let _enter = span.entered();
697 ctx.prefetch_bal_slots(bal, range, done_tx);
698 });
699 }
700
701 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
706 fn prefetch_bal_slots(
707 self,
708 bal: Arc<BlockAccessList>,
709 range: Range<usize>,
710 done_tx: Sender<()>,
711 ) {
712 let Self { saved_cache, provider, metrics, .. } = self;
713
714 let state_provider = match provider.build() {
716 Ok(provider) => provider,
717 Err(err) => {
718 trace!(
719 target: "engine::tree::payload_processor::prewarm",
720 %err,
721 "Failed to build state provider in BAL prewarm thread"
722 );
723 let _ = done_tx.send(());
724 return;
725 }
726 };
727
728 let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
730 let caches = saved_cache.cache().clone();
731 let cache_metrics = saved_cache.metrics().clone();
732 let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);
733
734 let start = Instant::now();
735
736 let mut last_address = None;
738
739 for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
741 if last_address != Some(address) {
743 let _ = state_provider.basic_account(&address);
744 last_address = Some(address);
745 }
746
747 let _ = state_provider.storage(address, slot);
749 }
750
751 let elapsed = start.elapsed();
752
753 trace!(
754 target: "engine::tree::payload_processor::prewarm",
755 ?range,
756 elapsed_ms = elapsed.as_millis(),
757 "BAL prewarm worker completed"
758 );
759
760 let _ = done_tx.send(());
762 metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
763 }
764}
765
766fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
769 let mut targets = MultiProofTargets::with_capacity(state.len());
770 let mut storage_targets = 0;
771 for (addr, account) in state {
772 if !account.is_touched() || account.is_selfdestructed() {
780 continue
781 }
782
783 let mut storage_set =
784 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
785 for (key, slot) in account.storage {
786 if !slot.is_changed() {
788 continue
789 }
790
791 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
792 }
793
794 storage_targets += storage_set.len();
795 targets.insert(keccak256(addr), storage_set);
796 }
797
798 (targets, storage_targets)
799}
800
801pub(super) enum PrewarmTaskEvent<R> {
806 TerminateTransactionExecution,
808 Terminate {
811 execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
814 },
815 Outcome {
817 proof_targets: Option<MultiProofTargets>,
819 },
820 FinishedTxExecution {
822 executed_transactions: usize,
824 },
825}
826
827#[derive(Metrics, Clone)]
829#[metrics(scope = "sync.prewarm")]
830pub(crate) struct PrewarmMetrics {
831 pub(crate) transactions: Gauge,
833 pub(crate) transactions_histogram: Histogram,
835 pub(crate) total_runtime: Histogram,
837 pub(crate) execution_duration: Histogram,
839 pub(crate) prefetch_storage_targets: Histogram,
841 pub(crate) cache_saving_duration: Gauge,
843 pub(crate) transaction_errors: Counter,
845 pub(crate) bal_slot_iteration_duration: Histogram,
847}