reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    cached_state::{CachedStateProvider, SavedCache},
16    payload_processor::{
17        bal::{total_slots, BALSlotIter},
18        executor::WorkloadExecutor,
19        multiproof::MultiProofMessage,
20        ExecutionCache as PayloadExecutionCache,
21    },
22    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
23    ExecutionEnv, StateProviderBuilder,
24};
25use alloy_consensus::transaction::TxHashRef;
26use alloy_eip7928::BlockAccessList;
27use alloy_eips::Typed2718;
28use alloy_evm::Database;
29use alloy_primitives::{keccak256, map::B256Set, B256};
30use crossbeam_channel::Sender as CrossbeamSender;
31use metrics::{Counter, Gauge, Histogram};
32use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
33use reth_execution_types::ExecutionOutcome;
34use reth_metrics::Metrics;
35use reth_primitives_traits::NodePrimitives;
36use reth_provider::{AccountReader, BlockReader, StateProvider, StateProviderFactory, StateReader};
37use reth_revm::{database::StateProviderDatabase, state::EvmState};
38use reth_trie::MultiProofTargets;
39use std::{
40    ops::Range,
41    sync::{
42        atomic::{AtomicBool, Ordering},
43        mpsc::{self, channel, Receiver, Sender},
44        Arc,
45    },
46    time::Instant,
47};
48use tracing::{debug, debug_span, instrument, trace, warn, Span};
49
50/// Determines the prewarming mode: transaction-based or BAL-based.
51pub(super) enum PrewarmMode<Tx> {
52    /// Prewarm by executing transactions from a stream.
53    Transactions(Receiver<Tx>),
54    /// Prewarm by prefetching slots from a Block Access List.
55    BlockAccessList(Arc<BlockAccessList>),
56}
57
58/// A wrapper for transactions that includes their index in the block.
59#[derive(Clone)]
60struct IndexedTransaction<Tx> {
61    /// The transaction index in the block.
62    index: usize,
63    /// The wrapped transaction.
64    tx: Tx,
65}
66
67/// Maximum standard Ethereum transaction type value.
68///
69/// Standard transaction types are:
70/// - Type 0: Legacy transactions (original Ethereum)
71/// - Type 1: EIP-2930 (access list transactions)
72/// - Type 2: EIP-1559 (dynamic fee transactions)
73/// - Type 3: EIP-4844 (blob transactions)
74/// - Type 4: EIP-7702 (set code authorization transactions)
75///
76/// Any transaction with a type > 4 is considered a non-standard/system transaction,
77/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
78const MAX_STANDARD_TX_TYPE: u8 = 4;
79
80/// A task that is responsible for caching and prewarming the cache by executing transactions
81/// individually in parallel.
82///
83/// Note: This task runs until cancelled externally.
84pub(super) struct PrewarmCacheTask<N, P, Evm>
85where
86    N: NodePrimitives,
87    Evm: ConfigureEvm<Primitives = N>,
88{
89    /// The executor used to spawn execution tasks.
90    executor: WorkloadExecutor,
91    /// Shared execution cache.
92    execution_cache: PayloadExecutionCache,
93    /// Context provided to execution tasks
94    ctx: PrewarmContext<N, P, Evm>,
95    /// How many transactions should be executed in parallel
96    max_concurrency: usize,
97    /// The number of transactions to be processed
98    transaction_count_hint: usize,
99    /// Sender to emit evm state outcome messages, if any.
100    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
101    /// Receiver for events produced by tx execution
102    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
103    /// Parent span for tracing
104    parent_span: Span,
105}
106
107impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
108where
109    N: NodePrimitives,
110    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
111    Evm: ConfigureEvm<Primitives = N> + 'static,
112{
113    /// Initializes the task with the given transactions pending execution
114    pub(super) fn new(
115        executor: WorkloadExecutor,
116        execution_cache: PayloadExecutionCache,
117        ctx: PrewarmContext<N, P, Evm>,
118        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
119        transaction_count_hint: usize,
120        max_concurrency: usize,
121    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
122        let (actions_tx, actions_rx) = channel();
123
124        trace!(
125            target: "engine::tree::payload_processor::prewarm",
126            max_concurrency,
127            transaction_count_hint,
128            "Initialized prewarm task"
129        );
130
131        (
132            Self {
133                executor,
134                execution_cache,
135                ctx,
136                max_concurrency,
137                transaction_count_hint,
138                to_multi_proof,
139                actions_rx,
140                parent_span: Span::current(),
141            },
142            actions_tx,
143        )
144    }
145
146    /// Spawns all pending transactions as blocking tasks by first chunking them.
147    ///
148    /// For Optimism chains, special handling is applied to the first transaction if it's a
149    /// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
150    /// subsequent transactions in the block.
151    fn spawn_all<Tx>(
152        &self,
153        pending: mpsc::Receiver<Tx>,
154        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
155    ) where
156        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
157    {
158        let executor = self.executor.clone();
159        let ctx = self.ctx.clone();
160        let max_concurrency = self.max_concurrency;
161        let transaction_count_hint = self.transaction_count_hint;
162        let span = Span::current();
163
164        self.executor.spawn_blocking(move || {
165            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
166
167            let (done_tx, done_rx) = mpsc::channel();
168
169            // When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
170            // max workers to handle potentially many transactions in parallel rather
171            // than bottlenecking on a single worker.
172            let workers_needed = if transaction_count_hint == 0 {
173                max_concurrency
174            } else {
175                transaction_count_hint.min(max_concurrency)
176            };
177
178            // Initialize worker handles container
179            let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
180
181            // Distribute transactions to workers
182            let mut tx_index = 0usize;
183            while let Ok(tx) = pending.recv() {
184                // Stop distributing if termination was requested
185                if ctx.terminate_execution.load(Ordering::Relaxed) {
186                    trace!(
187                        target: "engine::tree::payload_processor::prewarm",
188                        "Termination requested, stopping transaction distribution"
189                    );
190                    break;
191                }
192
193                let indexed_tx = IndexedTransaction { index: tx_index, tx };
194                let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
195
196                // System transactions (type > 4) in the first position set critical metadata
197                // that affects all subsequent transactions (e.g., L1 block info on L2s).
198                // Broadcast the first system transaction to all workers to ensure they have
199                // the critical state. This is particularly important for L2s like Optimism
200                // where the first deposit transaction (type 126) contains essential block metadata.
201                if tx_index == 0 && is_system_tx {
202                    for handle in &handles {
203                        // Ignore send errors: workers listen to terminate_execution and may
204                        // exit early when signaled. Sending to a disconnected worker is
205                        // possible and harmless and should happen at most once due to
206                        //  the terminate_execution check above.
207                        let _ = handle.send(indexed_tx.clone());
208                    }
209                } else {
210                    // Round-robin distribution for all other transactions
211                    let worker_idx = tx_index % workers_needed;
212                    // Ignore send errors: workers listen to terminate_execution and may
213                    // exit early when signaled. Sending to a disconnected worker is
214                    // possible and harmless and should happen at most once due to
215                    //  the terminate_execution check above.
216                    let _ = handles[worker_idx].send(indexed_tx);
217                }
218
219                tx_index += 1;
220            }
221
222            // drop handle and wait for all tasks to finish and drop theirs
223            drop(done_tx);
224            drop(handles);
225            while done_rx.recv().is_ok() {}
226
227            let _ = actions_tx
228                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
229        });
230    }
231
232    /// Returns true if prewarming was terminated and no more transactions should be prewarmed.
233    fn is_execution_terminated(&self) -> bool {
234        self.ctx.terminate_execution.load(Ordering::Relaxed)
235    }
236
237    /// If configured and the tx returned proof targets, emit the targets the transaction produced
238    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
239        if self.is_execution_terminated() {
240            // if execution is already terminated then we dont need to send more proof fetch
241            // messages
242            return
243        }
244
245        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
246            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
247        }
248    }
249
250    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
251    /// It should only be called after ensuring that:
252    /// 1. All prewarming tasks have completed execution
253    /// 2. No other concurrent operations are accessing the cache
254    ///
255    /// Saves the warmed caches back into the shared slot after prewarming completes.
256    ///
257    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
258    /// the new, warmed cache to be inserted.
259    ///
260    /// This method is called from `run()` only after all execution tasks are complete.
261    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
262    fn save_cache(self, execution_outcome: Arc<ExecutionOutcome<N::Receipt>>) {
263        let start = Instant::now();
264
265        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
266            self;
267        let hash = env.hash;
268
269        if let Some(saved_cache) = saved_cache {
270            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
271            // Perform all cache operations atomically under the lock
272            execution_cache.update_with_guard(|cached| {
273                // consumes the `SavedCache` held by the prewarming task, which releases its usage
274                // guard
275                let (caches, cache_metrics) = saved_cache.split();
276                let new_cache = SavedCache::new(hash, caches, cache_metrics);
277
278                // Insert state into cache while holding the lock
279                // Access the BundleState through the shared ExecutionOutcome
280                if new_cache.cache().insert_state(execution_outcome.state()).is_err() {
281                    // Clear the cache on error to prevent having a polluted cache
282                    *cached = None;
283                    debug!(target: "engine::caching", "cleared execution cache on update error");
284                    return;
285                }
286
287                new_cache.update_metrics();
288
289                // Replace the shared cache with the new one; the previous cache (if any) is
290                // dropped.
291                *cached = Some(new_cache);
292            });
293
294            let elapsed = start.elapsed();
295            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
296
297            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
298        }
299    }
300
301    /// Runs BAL-based prewarming by spawning workers to prefetch storage slots.
302    ///
303    /// Divides the total slots across `max_concurrency` workers, each responsible for
304    /// prefetching a range of slots from the BAL.
305    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
306    fn run_bal_prewarm(
307        &self,
308        bal: Arc<BlockAccessList>,
309        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
310    ) {
311        // Only prefetch if we have a cache to populate
312        if self.ctx.saved_cache.is_none() {
313            trace!(
314                target: "engine::tree::payload_processor::prewarm",
315                "Skipping BAL prewarm - no cache available"
316            );
317            let _ =
318                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
319            return;
320        }
321
322        let total_slots = total_slots(&bal);
323
324        trace!(
325            target: "engine::tree::payload_processor::prewarm",
326            total_slots,
327            max_concurrency = self.max_concurrency,
328            "Starting BAL prewarm"
329        );
330
331        if total_slots == 0 {
332            // No slots to prefetch, signal completion immediately
333            let _ =
334                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
335            return;
336        }
337
338        let (done_tx, done_rx) = mpsc::channel();
339
340        // Calculate number of workers needed (at most max_concurrency)
341        let workers_needed = total_slots.min(self.max_concurrency);
342
343        // Calculate slots per worker
344        let slots_per_worker = total_slots / workers_needed;
345        let remainder = total_slots % workers_needed;
346
347        // Spawn workers with their assigned ranges
348        for i in 0..workers_needed {
349            let start = i * slots_per_worker + i.min(remainder);
350            let extra = if i < remainder { 1 } else { 0 };
351            let end = start + slots_per_worker + extra;
352
353            self.ctx.spawn_bal_worker(
354                i,
355                &self.executor,
356                Arc::clone(&bal),
357                start..end,
358                done_tx.clone(),
359            );
360        }
361
362        // Drop our handle to done_tx so we can detect completion
363        drop(done_tx);
364
365        // Wait for all workers to complete
366        let mut completed_workers = 0;
367        while done_rx.recv().is_ok() {
368            completed_workers += 1;
369        }
370
371        trace!(
372            target: "engine::tree::payload_processor::prewarm",
373            completed_workers,
374            "All BAL prewarm workers completed"
375        );
376
377        // Signal that execution has finished
378        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
379    }
380
381    /// Executes the task.
382    ///
383    /// This will execute the transactions until all transactions have been processed or the task
384    /// was cancelled.
385    #[instrument(
386        parent = &self.parent_span,
387        level = "debug",
388        target = "engine::tree::payload_processor::prewarm",
389        name = "prewarm and caching",
390        skip_all
391    )]
392    pub(super) fn run<Tx>(
393        self,
394        mode: PrewarmMode<Tx>,
395        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
396    ) where
397        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
398    {
399        // Spawn execution tasks based on mode
400        match mode {
401            PrewarmMode::Transactions(pending) => {
402                self.spawn_all(pending, actions_tx);
403            }
404            PrewarmMode::BlockAccessList(bal) => {
405                self.run_bal_prewarm(bal, actions_tx);
406            }
407        }
408
409        let mut final_execution_outcome = None;
410        let mut finished_execution = false;
411        while let Ok(event) = self.actions_rx.recv() {
412            match event {
413                PrewarmTaskEvent::TerminateTransactionExecution => {
414                    // stop tx processing
415                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
416                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
417                }
418                PrewarmTaskEvent::Outcome { proof_targets } => {
419                    // completed executing a set of transactions
420                    self.send_multi_proof_targets(proof_targets);
421                }
422                PrewarmTaskEvent::Terminate { execution_outcome } => {
423                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
424                    final_execution_outcome = Some(execution_outcome);
425
426                    if finished_execution {
427                        // all tasks are done, we can exit, which will save caches and exit
428                        break
429                    }
430                }
431                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
432                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
433                    self.ctx.metrics.transactions.set(executed_transactions as f64);
434                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
435
436                    finished_execution = true;
437
438                    if final_execution_outcome.is_some() {
439                        // all tasks are done, we can exit, which will save caches and exit
440                        break
441                    }
442                }
443            }
444        }
445
446        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
447
448        // save caches and finish using the shared ExecutionOutcome
449        if let Some(Some(execution_outcome)) = final_execution_outcome {
450            self.save_cache(execution_outcome);
451        }
452    }
453}
454
455/// Context required by tx execution tasks.
456#[derive(Debug, Clone)]
457pub(super) struct PrewarmContext<N, P, Evm>
458where
459    N: NodePrimitives,
460    Evm: ConfigureEvm<Primitives = N>,
461{
462    pub(super) env: ExecutionEnv<Evm>,
463    pub(super) evm_config: Evm,
464    pub(super) saved_cache: Option<SavedCache>,
465    /// Provider to obtain the state
466    pub(super) provider: StateProviderBuilder<N, P>,
467    pub(super) metrics: PrewarmMetrics,
468    /// An atomic bool that tells prewarm tasks to not start any more execution.
469    pub(super) terminate_execution: Arc<AtomicBool>,
470    pub(super) precompile_cache_disabled: bool,
471    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
472}
473
474impl<N, P, Evm> PrewarmContext<N, P, Evm>
475where
476    N: NodePrimitives,
477    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
478    Evm: ConfigureEvm<Primitives = N> + 'static,
479{
480    /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
481    /// execution.
482    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
483    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
484        let Self {
485            env,
486            evm_config,
487            saved_cache,
488            provider,
489            metrics,
490            terminate_execution,
491            precompile_cache_disabled,
492            precompile_cache_map,
493        } = self;
494
495        let mut state_provider = match provider.build() {
496            Ok(provider) => provider,
497            Err(err) => {
498                trace!(
499                    target: "engine::tree::payload_processor::prewarm",
500                    %err,
501                    "Failed to build state provider in prewarm thread"
502                );
503                return None
504            }
505        };
506
507        // Use the caches to create a new provider with caching
508        if let Some(saved_cache) = saved_cache {
509            let caches = saved_cache.cache().clone();
510            let cache_metrics = saved_cache.metrics().clone();
511            state_provider = Box::new(
512                CachedStateProvider::new(state_provider, caches, cache_metrics)
513                    // ensure we pre-warm the cache
514                    .prewarm(),
515            );
516        }
517
518        let state_provider = StateProviderDatabase::new(state_provider);
519
520        let mut evm_env = env.evm_env;
521
522        // we must disable the nonce check so that we can execute the transaction even if the nonce
523        // doesn't match what's on chain.
524        evm_env.cfg_env.disable_nonce_check = true;
525
526        // create a new executor and disable nonce checks in the env
527        let spec_id = *evm_env.spec_id();
528        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
529
530        if !precompile_cache_disabled {
531            // Only cache pure precompiles to avoid issues with stateful precompiles
532            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
533                CachedPrecompile::wrap(
534                    precompile,
535                    precompile_cache_map.cache_for_address(*address),
536                    spec_id,
537                    None, // No metrics for prewarm
538                )
539            });
540        }
541
542        Some((evm, metrics, terminate_execution))
543    }
544
545    /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
546    /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
547    ///
548    /// This function processes transactions sequentially from the receiver and emits outcome events
549    /// via the provided sender. Execution errors are logged and tracked but do not stop the batch
550    /// processing unless the task is explicitly cancelled.
551    ///
552    /// Note: There are no ordering guarantees; this does not reflect the state produced by
553    /// sequential execution.
554    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
555    fn transact_batch<Tx>(
556        self,
557        txs: mpsc::Receiver<IndexedTransaction<Tx>>,
558        sender: Sender<PrewarmTaskEvent<N::Receipt>>,
559        done_tx: Sender<()>,
560    ) where
561        Tx: ExecutableTxFor<Evm>,
562    {
563        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
564
565        while let Ok(IndexedTransaction { index, tx }) = {
566            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
567                .entered();
568            txs.recv()
569        } {
570            let enter =
571                debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
572                    .entered();
573
574            // create the tx env
575            let start = Instant::now();
576
577            // If the task was cancelled, stop execution, send an empty result to notify the task,
578            // and exit.
579            if terminate_execution.load(Ordering::Relaxed) {
580                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
581                break
582            }
583
584            let res = match evm.transact(&tx) {
585                Ok(res) => res,
586                Err(err) => {
587                    trace!(
588                        target: "engine::tree::payload_processor::prewarm",
589                        %err,
590                        tx_hash=%tx.tx().tx_hash(),
591                        sender=%tx.signer(),
592                        "Error when executing prewarm transaction",
593                    );
594                    // Track transaction execution errors
595                    metrics.transaction_errors.increment(1);
596                    // skip error because we can ignore these errors and continue with the next tx
597                    continue
598                }
599            };
600            metrics.execution_duration.record(start.elapsed());
601
602            // record some basic information about the transactions
603            enter.record("gas_used", res.result.gas_used());
604            enter.record("is_success", res.result.is_success());
605
606            drop(enter);
607
608            // If the task was cancelled, stop execution, send an empty result to notify the task,
609            // and exit.
610            if terminate_execution.load(Ordering::Relaxed) {
611                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
612                break
613            }
614
615            // Only send outcome for transactions after the first txn
616            // as the main execution will be just as fast
617            if index > 0 {
618                let _enter =
619                    debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
620                        .entered();
621                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
622                metrics.prefetch_storage_targets.record(storage_targets as f64);
623                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
624                drop(_enter);
625            }
626
627            metrics.total_runtime.record(start.elapsed());
628        }
629
630        // send a message to the main task to flag that we're done
631        let _ = done_tx.send(());
632    }
633
634    /// Spawns a worker task for transaction execution and returns its sender channel.
635    fn spawn_workers<Tx>(
636        self,
637        workers_needed: usize,
638        task_executor: &WorkloadExecutor,
639        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
640        done_tx: Sender<()>,
641    ) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
642    where
643        Tx: ExecutableTxFor<Evm> + Send + 'static,
644    {
645        let mut handles = Vec::with_capacity(workers_needed);
646        let mut receivers = Vec::with_capacity(workers_needed);
647
648        for _ in 0..workers_needed {
649            let (tx, rx) = mpsc::channel();
650            handles.push(tx);
651            receivers.push(rx);
652        }
653
654        // Spawn a separate task spawning workers in parallel.
655        let executor = task_executor.clone();
656        let span = Span::current();
657        task_executor.spawn_blocking(move || {
658            let _enter = span.entered();
659            for (idx, rx) in receivers.into_iter().enumerate() {
660                let ctx = self.clone();
661                let actions_tx = actions_tx.clone();
662                let done_tx = done_tx.clone();
663                let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
664                executor.spawn_blocking(move || {
665                    let _enter = span.entered();
666                    ctx.transact_batch(rx, actions_tx, done_tx);
667                });
668            }
669        });
670
671        handles
672    }
673
674    /// Spawns a worker task for BAL slot prefetching.
675    ///
676    /// The worker iterates over the specified range of slots in the BAL and ensures
677    /// each slot is loaded into the cache by accessing it through the state provider.
678    fn spawn_bal_worker(
679        &self,
680        idx: usize,
681        executor: &WorkloadExecutor,
682        bal: Arc<BlockAccessList>,
683        range: Range<usize>,
684        done_tx: Sender<()>,
685    ) {
686        let ctx = self.clone();
687        let span = debug_span!(
688            target: "engine::tree::payload_processor::prewarm",
689            "bal prewarm worker",
690            idx,
691            range_start = range.start,
692            range_end = range.end
693        );
694
695        executor.spawn_blocking(move || {
696            let _enter = span.entered();
697            ctx.prefetch_bal_slots(bal, range, done_tx);
698        });
699    }
700
701    /// Prefetches storage slots from a BAL range into the cache.
702    ///
703    /// This iterates through the specified range of slots and accesses them via the state
704    /// provider to populate the cache.
705    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
706    fn prefetch_bal_slots(
707        self,
708        bal: Arc<BlockAccessList>,
709        range: Range<usize>,
710        done_tx: Sender<()>,
711    ) {
712        let Self { saved_cache, provider, metrics, .. } = self;
713
714        // Build state provider
715        let state_provider = match provider.build() {
716            Ok(provider) => provider,
717            Err(err) => {
718                trace!(
719                    target: "engine::tree::payload_processor::prewarm",
720                    %err,
721                    "Failed to build state provider in BAL prewarm thread"
722                );
723                let _ = done_tx.send(());
724                return;
725            }
726        };
727
728        // Wrap with cache (guaranteed to be Some since run_bal_prewarm checks)
729        let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
730        let caches = saved_cache.cache().clone();
731        let cache_metrics = saved_cache.metrics().clone();
732        let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);
733
734        let start = Instant::now();
735
736        // Track last seen address to avoid fetching the same account multiple times.
737        let mut last_address = None;
738
739        // Iterate through the assigned range of slots
740        for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
741            // Fetch the account if this is a different address than the last one
742            if last_address != Some(address) {
743                let _ = state_provider.basic_account(&address);
744                last_address = Some(address);
745            }
746
747            // Access the slot to populate the cache
748            let _ = state_provider.storage(address, slot);
749        }
750
751        let elapsed = start.elapsed();
752
753        trace!(
754            target: "engine::tree::payload_processor::prewarm",
755            ?range,
756            elapsed_ms = elapsed.as_millis(),
757            "BAL prewarm worker completed"
758        );
759
760        // Signal completion
761        let _ = done_tx.send(());
762        metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
763    }
764}
765
766/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
767/// given state.
768fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
769    let mut targets = MultiProofTargets::with_capacity(state.len());
770    let mut storage_targets = 0;
771    for (addr, account) in state {
772        // if the account was not touched, or if the account was selfdestructed, do not
773        // fetch proofs for it
774        //
775        // Since selfdestruct can only happen in the same transaction, we can skip
776        // prefetching proofs for selfdestructed accounts
777        //
778        // See: https://eips.ethereum.org/EIPS/eip-6780
779        if !account.is_touched() || account.is_selfdestructed() {
780            continue
781        }
782
783        let mut storage_set =
784            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
785        for (key, slot) in account.storage {
786            // do nothing if unchanged
787            if !slot.is_changed() {
788                continue
789            }
790
791            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
792        }
793
794        storage_targets += storage_set.len();
795        targets.insert(keccak256(addr), storage_set);
796    }
797
798    (targets, storage_targets)
799}
800
801/// The events the pre-warm task can handle.
802///
803/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
804/// execution path without cloning the expensive `BundleState`.
805pub(super) enum PrewarmTaskEvent<R> {
806    /// Forcefully terminate all remaining transaction execution.
807    TerminateTransactionExecution,
808    /// Forcefully terminate the task on demand and update the shared cache with the given output
809    /// before exiting.
810    Terminate {
811        /// The final execution outcome. Using `Arc` allows sharing with the main execution
812        /// path without cloning the expensive `BundleState`.
813        execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
814    },
815    /// The outcome of a pre-warm task
816    Outcome {
817        /// The prepared proof targets based on the evm state outcome
818        proof_targets: Option<MultiProofTargets>,
819    },
820    /// Finished executing all transactions
821    FinishedTxExecution {
822        /// Number of transactions executed
823        executed_transactions: usize,
824    },
825}
826
827/// Metrics for transactions prewarming.
828#[derive(Metrics, Clone)]
829#[metrics(scope = "sync.prewarm")]
830pub(crate) struct PrewarmMetrics {
831    /// The number of transactions to prewarm
832    pub(crate) transactions: Gauge,
833    /// A histogram of the number of transactions to prewarm
834    pub(crate) transactions_histogram: Histogram,
835    /// A histogram of duration per transaction prewarming
836    pub(crate) total_runtime: Histogram,
837    /// A histogram of EVM execution duration per transaction prewarming
838    pub(crate) execution_duration: Histogram,
839    /// A histogram for prefetch targets per transaction prewarming
840    pub(crate) prefetch_storage_targets: Histogram,
841    /// A histogram of duration for cache saving
842    pub(crate) cache_saving_duration: Gauge,
843    /// Counter for transaction execution errors during prewarming
844    pub(crate) transaction_errors: Counter,
845    /// A histogram of BAL slot iteration duration during prefetching
846    pub(crate) bal_slot_iteration_duration: Histogram,
847}