Skip to main content

reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    cached_state::{CachedStateProvider, SavedCache},
16    payload_processor::{bal, multiproof::MultiProofMessage, PayloadExecutionCache},
17    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
18    ExecutionEnv, StateProviderBuilder,
19};
20use alloy_consensus::transaction::TxHashRef;
21use alloy_eip7928::BlockAccessList;
22use alloy_eips::eip4895::Withdrawal;
23use alloy_primitives::{keccak256, StorageKey, B256};
24use crossbeam_channel::Sender as CrossbeamSender;
25use metrics::{Counter, Gauge, Histogram};
26use rayon::prelude::*;
27use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
28use reth_metrics::Metrics;
29use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
30use reth_provider::{
31    AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
32    StateReader,
33};
34use reth_revm::{database::StateProviderDatabase, state::EvmState};
35use reth_tasks::{pool::WorkerPool, Runtime};
36use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
37use std::sync::{
38    atomic::{AtomicBool, AtomicUsize, Ordering},
39    mpsc::{self, channel, Receiver, Sender},
40    Arc,
41};
42use tracing::{debug, debug_span, instrument, trace, warn, Span};
43
44/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
45#[derive(Debug)]
46pub enum PrewarmMode<Tx> {
47    /// Prewarm by executing transactions from a stream, each paired with its block index.
48    Transactions(Receiver<(usize, Tx)>),
49    /// Prewarm by prefetching slots from a Block Access List.
50    BlockAccessList(Arc<BlockAccessList>),
51    /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
52    /// benefit). No workers are spawned.
53    Skipped,
54}
55
56/// A task that is responsible for caching and prewarming the cache by executing transactions
57/// individually in parallel.
58///
59/// Note: This task runs until cancelled externally.
60#[derive(Debug)]
61pub struct PrewarmCacheTask<N, P, Evm>
62where
63    N: NodePrimitives,
64    Evm: ConfigureEvm<Primitives = N>,
65{
66    /// The executor used to spawn execution tasks.
67    executor: Runtime,
68    /// Shared execution cache.
69    execution_cache: PayloadExecutionCache,
70    /// Context provided to execution tasks
71    ctx: PrewarmContext<N, P, Evm>,
72    /// Sender to emit evm state outcome messages, if any.
73    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
74    /// Receiver for events produced by tx execution
75    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
76    /// Parent span for tracing
77    parent_span: Span,
78}
79
80impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
81where
82    N: NodePrimitives,
83    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
84    Evm: ConfigureEvm<Primitives = N> + 'static,
85{
86    /// Initializes the task with the given transactions pending execution
87    pub fn new(
88        executor: Runtime,
89        execution_cache: PayloadExecutionCache,
90        ctx: PrewarmContext<N, P, Evm>,
91        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
92    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
93        let (actions_tx, actions_rx) = channel();
94
95        trace!(
96            target: "engine::tree::payload_processor::prewarm",
97            prewarming_threads = executor.prewarming_pool().current_num_threads(),
98            transaction_count = ctx.env.transaction_count,
99            "Initialized prewarm task"
100        );
101
102        (
103            Self {
104                executor,
105                execution_cache,
106                ctx,
107                to_multi_proof,
108                actions_rx,
109                parent_span: Span::current(),
110            },
111            actions_tx,
112        )
113    }
114
115    /// Streams pending transactions and executes them in parallel on the prewarming pool.
116    ///
117    /// Kicks off EVM init on every pool thread, then uses `in_place_scope` to dispatch
118    /// transactions as they arrive and wait for all spawned tasks to complete before
119    /// clearing per-thread state. Workers that start via work-stealing lazily initialise
120    /// their EVM state on first access via [`get_or_init`](reth_tasks::pool::Worker::get_or_init).
121    fn spawn_txs_prewarm<Tx>(
122        &self,
123        pending: mpsc::Receiver<(usize, Tx)>,
124        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
125        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
126    ) where
127        Tx: ExecutableTxFor<Evm> + Send + 'static,
128    {
129        let executor = self.executor.clone();
130        let ctx = self.ctx.clone();
131        let span = Span::current();
132
133        self.executor.spawn_blocking_named("prewarm-txs", move || {
134            let _enter = debug_span!(
135                target: "engine::tree::payload_processor::prewarm",
136                parent: span,
137                "prewarm_txs"
138            )
139            .entered();
140
141            let ctx = &ctx;
142            let pool = executor.prewarming_pool();
143
144            let mut tx_count = 0usize;
145            let to_multi_proof = to_multi_proof.as_ref();
146            pool.in_place_scope(|s| {
147                s.spawn(|_| {
148                    pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
149                });
150
151                while let Ok((index, tx)) = pending.recv() {
152                    if ctx.should_stop() {
153                        trace!(
154                            target: "engine::tree::payload_processor::prewarm",
155                            "Termination requested, stopping transaction distribution"
156                        );
157                        break;
158                    }
159
160                    // skip transactions already executed by the main loop
161                    if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
162                        continue;
163                    }
164
165                    tx_count += 1;
166                    let parent_span = Span::current();
167                    s.spawn(move |_| {
168                        let _enter = debug_span!(
169                            target: "engine::tree::payload_processor::prewarm",
170                            parent: parent_span,
171                            "prewarm_tx",
172                            i = index,
173                        )
174                        .entered();
175                        Self::transact_worker(ctx, index, tx, to_multi_proof);
176                    });
177                }
178
179                // Send withdrawal prefetch targets after all transactions dispatched
180                if let Some(to_multi_proof) = to_multi_proof &&
181                    let Some(withdrawals) = &ctx.env.withdrawals &&
182                    !withdrawals.is_empty()
183                {
184                    let targets = multiproof_targets_from_withdrawals(withdrawals);
185                    let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
186                }
187            });
188
189            // All tasks are done — clear per-thread EVM state for the next block.
190            pool.clear();
191
192            let _ = actions_tx
193                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
194        });
195    }
196
197    /// Executes a single prewarm transaction on the current pool thread's EVM.
198    ///
199    /// Lazily initialises per-thread [`PrewarmEvmState`] via
200    /// [`get_or_init`](reth_tasks::pool::Worker::get_or_init) on first access.
201    fn transact_worker<Tx>(
202        ctx: &PrewarmContext<N, P, Evm>,
203        index: usize,
204        tx: Tx,
205        to_multi_proof: Option<&CrossbeamSender<MultiProofMessage>>,
206    ) where
207        Tx: ExecutableTxFor<Evm>,
208    {
209        WorkerPool::with_worker_mut(|worker| {
210            let Some(evm) =
211                worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
212            else {
213                return;
214            };
215
216            if ctx.should_stop() {
217                return;
218            }
219
220            // skip if main execution has already processed this transaction
221            if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
222                return;
223            }
224
225            let start = Instant::now();
226
227            let (tx_env, tx) = tx.into_parts();
228            let res = match evm.transact(tx_env) {
229                Ok(res) => res,
230                Err(err) => {
231                    trace!(
232                        target: "engine::tree::payload_processor::prewarm",
233                        %err,
234                        tx_hash=%tx.tx().tx_hash(),
235                        sender=%tx.signer(),
236                        "Error when executing prewarm transaction",
237                    );
238                    ctx.metrics.transaction_errors.increment(1);
239                    return;
240                }
241            };
242            ctx.metrics.execution_duration.record(start.elapsed());
243
244            if ctx.should_stop() {
245                return;
246            }
247
248            if index > 0 {
249                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
250                ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
251                if let Some(to_multi_proof) = to_multi_proof {
252                    let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
253                }
254            }
255
256            ctx.metrics.total_runtime.record(start.elapsed());
257        });
258    }
259
260    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
261    /// It should only be called after ensuring that:
262    /// 1. All prewarming tasks have completed execution
263    /// 2. No other concurrent operations are accessing the cache
264    ///
265    /// Saves the warmed caches back into the shared slot after prewarming completes.
266    ///
267    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
268    /// the new, warmed cache to be inserted.
269    ///
270    /// This method is called from `run()` only after all execution tasks are complete.
271    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
272    fn save_cache(
273        self,
274        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
275        valid_block_rx: mpsc::Receiver<()>,
276    ) {
277        let start = Instant::now();
278
279        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
280            self;
281        let hash = env.hash;
282
283        if let Some(saved_cache) = saved_cache {
284            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
285            // Perform all cache operations atomically under the lock
286            execution_cache.update_with_guard(|cached| {
287                // consumes the `SavedCache` held by the prewarming task, which releases its usage
288                // guard
289                let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
290                let new_cache = SavedCache::new(hash, caches, cache_metrics)
291                    .with_disable_cache_metrics(disable_cache_metrics);
292
293                // Insert state into cache while holding the lock
294                // Access the BundleState through the shared ExecutionOutcome
295                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
296                    // Clear the cache on error to prevent having a polluted cache
297                    *cached = None;
298                    debug!(target: "engine::caching", "cleared execution cache on update error");
299                    return;
300                }
301
302                new_cache.update_metrics();
303
304                if valid_block_rx.recv().is_ok() {
305                    // Replace the shared cache with the new one; the previous cache (if any) is
306                    // dropped.
307                    *cached = Some(new_cache);
308                } else {
309                    // Block was invalid; caches were already mutated by insert_state above,
310                    // so we must clear to prevent using polluted state
311                    *cached = None;
312                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
313                }
314            });
315
316            let elapsed = start.elapsed();
317            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
318
319            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
320        }
321    }
322
323    /// Runs BAL-based prewarming by using the prewarming pool's parallel iterator to prefetch
324    /// accounts and storage slots.
325    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
326    fn run_bal_prewarm(
327        &self,
328        bal: Arc<BlockAccessList>,
329        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
330    ) {
331        // Only prefetch if we have a cache to populate
332        if self.ctx.saved_cache.is_none() {
333            trace!(
334                target: "engine::tree::payload_processor::prewarm",
335                "Skipping BAL prewarm - no cache available"
336            );
337            self.send_bal_hashed_state(&bal);
338            let _ =
339                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
340            return;
341        }
342
343        if bal.is_empty() {
344            self.send_bal_hashed_state(&bal);
345            let _ =
346                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
347            return;
348        }
349
350        trace!(
351            target: "engine::tree::payload_processor::prewarm",
352            accounts = bal.len(),
353            "Starting BAL prewarm"
354        );
355
356        let ctx = self.ctx.clone();
357        self.executor.prewarming_pool().install_fn(|| {
358            bal.par_iter().for_each_init(
359                || (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
360                |(ctx, provider), account| {
361                    if ctx.should_stop() {
362                        return;
363                    }
364                    ctx.prefetch_bal_account(provider, account);
365                },
366            );
367        });
368
369        trace!(
370            target: "engine::tree::payload_processor::prewarm",
371            "All BAL prewarm accounts completed"
372        );
373
374        // Convert BAL to HashedPostState and send to multiproof task
375        self.send_bal_hashed_state(&bal);
376
377        // Signal that execution has finished
378        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
379    }
380
381    /// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the
382    /// multiproof task.
383    fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
384        let Some(to_multi_proof) = &self.to_multi_proof else { return };
385
386        let provider = match self.ctx.provider.build() {
387            Ok(provider) => provider,
388            Err(err) => {
389                warn!(
390                    target: "engine::tree::payload_processor::prewarm",
391                    ?err,
392                    "Failed to build provider for BAL hashed state conversion"
393                );
394                return;
395            }
396        };
397
398        match bal::bal_to_hashed_post_state(bal, &provider) {
399            Ok(hashed_state) => {
400                debug!(
401                    target: "engine::tree::payload_processor::prewarm",
402                    accounts = hashed_state.accounts.len(),
403                    storages = hashed_state.storages.len(),
404                    "Converted BAL to hashed post state"
405                );
406                let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
407                let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
408            }
409            Err(err) => {
410                warn!(
411                    target: "engine::tree::payload_processor::prewarm",
412                    ?err,
413                    "Failed to convert BAL to hashed state"
414                );
415            }
416        }
417    }
418
419    /// Executes the task.
420    ///
421    /// This will execute the transactions until all transactions have been processed or the task
422    /// was cancelled.
423    #[instrument(
424        parent = &self.parent_span,
425        level = "debug",
426        target = "engine::tree::payload_processor::prewarm",
427        name = "prewarm and caching",
428        skip_all
429    )]
430    pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
431    where
432        Tx: ExecutableTxFor<Evm> + Send + 'static,
433    {
434        // Spawn execution tasks based on mode
435        match mode {
436            PrewarmMode::Transactions(pending) => {
437                self.spawn_txs_prewarm(pending, actions_tx, self.to_multi_proof.clone());
438            }
439            PrewarmMode::BlockAccessList(bal) => {
440                self.run_bal_prewarm(bal, actions_tx);
441            }
442            PrewarmMode::Skipped => {
443                let _ = actions_tx
444                    .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
445            }
446        }
447
448        let mut final_execution_outcome = None;
449        let mut finished_execution = false;
450        while let Ok(event) = self.actions_rx.recv() {
451            match event {
452                PrewarmTaskEvent::TerminateTransactionExecution => {
453                    // stop tx processing
454                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
455                    self.ctx.stop();
456                }
457                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
458                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
459                    final_execution_outcome =
460                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
461
462                    if finished_execution {
463                        // all tasks are done, we can exit, which will save caches and exit
464                        break
465                    }
466                }
467                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
468                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
469                    self.ctx.metrics.transactions.set(executed_transactions as f64);
470                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
471
472                    finished_execution = true;
473
474                    if final_execution_outcome.is_some() {
475                        // all tasks are done, we can exit, which will save caches and exit
476                        break
477                    }
478                }
479            }
480        }
481
482        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
483
484        // save caches and finish using the shared ExecutionOutcome
485        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
486            self.save_cache(execution_outcome, valid_block_rx);
487        }
488    }
489}
490
491/// Context required by tx execution tasks.
492#[derive(Debug, Clone)]
493pub struct PrewarmContext<N, P, Evm>
494where
495    N: NodePrimitives,
496    Evm: ConfigureEvm<Primitives = N>,
497{
498    /// The execution environment.
499    pub env: ExecutionEnv<Evm>,
500    /// The EVM configuration.
501    pub evm_config: Evm,
502    /// The saved cache.
503    pub saved_cache: Option<SavedCache>,
504    /// Provider to obtain the state
505    pub provider: StateProviderBuilder<N, P>,
506    /// The metrics for the prewarm task.
507    pub metrics: PrewarmMetrics,
508    /// An atomic bool that tells prewarm tasks to not start any more execution.
509    pub terminate_execution: Arc<AtomicBool>,
510    /// Shared counter tracking the next transaction index to be executed by the main execution
511    /// loop. Prewarm workers skip transactions with `index < counter` since those have already
512    /// been executed.
513    pub executed_tx_index: Arc<AtomicUsize>,
514    /// Whether the precompile cache is disabled.
515    pub precompile_cache_disabled: bool,
516    /// The precompile cache map.
517    pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
518}
519
520/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
521/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
522type PrewarmEvmState<Evm> =
523    Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
524
525impl<N, P, Evm> PrewarmContext<N, P, Evm>
526where
527    N: NodePrimitives,
528    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
529    Evm: ConfigureEvm<Primitives = N> + 'static,
530{
531    /// Creates a per-thread EVM for prewarming.
532    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
533    fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
534        let mut state_provider = match self.provider.build() {
535            Ok(provider) => provider,
536            Err(err) => {
537                trace!(
538                    target: "engine::tree::payload_processor::prewarm",
539                    %err,
540                    "Failed to build state provider in prewarm thread"
541                );
542                return None
543            }
544        };
545
546        // Use the caches to create a new provider with caching
547        if let Some(saved_cache) = &self.saved_cache {
548            let caches = saved_cache.cache().clone();
549            let cache_metrics = saved_cache.metrics().clone();
550            state_provider =
551                Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
552        }
553
554        let state_provider = StateProviderDatabase::new(state_provider);
555
556        let mut evm_env = self.env.evm_env.clone();
557
558        // we must disable the nonce check so that we can execute the transaction even if the nonce
559        // doesn't match what's on chain.
560        evm_env.cfg_env.disable_nonce_check = true;
561
562        // disable the balance check so that transactions from senders who were funded by earlier
563        // transactions in the block can still be prewarmed
564        evm_env.cfg_env.disable_balance_check = true;
565
566        // create a new executor and disable nonce checks in the env
567        let spec_id = *evm_env.spec_id();
568        let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
569
570        if !self.precompile_cache_disabled {
571            // Only cache pure precompiles to avoid issues with stateful precompiles
572            evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
573                CachedPrecompile::wrap(
574                    precompile,
575                    self.precompile_cache_map.cache_for_address(*address),
576                    spec_id,
577                    None, // No metrics for prewarm
578                )
579            });
580        }
581
582        Some(evm)
583    }
584
585    /// Returns `true` if prewarming should stop.
586    #[inline]
587    pub fn should_stop(&self) -> bool {
588        self.terminate_execution.load(Ordering::Relaxed)
589    }
590
591    /// Signals all prewarm tasks to stop execution.
592    #[inline]
593    pub fn stop(&self) {
594        self.terminate_execution.store(true, Ordering::Relaxed);
595    }
596
597    /// Prefetches a single account and all its storage slots from the BAL into the cache.
598    ///
599    /// The `provider` is lazily initialized on first call and reused across accounts on the same
600    /// thread.
601    fn prefetch_bal_account(
602        &self,
603        provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
604        account: &alloy_eip7928::AccountChanges,
605    ) {
606        let state_provider = match provider {
607            Some(p) => p,
608            slot @ None => {
609                let built = match self.provider.build() {
610                    Ok(p) => p,
611                    Err(err) => {
612                        trace!(
613                            target: "engine::tree::payload_processor::prewarm",
614                            %err,
615                            "Failed to build state provider in BAL prewarm thread"
616                        );
617                        return;
618                    }
619                };
620                let saved_cache =
621                    self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
622                let caches = saved_cache.cache().clone();
623                let cache_metrics = saved_cache.metrics().clone();
624                slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
625            }
626        };
627
628        let start = Instant::now();
629
630        let _ = state_provider.basic_account(&account.address);
631
632        for slot in &account.storage_changes {
633            let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
634        }
635        for &slot in &account.storage_reads {
636            let _ = state_provider.storage(account.address, StorageKey::from(slot));
637        }
638
639        self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
640    }
641}
642
643/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
644/// given state.
645fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
646    let mut targets = MultiProofTargetsV2::default();
647    let mut storage_target_count = 0;
648    for (addr, account) in state {
649        // if the account was not touched, or if the account was selfdestructed, do not
650        // fetch proofs for it
651        //
652        // Since selfdestruct can only happen in the same transaction, we can skip
653        // prefetching proofs for selfdestructed accounts
654        //
655        // See: https://eips.ethereum.org/EIPS/eip-6780
656        if !account.is_touched() || account.is_selfdestructed() {
657            continue
658        }
659
660        let hashed_address = keccak256(addr);
661        targets.account_targets.push(hashed_address.into());
662
663        let mut storage_slots = Vec::with_capacity(account.storage.len());
664        for (key, slot) in account.storage {
665            // do nothing if unchanged
666            if !slot.is_changed() {
667                continue
668            }
669
670            let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
671            storage_slots.push(ProofV2Target::from(hashed_slot));
672        }
673
674        storage_target_count += storage_slots.len();
675        if !storage_slots.is_empty() {
676            targets.storage_targets.insert(hashed_address, storage_slots);
677        }
678    }
679
680    (targets, storage_target_count)
681}
682
683/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
684///
685/// Withdrawals only modify account balances (no storage), so the targets contain
686/// only account-level entries with empty storage sets.
687fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
688    MultiProofTargetsV2 {
689        account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
690        ..Default::default()
691    }
692}
693
694/// The events the pre-warm task can handle.
695///
696/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
697/// execution path without cloning the expensive `BundleState`.
698#[derive(Debug)]
699pub enum PrewarmTaskEvent<R> {
700    /// Forcefully terminate all remaining transaction execution.
701    TerminateTransactionExecution,
702    /// Forcefully terminate the task on demand and update the shared cache with the given output
703    /// before exiting.
704    Terminate {
705        /// The final execution outcome. Using `Arc` allows sharing with the main execution
706        /// path without cloning the expensive `BundleState`.
707        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
708        /// Receiver for the block validation result.
709        ///
710        /// Cache saving is racing the state root validation. We optimistically construct the
711        /// updated cache but only save it once we know the block is valid.
712        valid_block_rx: mpsc::Receiver<()>,
713    },
714    /// Finished executing all transactions
715    FinishedTxExecution {
716        /// Number of transactions executed
717        executed_transactions: usize,
718    },
719}
720
721/// Metrics for transactions prewarming.
722#[derive(Metrics, Clone)]
723#[metrics(scope = "sync.prewarm")]
724pub struct PrewarmMetrics {
725    /// The number of transactions to prewarm
726    pub(crate) transactions: Gauge,
727    /// A histogram of the number of transactions to prewarm
728    pub(crate) transactions_histogram: Histogram,
729    /// A histogram of duration per transaction prewarming
730    pub(crate) total_runtime: Histogram,
731    /// A histogram of EVM execution duration per transaction prewarming
732    pub(crate) execution_duration: Histogram,
733    /// A histogram for prefetch targets per transaction prewarming
734    pub(crate) prefetch_storage_targets: Histogram,
735    /// A histogram of duration for cache saving
736    pub(crate) cache_saving_duration: Gauge,
737    /// Counter for transaction execution errors during prewarming
738    pub(crate) transaction_errors: Counter,
739    /// A histogram of BAL slot iteration duration during prefetching
740    pub(crate) bal_slot_iteration_duration: Histogram,
741}