reth_engine_tree/tree/payload_processor/prewarm.rs

//! Caching and prewarming related functionality.
//!
//! Prewarming executes transactions in parallel before the actual block execution
//! to populate the execution cache with state that will likely be accessed during
//! block processing.
//!
//! ## How Prewarming Works
//!
//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
//!    and one for actual execution (executed sequentially)
//! 2. Prewarming tasks execute transactions in parallel using shared caches
//! 3. When actual block execution happens, it benefits from the warmed cache
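//!
//! A rough sketch of how the task is typically driven (illustrative only, not a compiling
//! doctest; the executor, cache, context and channel setup are assumed):
//!
//! ```ignore
//! let (task, actions_tx) = PrewarmCacheTask::new(
//!     executor,
//!     execution_cache,
//!     prewarm_ctx,
//!     Some(to_multi_proof_tx),
//!     max_concurrency,
//! );
//! // Transactions received on `pending_rx` are executed in parallel to warm the caches,
//! // while the block itself is executed sequentially elsewhere.
//! task.run(PrewarmMode::Transactions(pending_rx), actions_tx);
//! ```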

use crate::tree::{
    cached_state::{CachedStateProvider, SavedCache},
    payload_processor::{
        bal::{self, total_slots, BALSlotIter},
        multiproof::{MultiProofMessage, VersionedMultiProofTargets},
        PayloadExecutionCache,
    },
    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
    ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
    StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
    ops::Range,
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{self, channel, Receiver, Sender},
        Arc,
    },
    time::Instant,
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};

/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
    /// Prewarm by executing transactions from a stream.
    Transactions(Receiver<Tx>),
    /// Prewarm by prefetching slots from a Block Access List.
    BlockAccessList(Arc<BlockAccessList>),
    /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
    /// benefit). No workers are spawned.
    Skipped,
}

/// A wrapper for transactions that includes their index in the block.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// The transaction index in the block.
    index: usize,
    /// The wrapped transaction.
    tx: Tx,
}

/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
/// Note: This task runs until cancelled externally.
#[derive(Debug)]
pub struct PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// The executor used to spawn execution tasks.
    executor: Runtime,
    /// Shared execution cache.
    execution_cache: PayloadExecutionCache,
    /// Context provided to execution tasks
    ctx: PrewarmContext<N, P, Evm>,
    /// How many transactions should be executed in parallel
    max_concurrency: usize,
    /// Sender to emit evm state outcome messages, if any.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Receiver for events produced by tx execution
    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
    /// Parent span for tracing
    parent_span: Span,
}

impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Initializes the task and returns the sender used to deliver [`PrewarmTaskEvent`]s to it.
    pub fn new(
        executor: Runtime,
        execution_cache: PayloadExecutionCache,
        ctx: PrewarmContext<N, P, Evm>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        max_concurrency: usize,
    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
        let (actions_tx, actions_rx) = channel();

        trace!(
            target: "engine::tree::payload_processor::prewarm",
            max_concurrency,
            transaction_count = ctx.env.transaction_count,
            "Initialized prewarm task"
        );

        (
            Self {
                executor,
                execution_cache,
                ctx,
                max_concurrency,
                to_multi_proof,
                actions_rx,
                parent_span: Span::current(),
            },
            actions_tx,
        )
    }

    /// Spawns worker tasks and distributes all pending transactions to them over a shared
    /// channel, tagging each transaction with its index in the block.
    ///
    /// For Optimism chains, special handling is applied to the first transaction if it is a
    /// deposit transaction (type 0x7E/126), which sets critical metadata that affects all
    /// subsequent transactions in the block.
    fn spawn_all<Tx>(
        &self,
        pending: mpsc::Receiver<Tx>,
        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    ) where
        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
    {
        let executor = self.executor.clone();
        let ctx = self.ctx.clone();
        let max_concurrency = self.max_concurrency;
        let span = Span::current();

        self.executor.spawn_blocking(move || {
            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();

            let (done_tx, done_rx) = mpsc::channel();

            // When transaction_count is 0, it means the count is unknown. In this case, spawn
            // max workers to handle potentially many transactions in parallel rather
            // than bottlenecking on a single worker.
            let transaction_count = ctx.env.transaction_count;
            let workers_needed = if transaction_count == 0 {
                max_concurrency
            } else {
                transaction_count.min(max_concurrency)
            };
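            // e.g. with max_concurrency = 16, a block with 100 transactions spawns 16 workers,
            // while a block with only 4 transactions spawns just 4.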

            // Spawn workers
            let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());

            // Distribute transactions to workers
            let mut tx_index = 0usize;
            while let Ok(tx) = pending.recv() {
                // Stop distributing if termination was requested
                if ctx.terminate_execution.load(Ordering::Relaxed) {
                    trace!(
                        target: "engine::tree::payload_processor::prewarm",
                        "Termination requested, stopping transaction distribution"
                    );
                    break;
                }

                let indexed_tx = IndexedTransaction { index: tx_index, tx };

                // Send transaction to the workers
                // Ignore send errors: workers listen to terminate_execution and may
                // exit early when signaled.
                let _ = tx_sender.send(indexed_tx);

                tx_index += 1;
            }

            // Send withdrawal prefetch targets after all transactions have been distributed
            if let Some(to_multi_proof) = to_multi_proof
                && let Some(withdrawals) = &ctx.env.withdrawals
                && !withdrawals.is_empty()
            {
                let targets =
                    multiproof_targets_from_withdrawals(withdrawals, ctx.v2_proofs_enabled);
                let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
            }

            // drop sender and wait for all tasks to finish
            drop(done_tx);
            drop(tx_sender);
            while done_rx.recv().is_ok() {}

            let _ = actions_tx
                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
        });
    }

    /// Saves the warmed caches back into the shared slot after prewarming completes.
    ///
    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
    /// the new, warmed cache to be inserted.
    ///
    /// This method calls `ExecutionCache::update_with_guard`, which requires exclusive access.
    /// It should therefore only be called after ensuring that:
    /// 1. All prewarming tasks have completed execution
    /// 2. No other concurrent operations are accessing the cache
    ///
    /// This method is called from `run()` only after all execution tasks are complete.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn save_cache(
        self,
        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
        valid_block_rx: mpsc::Receiver<()>,
    ) {
        let start = Instant::now();

        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
            self;
        let hash = env.hash;

        if let Some(saved_cache) = saved_cache {
            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
            // Perform all cache operations atomically under the lock
            execution_cache.update_with_guard(|cached| {
                // consumes the `SavedCache` held by the prewarming task, which releases its usage
                // guard
                let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
                let new_cache = SavedCache::new(hash, caches, cache_metrics)
                    .with_disable_cache_metrics(disable_cache_metrics);

                // Insert state into the cache while holding the lock.
                // Access the bundle state through the shared execution output.
                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
                    // Clear the cache on error to prevent having a polluted cache
                    *cached = None;
                    debug!(target: "engine::caching", "cleared execution cache on update error");
                    return;
                }

                new_cache.update_metrics();

                if valid_block_rx.recv().is_ok() {
                    // Replace the shared cache with the new one; the previous cache (if any) is
                    // dropped.
                    *cached = Some(new_cache);
                } else {
                    // Block was invalid; caches were already mutated by insert_state above,
                    // so we must clear to prevent using polluted state
                    *cached = None;
                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
                }
            });

            let elapsed = start.elapsed();
            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");

            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
        }
    }

    /// Runs BAL-based prewarming by spawning workers to prefetch storage slots.
    ///
    /// Divides the total slots across up to `max_concurrency` workers, each responsible for
    /// prefetching a range of slots from the BAL.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn run_bal_prewarm(
        &self,
        bal: Arc<BlockAccessList>,
        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
    ) {
        // Only prefetch if we have a cache to populate
        if self.ctx.saved_cache.is_none() {
            trace!(
                target: "engine::tree::payload_processor::prewarm",
                "Skipping BAL prewarm - no cache available"
            );
            self.send_bal_hashed_state(&bal);
            let _ =
                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
            return;
        }

        let total_slots = total_slots(&bal);

        trace!(
            target: "engine::tree::payload_processor::prewarm",
            total_slots,
            max_concurrency = self.max_concurrency,
            "Starting BAL prewarm"
        );

        if total_slots == 0 {
            self.send_bal_hashed_state(&bal);
            let _ =
                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
            return;
        }

        let (done_tx, done_rx) = mpsc::channel();

        // Calculate number of workers needed (at most max_concurrency)
        let workers_needed = total_slots.min(self.max_concurrency);

        // Calculate slots per worker
        let slots_per_worker = total_slots / workers_needed;
        let remainder = total_slots % workers_needed;
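        // e.g. with total_slots = 10 and workers_needed = 3: slots_per_worker = 3 and
        // remainder = 1, yielding the worker ranges 0..4, 4..7 and 7..10.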

        // Spawn workers with their assigned ranges
        for i in 0..workers_needed {
            let start = i * slots_per_worker + i.min(remainder);
            let extra = if i < remainder { 1 } else { 0 };
            let end = start + slots_per_worker + extra;

            self.ctx.spawn_bal_worker(
                i,
                &self.executor,
                Arc::clone(&bal),
                start..end,
                done_tx.clone(),
            );
        }

        // Drop our handle to done_tx so we can detect completion
        drop(done_tx);

        // Wait for all workers to complete
        let mut completed_workers = 0;
        while done_rx.recv().is_ok() {
            completed_workers += 1;
        }

        trace!(
            target: "engine::tree::payload_processor::prewarm",
            completed_workers,
            "All BAL prewarm workers completed"
        );

        // Convert BAL to HashedPostState and send to multiproof task
        self.send_bal_hashed_state(&bal);

        // Signal that execution has finished
        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
    }

    /// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the
    /// multiproof task.
    fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
        let Some(to_multi_proof) = &self.to_multi_proof else { return };

        let provider = match self.ctx.provider.build() {
            Ok(provider) => provider,
            Err(err) => {
                warn!(
                    target: "engine::tree::payload_processor::prewarm",
                    ?err,
                    "Failed to build provider for BAL hashed state conversion"
                );
                return;
            }
        };

        match bal::bal_to_hashed_post_state(bal, &provider) {
            Ok(hashed_state) => {
                debug!(
                    target: "engine::tree::payload_processor::prewarm",
                    accounts = hashed_state.accounts.len(),
                    storages = hashed_state.storages.len(),
                    "Converted BAL to hashed post state"
                );
                let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
                let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
            }
            Err(err) => {
                warn!(
                    target: "engine::tree::payload_processor::prewarm",
                    ?err,
                    "Failed to convert BAL to hashed state"
                );
            }
        }
    }

    /// Executes the task.
    ///
    /// This executes transactions until all of them have been processed or the task is
    /// cancelled.
    #[instrument(
        parent = &self.parent_span,
        level = "debug",
        target = "engine::tree::payload_processor::prewarm",
        name = "prewarm and caching",
        skip_all
    )]
    pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
    where
        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
    {
        // Spawn execution tasks based on mode
        match mode {
            PrewarmMode::Transactions(pending) => {
                self.spawn_all(pending, actions_tx, self.to_multi_proof.clone());
            }
            PrewarmMode::BlockAccessList(bal) => {
                self.run_bal_prewarm(bal, actions_tx);
            }
            PrewarmMode::Skipped => {
                let _ = actions_tx
                    .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
            }
        }

        let mut final_execution_outcome = None;
        let mut finished_execution = false;
        while let Ok(event) = self.actions_rx.recv() {
            match event {
                PrewarmTaskEvent::TerminateTransactionExecution => {
                    // stop tx processing
                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
                }
                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
                    final_execution_outcome =
                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));

                    if finished_execution {
                        // all tasks are done; break out of the loop to save caches and exit
                        break
                    }
                }
                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
                    self.ctx.metrics.transactions.set(executed_transactions as f64);
                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);

                    finished_execution = true;

                    if final_execution_outcome.is_some() {
                        // all tasks are done; break out of the loop to save caches and exit
                        break
                    }
                }
            }
        }

        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");

        // save caches and finish using the shared execution output
        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
            self.save_cache(execution_outcome, valid_block_rx);
        }
    }
}

/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
pub struct PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// The execution environment.
    pub env: ExecutionEnv<Evm>,
    /// The EVM configuration.
    pub evm_config: Evm,
    /// The saved cache.
    pub saved_cache: Option<SavedCache>,
    /// Provider to obtain the state
    pub provider: StateProviderBuilder<N, P>,
    /// The metrics for the prewarm task.
    pub metrics: PrewarmMetrics,
    /// An atomic bool that tells prewarm tasks to not start any more execution.
    pub terminate_execution: Arc<AtomicBool>,
    /// Whether the precompile cache is disabled.
    pub precompile_cache_disabled: bool,
    /// The precompile cache map.
    pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Whether V2 proof calculation is enabled.
    pub v2_proofs_enabled: bool,
}

impl<N, P, Evm> PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
    Evm: ConfigureEvm<Primitives = N> + 'static,
{
    /// Splits this context into an EVM, metrics, the atomic bool for terminating execution, and
    /// whether V2 proofs are enabled.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn evm_for_ctx(
        self,
    ) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>, bool)> {
        let Self {
            env,
            evm_config,
            saved_cache,
            provider,
            metrics,
            terminate_execution,
            precompile_cache_disabled,
            precompile_cache_map,
            v2_proofs_enabled,
        } = self;

        let mut state_provider = match provider.build() {
            Ok(provider) => provider,
            Err(err) => {
                trace!(
                    target: "engine::tree::payload_processor::prewarm",
                    %err,
                    "Failed to build state provider in prewarm thread"
                );
                return None
            }
        };

        // Use the caches to create a new provider with caching
        if let Some(saved_cache) = saved_cache {
            let caches = saved_cache.cache().clone();
            let cache_metrics = saved_cache.metrics().clone();
            state_provider =
                Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
        }

        let state_provider = StateProviderDatabase::new(state_provider);

        let mut evm_env = env.evm_env;

        // we must disable the nonce check so that we can execute the transaction even if the nonce
        // doesn't match what's on chain.
        evm_env.cfg_env.disable_nonce_check = true;

        // disable the balance check so that transactions from senders who were funded by earlier
        // transactions in the block can still be prewarmed
        evm_env.cfg_env.disable_balance_check = true;

        // capture the spec id and create the evm with the configured env
        let spec_id = *evm_env.spec_id();
        let mut evm = evm_config.evm_with_env(state_provider, evm_env);

        if !precompile_cache_disabled {
            // Only cache pure precompiles to avoid issues with stateful precompiles
            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
                CachedPrecompile::wrap(
                    precompile,
                    precompile_cache_map.cache_for_address(*address),
                    spec_id,
                    None, // No metrics for prewarm
                )
            });
        }

        Some((evm, metrics, terminate_execution, v2_proofs_enabled))
    }

    /// Accepts a [`CrossbeamReceiver`] of transactions and a handle to the prewarm task. Executes
    /// transactions and streams [`MultiProofMessage::PrefetchProofs`] messages for each executed
    /// transaction after the first.
    ///
    /// This function processes transactions sequentially from the receiver and signals completion
    /// via the provided sender. Execution errors are logged and tracked but do not stop the batch
    /// processing unless the task is explicitly cancelled.
    ///
    /// Note: There are no ordering guarantees; this does not reflect the state produced by
    /// sequential execution.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn transact_batch<Tx>(
        self,
        txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        done_tx: Sender<()>,
    ) where
        Tx: ExecutableTxFor<Evm>,
    {
        let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx()
        else {
            return
        };

        while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
            let _enter = debug_span!(
                target: "engine::tree::payload_processor::prewarm",
                "prewarm tx",
                index,
            )
            .entered();

            // start timing this transaction
            let start = Instant::now();

            // If the task was cancelled, stop execution, and exit.
            if terminate_execution.load(Ordering::Relaxed) {
                break
            }

            // create the tx env and execute the transaction
            let (tx_env, tx) = tx.into_parts();
            let res = match evm.transact(tx_env) {
                Ok(res) => res,
                Err(err) => {
                    trace!(
                        target: "engine::tree::payload_processor::prewarm",
                        %err,
                        tx_hash=%tx.tx().tx_hash(),
                        sender=%tx.signer(),
                        "Error when executing prewarm transaction",
                    );
                    // Track transaction execution errors
                    metrics.transaction_errors.increment(1);
                    // skip error because we can ignore these errors and continue with the next tx
                    continue
                }
            };
            metrics.execution_duration.record(start.elapsed());

            // If the task was cancelled, stop execution, and exit.
            if terminate_execution.load(Ordering::Relaxed) {
                break
            }

            // Only send prefetch targets for transactions after the first one: the sequential
            // main execution reaches the first transaction just as quickly, so prefetching its
            // proofs provides no benefit.
            if index > 0 {
                let (targets, storage_targets) =
                    multiproof_targets_from_state(res.state, v2_proofs_enabled);
                metrics.prefetch_storage_targets.record(storage_targets as f64);
                if let Some(to_multi_proof) = &to_multi_proof {
                    let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
                }
            }

            metrics.total_runtime.record(start.elapsed());
        }

        // send a message to the main task to flag that we're done
        let _ = done_tx.send(());
    }

    /// Spawns worker tasks that pull transactions from a shared channel.
    ///
    /// Returns the sender for distributing transactions to workers.
    fn spawn_workers<Tx>(
        self,
        workers_needed: usize,
        task_executor: &Runtime,
        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
        done_tx: Sender<()>,
    ) -> CrossbeamSender<IndexedTransaction<Tx>>
    where
        Tx: ExecutableTxFor<Evm> + Send + 'static,
    {
        let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();

        // Spawn workers that all pull from the shared receiver
        let executor = task_executor.clone();
        let span = Span::current();
        task_executor.spawn_blocking(move || {
            let _enter = span.entered();
            for idx in 0..workers_needed {
                let ctx = self.clone();
                let to_multi_proof = to_multi_proof.clone();
                let done_tx = done_tx.clone();
                let rx = tx_receiver.clone();
                let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
                executor.spawn_blocking(move || {
                    let _enter = span.entered();
                    ctx.transact_batch(rx, to_multi_proof, done_tx);
                });
            }
        });

        tx_sender
    }

    /// Spawns a worker task for BAL slot prefetching.
    ///
    /// The worker iterates over the specified range of slots in the BAL and ensures
    /// each slot is loaded into the cache by accessing it through the state provider.
    fn spawn_bal_worker(
        &self,
        idx: usize,
        executor: &Runtime,
        bal: Arc<BlockAccessList>,
        range: Range<usize>,
        done_tx: Sender<()>,
    ) {
        let ctx = self.clone();
        let span = debug_span!(
            target: "engine::tree::payload_processor::prewarm",
            "bal prewarm worker",
            idx,
            range_start = range.start,
            range_end = range.end
        );

        executor.spawn_blocking(move || {
            let _enter = span.entered();
            ctx.prefetch_bal_slots(bal, range, done_tx);
        });
    }

    /// Prefetches storage slots from a BAL range into the cache.
    ///
    /// This iterates through the specified range of slots and accesses them via the state
    /// provider to populate the cache.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn prefetch_bal_slots(
        self,
        bal: Arc<BlockAccessList>,
        range: Range<usize>,
        done_tx: Sender<()>,
    ) {
        let Self { saved_cache, provider, metrics, .. } = self;

        // Build state provider
        let state_provider = match provider.build() {
            Ok(provider) => provider,
            Err(err) => {
                trace!(
                    target: "engine::tree::payload_processor::prewarm",
                    %err,
                    "Failed to build state provider in BAL prewarm thread"
                );
                let _ = done_tx.send(());
                return;
            }
        };

        // Wrap with cache (guaranteed to be Some since run_bal_prewarm checks)
        let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
        let caches = saved_cache.cache().clone();
        let cache_metrics = saved_cache.metrics().clone();
        let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);

        let start = Instant::now();

        // Track last seen address to avoid fetching the same account multiple times.
        let mut last_address = None;

        // Iterate through the assigned range of slots
        for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
            // Fetch the account if this is a different address than the last one
            if last_address != Some(address) {
                let _ = state_provider.basic_account(&address);
                last_address = Some(address);
            }

            // Access the slot to populate the cache
            let _ = state_provider.storage(address, slot);
        }

        let elapsed = start.elapsed();

        trace!(
            target: "engine::tree::payload_processor::prewarm",
            ?range,
            elapsed_ms = elapsed.as_millis(),
            "BAL prewarm worker completed"
        );

        // Signal completion
        let _ = done_tx.send(());
        metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
    }
}

/// Returns [`VersionedMultiProofTargets`] and the total number of storage targets, based
/// on the given state.
fn multiproof_targets_from_state(
    state: EvmState,
    v2_enabled: bool,
) -> (VersionedMultiProofTargets, usize) {
    if v2_enabled {
        multiproof_targets_v2_from_state(state)
    } else {
        multiproof_targets_legacy_from_state(state)
    }
}

/// Returns legacy [`MultiProofTargets`] and the total number of storage targets, based on the
/// given state.
fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
    let mut targets = MultiProofTargets::with_capacity(state.len());
    let mut storage_targets = 0;
    for (addr, account) in state {
        // if the account was not touched, or if the account was selfdestructed, do not
        // fetch proofs for it
        //
        // Since selfdestruct can only happen in the same transaction, we can skip
        // prefetching proofs for selfdestructed accounts
        //
        // See: https://eips.ethereum.org/EIPS/eip-6780
        if !account.is_touched() || account.is_selfdestructed() {
            continue
        }

        let mut storage_set =
            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
        for (key, slot) in account.storage {
            // do nothing if unchanged
            if !slot.is_changed() {
                continue
            }

            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
        }

        storage_targets += storage_set.len();
        targets.insert(keccak256(addr), storage_set);
    }

    (VersionedMultiProofTargets::Legacy(targets), storage_targets)
}

/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total number of
/// storage targets, based on the given state.
fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
    use reth_trie::proof_v2;
    use reth_trie_parallel::targets_v2::MultiProofTargetsV2;

    let mut targets = MultiProofTargetsV2::default();
    let mut storage_target_count = 0;
    for (addr, account) in state {
        // if the account was not touched, or if the account was selfdestructed, do not
        // fetch proofs for it
        //
        // Since selfdestruct can only happen in the same transaction, we can skip
        // prefetching proofs for selfdestructed accounts
        //
        // See: https://eips.ethereum.org/EIPS/eip-6780
        if !account.is_touched() || account.is_selfdestructed() {
            continue
        }

        let hashed_address = keccak256(addr);
        targets.account_targets.push(hashed_address.into());

        let mut storage_slots = Vec::with_capacity(account.storage.len());
        for (key, slot) in account.storage {
            // do nothing if unchanged
            if !slot.is_changed() {
                continue
            }

            let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
            storage_slots.push(proof_v2::Target::from(hashed_slot));
        }

        storage_target_count += storage_slots.len();
        if !storage_slots.is_empty() {
            targets.storage_targets.insert(hashed_address, storage_slots);
        }
    }

    (VersionedMultiProofTargets::V2(targets), storage_target_count)
}

/// Returns [`VersionedMultiProofTargets`] for withdrawal addresses.
///
/// Withdrawals only modify account balances (no storage), so the targets contain
/// only account-level entries with empty storage sets.
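///
/// A minimal illustration (not a compiling doctest; the `withdrawals` slice is assumed):
///
/// ```ignore
/// // Legacy targets: one entry per withdrawal address, keyed by keccak256(address) and
/// // mapped to an empty storage set.
/// let targets = multiproof_targets_from_withdrawals(&withdrawals, false);
/// ```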
fn multiproof_targets_from_withdrawals(
    withdrawals: &[Withdrawal],
    v2_enabled: bool,
) -> VersionedMultiProofTargets {
    use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
    if v2_enabled {
        VersionedMultiProofTargets::V2(MultiProofTargetsV2 {
            account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
            ..Default::default()
        })
    } else {
        VersionedMultiProofTargets::Legacy(
            withdrawals.iter().map(|w| (keccak256(w.address), Default::default())).collect(),
        )
    }
}

/// The events the pre-warm task can handle.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<BlockExecutionOutput<R>>` with the main
/// execution path without cloning the expensive `BundleState`.
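///
/// A sketch of the termination handshake from the driving side (illustrative only; the
/// construction of `actions_tx` and `output` is assumed):
///
/// ```ignore
/// let (valid_block_tx, valid_block_rx) = std::sync::mpsc::channel();
/// let _ = actions_tx.send(PrewarmTaskEvent::Terminate {
///     execution_outcome: Some(Arc::new(output)),
///     valid_block_rx,
/// });
/// // Signal validity once state root validation succeeds so the warmed cache is saved;
/// // dropping `valid_block_tx` instead causes the shared cache slot to be cleared.
/// let _ = valid_block_tx.send(());
/// ```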
#[derive(Debug)]
pub enum PrewarmTaskEvent<R> {
    /// Forcefully terminate all remaining transaction execution.
    TerminateTransactionExecution,
    /// Forcefully terminate the task on demand and update the shared cache with the given output
    /// before exiting.
    Terminate {
        /// The final execution outcome. Using `Arc` allows sharing with the main execution
        /// path without cloning the expensive `BundleState`.
        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
        /// Receiver for the block validation result.
        ///
        /// Cache saving is racing the state root validation. We optimistically construct the
        /// updated cache but only save it once we know the block is valid.
        valid_block_rx: mpsc::Receiver<()>,
    },
    /// Finished executing all transactions
    FinishedTxExecution {
        /// Number of transactions executed
        executed_transactions: usize,
    },
}

/// Metrics for transaction prewarming.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub struct PrewarmMetrics {
    /// The number of transactions to prewarm
    pub(crate) transactions: Gauge,
    /// A histogram of the number of transactions to prewarm
    pub(crate) transactions_histogram: Histogram,
    /// A histogram of duration per transaction prewarming
    pub(crate) total_runtime: Histogram,
    /// A histogram of EVM execution duration per transaction prewarming
    pub(crate) execution_duration: Histogram,
    /// A histogram for prefetch targets per transaction prewarming
    pub(crate) prefetch_storage_targets: Histogram,
    /// A gauge recording the duration of the most recent cache save
    pub(crate) cache_saving_duration: Gauge,
    /// Counter for transaction execution errors during prewarming
    pub(crate) transaction_errors: Counter,
    /// A histogram of BAL slot iteration duration during prefetching
    pub(crate) bal_slot_iteration_duration: Histogram,
}