Skip to main content

reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    payload_processor::{bal, multiproof::MultiProofMessage},
16    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
17    CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache, StateProviderBuilder,
18};
19use alloy_consensus::transaction::TxHashRef;
20use alloy_eip7928::BlockAccessList;
21use alloy_eips::eip4895::Withdrawal;
22use alloy_primitives::{keccak256, StorageKey, B256};
23use crossbeam_channel::Sender as CrossbeamSender;
24use metrics::{Counter, Gauge, Histogram};
25use rayon::prelude::*;
26use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
27use reth_metrics::Metrics;
28use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
29use reth_provider::{
30    AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
31    StateReader,
32};
33use reth_revm::{database::StateProviderDatabase, state::EvmState};
34use reth_tasks::{pool::WorkerPool, Runtime};
35use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
36use std::sync::{
37    atomic::{AtomicBool, AtomicUsize, Ordering},
38    mpsc::{self, channel, Receiver, Sender},
39    Arc,
40};
41use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
42
43/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
44#[derive(Debug)]
45pub enum PrewarmMode<Tx> {
46    /// Prewarm by executing transactions from a stream, each paired with its block index.
47    Transactions(Receiver<(usize, Tx)>),
48    /// Prewarm by prefetching slots from a Block Access List.
49    BlockAccessList(Arc<BlockAccessList>),
50    /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
51    /// benefit). No workers are spawned.
52    Skipped,
53}
54
55/// A task that is responsible for caching and prewarming the cache by executing transactions
56/// individually in parallel.
57///
58/// Note: This task runs until cancelled externally.
59#[derive(Debug)]
60pub struct PrewarmCacheTask<N, P, Evm>
61where
62    N: NodePrimitives,
63    Evm: ConfigureEvm<Primitives = N>,
64{
65    /// The executor used to spawn execution tasks.
66    executor: Runtime,
67    /// Shared execution cache.
68    execution_cache: PayloadExecutionCache,
69    /// Context provided to execution tasks
70    ctx: PrewarmContext<N, P, Evm>,
71    /// Sender to emit evm state outcome messages, if any.
72    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
73    /// Receiver for events produced by tx execution
74    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
75    /// Parent span for tracing
76    parent_span: Span,
77}
78
79impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
80where
81    N: NodePrimitives,
82    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
83    Evm: ConfigureEvm<Primitives = N> + 'static,
84{
85    /// Initializes the task with the given transactions pending execution
86    pub fn new(
87        executor: Runtime,
88        execution_cache: PayloadExecutionCache,
89        ctx: PrewarmContext<N, P, Evm>,
90        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
91    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
92        let (actions_tx, actions_rx) = channel();
93
94        trace!(
95            target: "engine::tree::payload_processor::prewarm",
96            prewarming_threads = executor.prewarming_pool().current_num_threads(),
97            transaction_count = ctx.env.transaction_count,
98            "Initialized prewarm task"
99        );
100
101        (
102            Self {
103                executor,
104                execution_cache,
105                ctx,
106                to_multi_proof,
107                actions_rx,
108                parent_span: Span::current(),
109            },
110            actions_tx,
111        )
112    }
113
114    /// Streams pending transactions and executes them in parallel on the prewarming pool.
115    ///
116    /// Kicks off EVM init on every pool thread, then uses `in_place_scope` to dispatch
117    /// transactions as they arrive and wait for all spawned tasks to complete before
118    /// clearing per-thread state. Workers that start via work-stealing lazily initialise
119    /// their EVM state on first access via [`get_or_init`](reth_tasks::pool::Worker::get_or_init).
120    fn spawn_txs_prewarm<Tx>(
121        &self,
122        pending: mpsc::Receiver<(usize, Tx)>,
123        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
124        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
125    ) where
126        Tx: ExecutableTxFor<Evm> + Send + 'static,
127    {
128        let executor = self.executor.clone();
129        let ctx = self.ctx.clone();
130        let span = Span::current();
131
132        self.executor.spawn_blocking_named("prewarm-txs", move || {
133            let _enter = debug_span!(
134                target: "engine::tree::payload_processor::prewarm",
135                parent: span,
136                "prewarm_txs"
137            )
138            .entered();
139
140            let ctx = &ctx;
141            let pool = executor.prewarming_pool();
142
143            let mut tx_count = 0usize;
144            let to_multi_proof = to_multi_proof.as_ref();
145            pool.in_place_scope(|s| {
146                s.spawn(|_| {
147                    pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
148                });
149
150                while let Ok((index, tx)) = pending.recv() {
151                    if ctx.should_stop() {
152                        trace!(
153                            target: "engine::tree::payload_processor::prewarm",
154                            "Termination requested, stopping transaction distribution"
155                        );
156                        break;
157                    }
158
159                    // skip transactions already executed by the main loop
160                    if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
161                        continue;
162                    }
163
164                    tx_count += 1;
165                    let parent_span = Span::current();
166                    s.spawn(move |_| {
167                        let _enter = trace_span!(
168                            target: "engine::tree::payload_processor::prewarm",
169                            parent: parent_span,
170                            "prewarm_tx",
171                            i = index,
172                        )
173                        .entered();
174                        Self::transact_worker(ctx, index, tx, to_multi_proof);
175                    });
176                }
177
178                // Send withdrawal prefetch targets after all transactions dispatched
179                if let Some(to_multi_proof) = to_multi_proof &&
180                    let Some(withdrawals) = &ctx.env.withdrawals &&
181                    !withdrawals.is_empty()
182                {
183                    let targets = multiproof_targets_from_withdrawals(withdrawals);
184                    let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
185                }
186            });
187
188            // All tasks are done — clear per-thread EVM state for the next block.
189            pool.clear();
190
191            let _ = actions_tx
192                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
193        });
194    }
195
196    /// Executes a single prewarm transaction on the current pool thread's EVM.
197    ///
198    /// Lazily initialises per-thread [`PrewarmEvmState`] via
199    /// [`get_or_init`](reth_tasks::pool::Worker::get_or_init) on first access.
200    fn transact_worker<Tx>(
201        ctx: &PrewarmContext<N, P, Evm>,
202        index: usize,
203        tx: Tx,
204        to_multi_proof: Option<&CrossbeamSender<MultiProofMessage>>,
205    ) where
206        Tx: ExecutableTxFor<Evm>,
207    {
208        WorkerPool::with_worker_mut(|worker| {
209            let Some(evm) =
210                worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
211            else {
212                return;
213            };
214
215            if ctx.should_stop() {
216                return;
217            }
218
219            // skip if main execution has already processed this transaction
220            if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
221                return;
222            }
223
224            let start = Instant::now();
225
226            let (tx_env, tx) = tx.into_parts();
227            let res = match evm.transact(tx_env) {
228                Ok(res) => res,
229                Err(err) => {
230                    trace!(
231                        target: "engine::tree::payload_processor::prewarm",
232                        %err,
233                        tx_hash=%tx.tx().tx_hash(),
234                        sender=%tx.signer(),
235                        "Error when executing prewarm transaction",
236                    );
237                    ctx.metrics.transaction_errors.increment(1);
238                    return;
239                }
240            };
241            ctx.metrics.execution_duration.record(start.elapsed());
242
243            if ctx.should_stop() {
244                return;
245            }
246
247            if index > 0 {
248                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
249                ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
250                if let Some(to_multi_proof) = to_multi_proof {
251                    let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
252                }
253            }
254
255            ctx.metrics.total_runtime.record(start.elapsed());
256        });
257    }
258
259    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
260    /// It should only be called after ensuring that:
261    /// 1. All prewarming tasks have completed execution
262    /// 2. No other concurrent operations are accessing the cache
263    ///
264    /// Saves the warmed caches back into the shared slot after prewarming completes.
265    ///
266    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
267    /// the new, warmed cache to be inserted.
268    ///
269    /// This method is called from `run()` only after all execution tasks are complete.
270    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
271    fn save_cache(
272        self,
273        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
274        valid_block_rx: mpsc::Receiver<()>,
275    ) {
276        let start = Instant::now();
277
278        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
279            self;
280        let hash = env.hash;
281
282        if let Some(saved_cache) = saved_cache {
283            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
284            // Perform all cache operations atomically under the lock
285            execution_cache.update_with_guard(|cached| {
286                // consumes the `SavedCache` held by the prewarming task, which releases its usage
287                // guard
288                let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
289                let new_cache = SavedCache::new(hash, caches, cache_metrics)
290                    .with_disable_cache_metrics(disable_cache_metrics);
291
292                // Insert state into cache while holding the lock
293                // Access the BundleState through the shared ExecutionOutcome
294                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
295                    // Clear the cache on error to prevent having a polluted cache
296                    *cached = None;
297                    debug!(target: "engine::caching", "cleared execution cache on update error");
298                    return;
299                }
300
301                new_cache.update_metrics();
302
303                if valid_block_rx.recv().is_ok() {
304                    // Replace the shared cache with the new one; the previous cache (if any) is
305                    // dropped.
306                    *cached = Some(new_cache);
307                } else {
308                    // Block was invalid; caches were already mutated by insert_state above,
309                    // so we must clear to prevent using polluted state
310                    *cached = None;
311                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
312                }
313            });
314
315            let elapsed = start.elapsed();
316            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
317
318            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
319        }
320    }
321
322    /// Runs BAL-based prewarming by using the prewarming pool's parallel iterator to prefetch
323    /// accounts and storage slots.
324    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
325    fn run_bal_prewarm(
326        &self,
327        bal: Arc<BlockAccessList>,
328        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
329    ) {
330        // Only prefetch if we have a cache to populate
331        if self.ctx.saved_cache.is_none() {
332            trace!(
333                target: "engine::tree::payload_processor::prewarm",
334                "Skipping BAL prewarm - no cache available"
335            );
336            self.send_bal_hashed_state(&bal);
337            let _ =
338                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
339            return;
340        }
341
342        if bal.is_empty() {
343            self.send_bal_hashed_state(&bal);
344            let _ =
345                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
346            return;
347        }
348
349        trace!(
350            target: "engine::tree::payload_processor::prewarm",
351            accounts = bal.len(),
352            "Starting BAL prewarm"
353        );
354
355        let ctx = self.ctx.clone();
356        self.executor.prewarming_pool().install_fn(|| {
357            bal.par_iter().for_each_init(
358                || (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
359                |(ctx, provider), account| {
360                    if ctx.should_stop() {
361                        return;
362                    }
363                    ctx.prefetch_bal_account(provider, account);
364                },
365            );
366        });
367
368        trace!(
369            target: "engine::tree::payload_processor::prewarm",
370            "All BAL prewarm accounts completed"
371        );
372
373        // Convert BAL to HashedPostState and send to multiproof task
374        self.send_bal_hashed_state(&bal);
375
376        // Signal that execution has finished
377        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
378    }
379
380    /// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the
381    /// multiproof task.
382    fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
383        let Some(to_multi_proof) = &self.to_multi_proof else { return };
384
385        let provider = match self.ctx.provider.build() {
386            Ok(provider) => provider,
387            Err(err) => {
388                warn!(
389                    target: "engine::tree::payload_processor::prewarm",
390                    ?err,
391                    "Failed to build provider for BAL hashed state conversion"
392                );
393                return;
394            }
395        };
396
397        match bal::bal_to_hashed_post_state(bal, &provider) {
398            Ok(hashed_state) => {
399                debug!(
400                    target: "engine::tree::payload_processor::prewarm",
401                    accounts = hashed_state.accounts.len(),
402                    storages = hashed_state.storages.len(),
403                    "Converted BAL to hashed post state"
404                );
405                let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
406                let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
407            }
408            Err(err) => {
409                warn!(
410                    target: "engine::tree::payload_processor::prewarm",
411                    ?err,
412                    "Failed to convert BAL to hashed state"
413                );
414            }
415        }
416    }
417
418    /// Executes the task.
419    ///
420    /// This will execute the transactions until all transactions have been processed or the task
421    /// was cancelled.
422    #[instrument(
423        parent = &self.parent_span,
424        level = "debug",
425        target = "engine::tree::payload_processor::prewarm",
426        name = "prewarm and caching",
427        skip_all
428    )]
429    pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
430    where
431        Tx: ExecutableTxFor<Evm> + Send + 'static,
432    {
433        // Spawn execution tasks based on mode
434        match mode {
435            PrewarmMode::Transactions(pending) => {
436                self.spawn_txs_prewarm(pending, actions_tx, self.to_multi_proof.clone());
437            }
438            PrewarmMode::BlockAccessList(bal) => {
439                self.run_bal_prewarm(bal, actions_tx);
440            }
441            PrewarmMode::Skipped => {
442                let _ = actions_tx
443                    .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
444            }
445        }
446
447        let mut final_execution_outcome = None;
448        let mut finished_execution = false;
449        while let Ok(event) = self.actions_rx.recv() {
450            match event {
451                PrewarmTaskEvent::TerminateTransactionExecution => {
452                    // stop tx processing
453                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
454                    self.ctx.stop();
455                }
456                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
457                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
458                    final_execution_outcome =
459                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
460
461                    if finished_execution {
462                        // all tasks are done, we can exit, which will save caches and exit
463                        break
464                    }
465                }
466                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
467                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
468                    self.ctx.metrics.transactions.set(executed_transactions as f64);
469                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
470
471                    finished_execution = true;
472
473                    if final_execution_outcome.is_some() {
474                        // all tasks are done, we can exit, which will save caches and exit
475                        break
476                    }
477                }
478            }
479        }
480
481        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
482
483        // save caches and finish using the shared ExecutionOutcome
484        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
485            self.save_cache(execution_outcome, valid_block_rx);
486        }
487    }
488}
489
490/// Context required by tx execution tasks.
491#[derive(Debug, Clone)]
492pub struct PrewarmContext<N, P, Evm>
493where
494    N: NodePrimitives,
495    Evm: ConfigureEvm<Primitives = N>,
496{
497    /// The execution environment.
498    pub env: ExecutionEnv<Evm>,
499    /// The EVM configuration.
500    pub evm_config: Evm,
501    /// The saved cache.
502    pub saved_cache: Option<SavedCache>,
503    /// Provider to obtain the state
504    pub provider: StateProviderBuilder<N, P>,
505    /// The metrics for the prewarm task.
506    pub metrics: PrewarmMetrics,
507    /// An atomic bool that tells prewarm tasks to not start any more execution.
508    pub terminate_execution: Arc<AtomicBool>,
509    /// Shared counter tracking the next transaction index to be executed by the main execution
510    /// loop. Prewarm workers skip transactions with `index < counter` since those have already
511    /// been executed.
512    pub executed_tx_index: Arc<AtomicUsize>,
513    /// Whether the precompile cache is disabled.
514    pub precompile_cache_disabled: bool,
515    /// The precompile cache map.
516    pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
517}
518
519/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
520/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
521type PrewarmEvmState<Evm> =
522    Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
523
524impl<N, P, Evm> PrewarmContext<N, P, Evm>
525where
526    N: NodePrimitives,
527    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
528    Evm: ConfigureEvm<Primitives = N> + 'static,
529{
530    /// Creates a per-thread EVM for prewarming.
531    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
532    fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
533        let mut state_provider = match self.provider.build() {
534            Ok(provider) => provider,
535            Err(err) => {
536                trace!(
537                    target: "engine::tree::payload_processor::prewarm",
538                    %err,
539                    "Failed to build state provider in prewarm thread"
540                );
541                return None
542            }
543        };
544
545        // Use the caches to create a new provider with caching
546        if let Some(saved_cache) = &self.saved_cache {
547            let caches = saved_cache.cache().clone();
548            let cache_metrics = saved_cache.metrics().clone();
549            state_provider =
550                Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
551        }
552
553        let state_provider = StateProviderDatabase::new(state_provider);
554
555        let mut evm_env = self.env.evm_env.clone();
556
557        // we must disable the nonce check so that we can execute the transaction even if the nonce
558        // doesn't match what's on chain.
559        evm_env.cfg_env.disable_nonce_check = true;
560
561        // disable the balance check so that transactions from senders who were funded by earlier
562        // transactions in the block can still be prewarmed
563        evm_env.cfg_env.disable_balance_check = true;
564
565        // create a new executor and disable nonce checks in the env
566        let spec_id = *evm_env.spec_id();
567        let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
568
569        if !self.precompile_cache_disabled {
570            // Only cache pure precompiles to avoid issues with stateful precompiles
571            evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
572                CachedPrecompile::wrap(
573                    precompile,
574                    self.precompile_cache_map.cache_for_address(*address),
575                    spec_id,
576                    None, // No metrics for prewarm
577                )
578            });
579        }
580
581        Some(evm)
582    }
583
584    /// Returns `true` if prewarming should stop.
585    #[inline]
586    pub fn should_stop(&self) -> bool {
587        self.terminate_execution.load(Ordering::Relaxed)
588    }
589
590    /// Signals all prewarm tasks to stop execution.
591    #[inline]
592    pub fn stop(&self) {
593        self.terminate_execution.store(true, Ordering::Relaxed);
594    }
595
596    /// Prefetches a single account and all its storage slots from the BAL into the cache.
597    ///
598    /// The `provider` is lazily initialized on first call and reused across accounts on the same
599    /// thread.
600    fn prefetch_bal_account(
601        &self,
602        provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
603        account: &alloy_eip7928::AccountChanges,
604    ) {
605        let state_provider = match provider {
606            Some(p) => p,
607            slot @ None => {
608                let built = match self.provider.build() {
609                    Ok(p) => p,
610                    Err(err) => {
611                        trace!(
612                            target: "engine::tree::payload_processor::prewarm",
613                            %err,
614                            "Failed to build state provider in BAL prewarm thread"
615                        );
616                        return;
617                    }
618                };
619                let saved_cache =
620                    self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
621                let caches = saved_cache.cache().clone();
622                let cache_metrics = saved_cache.metrics().clone();
623                slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
624            }
625        };
626
627        let start = Instant::now();
628
629        let _ = state_provider.basic_account(&account.address);
630
631        for slot in &account.storage_changes {
632            let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
633        }
634        for &slot in &account.storage_reads {
635            let _ = state_provider.storage(account.address, StorageKey::from(slot));
636        }
637
638        self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
639    }
640}
641
642/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
643/// given state.
644fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
645    let mut targets = MultiProofTargetsV2::default();
646    let mut storage_target_count = 0;
647    for (addr, account) in state {
648        // if the account was not touched, or if the account was selfdestructed, do not
649        // fetch proofs for it
650        //
651        // Since selfdestruct can only happen in the same transaction, we can skip
652        // prefetching proofs for selfdestructed accounts
653        //
654        // See: https://eips.ethereum.org/EIPS/eip-6780
655        if !account.is_touched() || account.is_selfdestructed() {
656            continue
657        }
658
659        let hashed_address = keccak256(addr);
660        targets.account_targets.push(hashed_address.into());
661
662        let mut storage_slots = Vec::with_capacity(account.storage.len());
663        for (key, slot) in account.storage {
664            // do nothing if unchanged
665            if !slot.is_changed() {
666                continue
667            }
668
669            let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
670            storage_slots.push(ProofV2Target::from(hashed_slot));
671        }
672
673        storage_target_count += storage_slots.len();
674        if !storage_slots.is_empty() {
675            targets.storage_targets.insert(hashed_address, storage_slots);
676        }
677    }
678
679    (targets, storage_target_count)
680}
681
682/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
683///
684/// Withdrawals only modify account balances (no storage), so the targets contain
685/// only account-level entries with empty storage sets.
686fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
687    MultiProofTargetsV2 {
688        account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
689        ..Default::default()
690    }
691}
692
693/// The events the pre-warm task can handle.
694///
695/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
696/// execution path without cloning the expensive `BundleState`.
697#[derive(Debug)]
698pub enum PrewarmTaskEvent<R> {
699    /// Forcefully terminate all remaining transaction execution.
700    TerminateTransactionExecution,
701    /// Forcefully terminate the task on demand and update the shared cache with the given output
702    /// before exiting.
703    Terminate {
704        /// The final execution outcome. Using `Arc` allows sharing with the main execution
705        /// path without cloning the expensive `BundleState`.
706        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
707        /// Receiver for the block validation result.
708        ///
709        /// Cache saving is racing the state root validation. We optimistically construct the
710        /// updated cache but only save it once we know the block is valid.
711        valid_block_rx: mpsc::Receiver<()>,
712    },
713    /// Finished executing all transactions
714    FinishedTxExecution {
715        /// Number of transactions executed
716        executed_transactions: usize,
717    },
718}
719
720/// Metrics for transactions prewarming.
721#[derive(Metrics, Clone)]
722#[metrics(scope = "sync.prewarm")]
723pub struct PrewarmMetrics {
724    /// The number of transactions to prewarm
725    pub(crate) transactions: Gauge,
726    /// A histogram of the number of transactions to prewarm
727    pub(crate) transactions_histogram: Histogram,
728    /// A histogram of duration per transaction prewarming
729    pub(crate) total_runtime: Histogram,
730    /// A histogram of EVM execution duration per transaction prewarming
731    pub(crate) execution_duration: Histogram,
732    /// A histogram for prefetch targets per transaction prewarming
733    pub(crate) prefetch_storage_targets: Histogram,
734    /// A histogram of duration for cache saving
735    pub(crate) cache_saving_duration: Gauge,
736    /// Counter for transaction execution errors during prewarming
737    pub(crate) transaction_errors: Counter,
738    /// A histogram of BAL slot iteration duration during prefetching
739    pub(crate) bal_slot_iteration_duration: Histogram,
740}