Skip to main content

reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use crate::tree::{
15    payload_processor::multiproof::StateRootMessage,
16    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
17    CachedStateMetrics, CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache,
18    StateProviderBuilder,
19};
20use alloy_consensus::transaction::TxHashRef;
21use alloy_eip7928::bal::DecodedBal;
22use alloy_eips::eip4895::Withdrawal;
23use alloy_primitives::{keccak256, StorageKey, B256};
24use crossbeam_channel::Sender as CrossbeamSender;
25use metrics::{Counter, Gauge, Histogram};
26use rayon::prelude::*;
27use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
28use reth_metrics::Metrics;
29use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
30use reth_provider::{
31    AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
32    StateReader,
33};
34use reth_revm::{database::StateProviderDatabase, state::EvmState};
35use reth_tasks::{pool::WorkerPool, Runtime};
36use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
37use std::sync::{
38    atomic::{AtomicBool, AtomicUsize, Ordering},
39    mpsc::{self, channel, Receiver, Sender},
40    Arc,
41};
42use tokio::sync::oneshot;
43use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
44
45/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
46#[derive(Debug)]
47pub enum PrewarmMode<Tx> {
48    /// Prewarm by executing transactions from a stream, each paired with its block index.
49    Transactions(Receiver<(usize, Tx)>),
50    /// Prewarm by prefetching slots from a Block Access List.
51    BlockAccessList(Arc<DecodedBal>),
52    /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
53    /// benefit). No workers are spawned.
54    Skipped,
55}
56
57/// A task that is responsible for caching and prewarming the cache by executing transactions
58/// individually in parallel.
59///
60/// Note: This task runs until cancelled externally.
61#[derive(Debug)]
62pub struct PrewarmCacheTask<N, P, Evm>
63where
64    N: NodePrimitives,
65    Evm: ConfigureEvm<Primitives = N>,
66{
67    /// The executor used to spawn execution tasks.
68    executor: Runtime,
69    /// Shared execution cache.
70    execution_cache: PayloadExecutionCache,
71    /// Context provided to execution tasks
72    ctx: PrewarmContext<N, P, Evm>,
73    /// Sender to emit evm state outcome messages to the sparse trie task, if any.
74    to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
75    /// Receiver for events produced by tx execution
76    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
77    /// Parent span for tracing
78    parent_span: Span,
79}
80
81impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
82where
83    N: NodePrimitives,
84    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
85    Evm: ConfigureEvm<Primitives = N> + 'static,
86{
87    /// Initializes the task with the given transactions pending execution
88    pub fn new(
89        executor: Runtime,
90        execution_cache: PayloadExecutionCache,
91        ctx: PrewarmContext<N, P, Evm>,
92        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
93    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
94        let (actions_tx, actions_rx) = channel();
95
96        trace!(
97            target: "engine::tree::payload_processor::prewarm",
98            prewarming_threads = executor.prewarming_pool().current_num_threads(),
99            transaction_count = ctx.env.transaction_count,
100            "Initialized prewarm task"
101        );
102
103        (
104            Self {
105                executor,
106                execution_cache,
107                ctx,
108                to_sparse_trie_task,
109                actions_rx,
110                parent_span: Span::current(),
111            },
112            actions_tx,
113        )
114    }
115
116    /// Streams pending transactions and executes them in parallel on the prewarming pool.
117    ///
118    /// Kicks off EVM init on every pool thread, then uses `in_place_scope` to dispatch
119    /// transactions as they arrive and wait for all spawned tasks to complete before
120    /// clearing per-thread state. Workers that start via work-stealing lazily initialise
121    /// their EVM state on first access via [`get_or_init`](reth_tasks::pool::Worker::get_or_init).
122    fn spawn_txs_prewarm<Tx>(
123        &self,
124        pending: mpsc::Receiver<(usize, Tx)>,
125        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
126        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
127    ) where
128        Tx: ExecutableTxFor<Evm> + Send + 'static,
129    {
130        let executor = self.executor.clone();
131        let ctx = self.ctx.clone();
132        let span = Span::current();
133
134        self.executor.spawn_blocking_named("prewarm-txs", move || {
135            let _enter = debug_span!(
136                target: "engine::tree::payload_processor::prewarm",
137                parent: &span,
138                "prewarm_txs"
139            )
140            .entered();
141
142            let ctx = &ctx;
143            let pool = executor.prewarming_pool();
144
145            let mut tx_count = 0usize;
146            let to_sparse_trie_task = to_sparse_trie_task.as_ref();
147            pool.in_place_scope(|s| {
148                s.spawn(|_| {
149                    pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
150                });
151
152                while let Ok((index, tx)) = pending.recv() {
153                    if ctx.should_stop() {
154                        trace!(
155                            target: "engine::tree::payload_processor::prewarm",
156                            "Termination requested, stopping transaction distribution"
157                        );
158                        break;
159                    }
160
161                    // skip transactions already executed by the main loop
162                    if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
163                        continue;
164                    }
165
166                    tx_count += 1;
167                    let parent_span = Span::current();
168                    s.spawn(move |_| {
169                        let _enter = trace_span!(
170                            target: "engine::tree::payload_processor::prewarm",
171                            parent: parent_span,
172                            "prewarm_tx",
173                            i = index,
174                        )
175                        .entered();
176                        Self::transact_worker(ctx, index, tx, to_sparse_trie_task);
177                    });
178                }
179
180                // Send withdrawal prefetch targets after all transactions dispatched
181                if let Some(to_sparse_trie_task) = to_sparse_trie_task &&
182                    let Some(withdrawals) = &ctx.env.withdrawals &&
183                    !withdrawals.is_empty()
184                {
185                    let targets = multiproof_targets_from_withdrawals(withdrawals);
186                    let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
187                }
188            });
189
190            // All tasks are done — clear per-thread EVM state for the next block.
191            pool.clear();
192
193            let _ = actions_tx
194                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
195        });
196    }
197
198    /// Executes a single prewarm transaction on the current pool thread's EVM.
199    ///
200    /// Lazily initialises per-thread [`PrewarmEvmState`] via
201    /// [`get_or_init`](reth_tasks::pool::Worker::get_or_init) on first access.
202    fn transact_worker<Tx>(
203        ctx: &PrewarmContext<N, P, Evm>,
204        index: usize,
205        tx: Tx,
206        to_sparse_trie_task: Option<&CrossbeamSender<StateRootMessage>>,
207    ) where
208        Tx: ExecutableTxFor<Evm>,
209    {
210        WorkerPool::with_worker_mut(|worker| {
211            let Some(evm) =
212                worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
213            else {
214                return;
215            };
216
217            if ctx.should_stop() {
218                return;
219            }
220
221            // skip if main execution has already processed this transaction
222            if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
223                return;
224            }
225
226            let start = Instant::now();
227
228            let (tx_env, tx) = tx.into_parts();
229            let res = match evm.transact(tx_env) {
230                Ok(res) => res,
231                Err(err) => {
232                    trace!(
233                        target: "engine::tree::payload_processor::prewarm",
234                        %err,
235                        tx_hash=%tx.tx().tx_hash(),
236                        sender=%tx.signer(),
237                        "Error when executing prewarm transaction",
238                    );
239                    ctx.metrics.transaction_errors.increment(1);
240                    return;
241                }
242            };
243            ctx.metrics.execution_duration.record(start.elapsed());
244
245            if ctx.should_stop() {
246                return;
247            }
248
249            if index > 0 {
250                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
251                ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
252                if let Some(to_sparse_trie_task) = to_sparse_trie_task {
253                    let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
254                }
255            }
256
257            ctx.metrics.total_runtime.record(start.elapsed());
258        });
259    }
260
261    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
262    /// It should only be called after ensuring that:
263    /// 1. All prewarming tasks have completed execution
264    /// 2. No other concurrent operations are accessing the cache
265    ///
266    /// Saves the warmed caches back into the shared slot after prewarming completes.
267    ///
268    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
269    /// the new, warmed cache to be inserted.
270    ///
271    /// This method is called from `run()` only after all execution tasks are complete.
272    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
273    fn save_cache(
274        self,
275        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
276        valid_block_rx: mpsc::Receiver<()>,
277    ) {
278        let start = Instant::now();
279
280        let Self {
281            execution_cache,
282            ctx: PrewarmContext { env, metrics, cache_metrics, saved_cache, .. },
283            ..
284        } = self;
285        let hash = env.hash;
286
287        if let Some(saved_cache) = saved_cache {
288            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
289            execution_cache.update_with_guard(|cached| {
290                // consumes the `SavedCache` held by the prewarming task, which releases its usage
291                // guard
292                let caches = saved_cache.cache().clone();
293                let new_cache = SavedCache::new(hash, caches);
294
295                // Insert state into cache while holding the lock
296                // Access the BundleState through the shared ExecutionOutcome
297                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
298                    // Clear the cache on error to prevent having a polluted cache
299                    *cached = None;
300                    debug!(target: "engine::caching", "cleared execution cache on update error");
301                    return;
302                }
303
304                new_cache.update_metrics(cache_metrics.as_ref());
305
306                if valid_block_rx.recv().is_ok() {
307                    // Replace the shared cache with the new one; the previous cache (if any) is
308                    // dropped.
309                    *cached = Some(new_cache);
310                } else {
311                    // Block was invalid; caches were already mutated by insert_state above,
312                    // so we must clear to prevent using polluted state
313                    *cached = None;
314                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
315                }
316            });
317
318            let elapsed = start.elapsed();
319            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
320
321            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
322        }
323    }
324
325    /// Runs BAL-based prewarming and sparse-trie work inline.
326    ///
327    /// Spawns two halves concurrently on separate pools, then waits for both to complete:
328    /// 1. Hashed state streaming on the BAL streaming pool so storage updates can reach the sparse
329    ///    trie before account reads finish.
330    /// 2. Storage prefetch on the prewarming pool to populate the execution cache, unless BAL batch
331    ///    I/O is disabled.
332    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
333    fn run_bal_prewarm(
334        &self,
335        decoded_bal: Arc<DecodedBal>,
336        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
337    ) {
338        let bal = decoded_bal.as_bal();
339        if bal.is_empty() {
340            if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
341                let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
342            }
343            let _ =
344                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
345            return;
346        }
347
348        trace!(
349            target: "engine::tree::payload_processor::prewarm",
350            accounts = bal.len(),
351            "Starting BAL prewarm"
352        );
353
354        let ctx = self.ctx.clone();
355        let to_sparse_trie_task = self.to_sparse_trie_task.clone();
356        let executor = self.executor.clone();
357        let parent_span = Span::current();
358        let prefetch_parent_span = parent_span.clone();
359        let stream_parent_span = parent_span;
360        let prefetch_bal = Arc::clone(&decoded_bal);
361        let stream_bal = Arc::clone(&decoded_bal);
362        let (prefetch_tx, prefetch_rx) = oneshot::channel();
363        let (stream_tx, stream_rx) = oneshot::channel();
364
365        if let Some(to_sparse_trie_task) = to_sparse_trie_task {
366            let stream_ctx = ctx.clone();
367            executor.bal_streaming_pool().spawn(move || {
368                let branch_span = debug_span!(
369                    target: "engine::tree::payload_processor::prewarm",
370                    parent: &stream_parent_span,
371                    "bal_hashed_state_stream",
372                    bal_accounts = stream_bal.as_bal().len(),
373                );
374                let provider_parent_span = branch_span.clone();
375                let _span = branch_span.entered();
376
377                stream_bal.as_bal().par_iter().for_each_init(
378                    || {
379                        (
380                            stream_ctx.clone(),
381                            None::<Box<dyn AccountReader>>,
382                            provider_parent_span.clone(),
383                        )
384                    },
385                    |(ctx, provider, parent_span), account_changes| {
386                        ctx.send_bal_hashed_state(
387                            parent_span,
388                            provider,
389                            account_changes,
390                            &to_sparse_trie_task,
391                        );
392                    },
393                );
394
395                let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
396                let _ = stream_tx.send(());
397            });
398        } else {
399            let _ = stream_tx.send(());
400        }
401
402        if ctx.saved_cache.is_some() && !ctx.disable_bal_batch_io {
403            executor.prewarming_pool().spawn(move || {
404                let branch_span = debug_span!(
405                    target: "engine::tree::payload_processor::prewarm",
406                    parent: &prefetch_parent_span,
407                    "bal_prefetch_storage",
408                    bal_accounts = prefetch_bal.as_bal().len(),
409                );
410                let provider_parent_span = branch_span.clone();
411                let _span = branch_span.entered();
412
413                prefetch_bal.as_bal().par_iter().for_each_init(
414                    || {
415                        (
416                            ctx.clone(),
417                            None::<CachedStateProvider<reth_provider::StateProviderBox, true>>,
418                            provider_parent_span.clone(),
419                        )
420                    },
421                    |(ctx, provider, parent_span), account| {
422                        if ctx.should_stop() {
423                            return;
424                        }
425                        ctx.prefetch_bal_storage(parent_span, provider, account);
426                    },
427                );
428
429                let _ = prefetch_tx.send(());
430            });
431        } else {
432            let _ = prefetch_tx.send(());
433        }
434
435        prefetch_rx
436            .blocking_recv()
437            .expect("BAL prefetch task dropped without signaling completion");
438        stream_rx
439            .blocking_recv()
440            .expect("BAL hashed-state streaming task dropped without signaling completion");
441
442        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
443    }
444
445    /// Executes the task.
446    ///
447    /// This will execute the transactions until all transactions have been processed or the task
448    /// was cancelled.
449    #[instrument(
450        parent = &self.parent_span,
451        level = "debug",
452        target = "engine::tree::payload_processor::prewarm",
453        name = "prewarm and caching",
454        skip_all
455    )]
456    pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
457    where
458        Tx: ExecutableTxFor<Evm> + Send + 'static,
459    {
460        // Spawn execution tasks based on mode
461        match mode {
462            PrewarmMode::Transactions(pending) => {
463                self.spawn_txs_prewarm(pending, actions_tx, self.to_sparse_trie_task.clone());
464            }
465            PrewarmMode::BlockAccessList(bal) => {
466                self.run_bal_prewarm(bal, actions_tx);
467            }
468            PrewarmMode::Skipped => {
469                let _ = actions_tx
470                    .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
471            }
472        }
473
474        let mut final_execution_outcome = None;
475        let mut finished_execution = false;
476        while let Ok(event) = self.actions_rx.recv() {
477            match event {
478                PrewarmTaskEvent::TerminateTransactionExecution => {
479                    // stop tx processing
480                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
481                    self.ctx.stop();
482                }
483                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
484                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
485                    final_execution_outcome =
486                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
487
488                    if finished_execution {
489                        // all tasks are done, we can exit, which will save caches and exit
490                        break
491                    }
492                }
493                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
494                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
495                    self.ctx.metrics.transactions.set(executed_transactions as f64);
496                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
497
498                    finished_execution = true;
499
500                    if final_execution_outcome.is_some() {
501                        // all tasks are done, we can exit, which will save caches and exit
502                        break
503                    }
504                }
505            }
506        }
507
508        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
509
510        // save caches and finish using the shared ExecutionOutcome
511        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
512            self.save_cache(execution_outcome, valid_block_rx);
513        }
514    }
515}
516
517/// Context required by tx execution tasks.
518#[derive(Debug, Clone)]
519pub struct PrewarmContext<N, P, Evm>
520where
521    N: NodePrimitives,
522    Evm: ConfigureEvm<Primitives = N>,
523{
524    /// The execution environment.
525    pub env: ExecutionEnv<Evm>,
526    /// The EVM configuration.
527    pub evm_config: Evm,
528    /// The saved cache.
529    pub saved_cache: Option<SavedCache>,
530    /// Provider to obtain the state
531    pub provider: StateProviderBuilder<N, P>,
532    /// The metrics for the prewarm task.
533    pub metrics: PrewarmMetrics,
534    /// Metrics for the execution cache.
535    /// Metrics for the execution cache. `None` disables metrics recording.
536    pub cache_metrics: Option<CachedStateMetrics>,
537    /// An atomic bool that tells prewarm tasks to not start any more execution.
538    pub terminate_execution: Arc<AtomicBool>,
539    /// Shared counter tracking the next transaction index to be executed by the main execution
540    /// loop. Prewarm workers skip transactions with `index < counter` since those have already
541    /// been executed.
542    pub executed_tx_index: Arc<AtomicUsize>,
543    /// Whether the precompile cache is disabled.
544    pub precompile_cache_disabled: bool,
545    /// The precompile cache map.
546    pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
547    /// Whether to disable BAL-driven parallel state root computation.
548    /// Only valid when BAL parallel execution is also disabled.
549    pub disable_bal_parallel_state_root: bool,
550    /// Whether BAL state prefetching during prewarm is disabled.
551    pub disable_bal_batch_io: bool,
552}
553
554/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
555/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
556type PrewarmEvmState<Evm> =
557    Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
558
559impl<N, P, Evm> PrewarmContext<N, P, Evm>
560where
561    N: NodePrimitives,
562    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
563    Evm: ConfigureEvm<Primitives = N> + 'static,
564{
565    /// Creates a per-thread EVM for prewarming.
566    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
567    fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
568        let mut state_provider = match self.provider.build() {
569            Ok(provider) => provider,
570            Err(err) => {
571                trace!(
572                    target: "engine::tree::payload_processor::prewarm",
573                    %err,
574                    "Failed to build state provider in prewarm thread"
575                );
576                return None
577            }
578        };
579
580        // Use the caches to create a new provider with caching
581        if let Some(saved_cache) = &self.saved_cache {
582            let caches = saved_cache.cache().clone();
583            state_provider = Box::new(CachedStateProvider::new_prewarm(
584                state_provider,
585                caches,
586                self.cache_metrics.clone().unwrap_or_default(),
587            ));
588        }
589
590        let state_provider = StateProviderDatabase::new(state_provider);
591
592        let mut evm_env = self.env.evm_env.clone();
593
594        // we must disable the nonce check so that we can execute the transaction even if the nonce
595        // doesn't match what's on chain.
596        evm_env.cfg_env.disable_nonce_check = true;
597
598        // disable the balance check so that transactions from senders who were funded by earlier
599        // transactions in the block can still be prewarmed
600        evm_env.cfg_env.disable_balance_check = true;
601
602        // create a new executor and disable nonce checks in the env
603        let spec_id = *evm_env.spec_id();
604        let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
605
606        if !self.precompile_cache_disabled {
607            // Only cache pure precompiles to avoid issues with stateful precompiles
608            evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
609                CachedPrecompile::wrap(
610                    precompile,
611                    self.precompile_cache_map.cache_for_address(*address),
612                    spec_id,
613                    None, // No metrics for prewarm
614                )
615            });
616        }
617
618        Some(evm)
619    }
620
621    /// Returns `true` if prewarming should stop.
622    #[inline]
623    pub fn should_stop(&self) -> bool {
624        self.terminate_execution.load(Ordering::Relaxed)
625    }
626
627    /// Signals all prewarm tasks to stop execution.
628    #[inline]
629    pub fn stop(&self) {
630        self.terminate_execution.store(true, Ordering::Relaxed);
631    }
632
633    /// Hashes and streams a single BAL account's state to the sparse trie task.
634    ///
635    /// For each account, storage slots are hashed and sent immediately, then the account is read
636    /// from the database and sent as a separate update.
637    ///
638    /// The `provider` is lazily initialized on first call and reused across accounts on the same
639    /// thread.
640    fn send_bal_hashed_state(
641        &self,
642        parent_span: &Span,
643        provider: &mut Option<Box<dyn AccountReader>>,
644        account_changes: &alloy_eip7928::AccountChanges,
645        to_sparse_trie_task: &CrossbeamSender<StateRootMessage>,
646    ) {
647        if self.disable_bal_parallel_state_root {
648            return;
649        }
650        let address = account_changes.address;
651        let mut hashed_address = None;
652
653        if !account_changes.storage_changes.is_empty() {
654            let hashed_address = *hashed_address.get_or_insert_with(|| keccak256(address));
655            let mut storage_map = reth_trie::HashedStorage::new(false);
656
657            for slot_changes in &account_changes.storage_changes {
658                let hashed_slot = keccak256(slot_changes.slot.to_be_bytes::<32>());
659                if let Some(last_change) = slot_changes.changes.last() {
660                    storage_map.storage.insert(hashed_slot, last_change.new_value);
661                }
662            }
663
664            let mut hashed_state = reth_trie::HashedPostState::default();
665            hashed_state.storages.insert(hashed_address, storage_map);
666            let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
667        }
668
669        if provider.is_none() {
670            let _span = debug_span!(
671                target: "engine::tree::payload_processor::prewarm",
672                parent: parent_span,
673                "bal_hashed_state_provider_init",
674                has_saved_cache = !self.disable_bal_batch_io && self.saved_cache.is_some(),
675            )
676            .entered();
677
678            let inner = match self.provider.build() {
679                Ok(p) => p,
680                Err(err) => {
681                    warn!(
682                        target: "engine::tree::payload_processor::prewarm",
683                        ?err,
684                        "Failed to build provider for BAL account reads"
685                    );
686                    return;
687                }
688            };
689            let boxed: Box<dyn AccountReader> = match (self.disable_bal_batch_io, &self.saved_cache)
690            {
691                (false, Some(saved)) => {
692                    let caches = saved.cache().clone();
693                    Box::new(CachedStateProvider::new_prewarm(
694                        inner,
695                        caches,
696                        self.cache_metrics.clone().unwrap_or_default(),
697                    ))
698                }
699                _ => Box::new(inner),
700            };
701            *provider = Some(boxed);
702        }
703        let account_reader = provider.as_ref().expect("provider just initialized");
704
705        let existing_account = account_reader.basic_account(&address).ok().flatten();
706
707        let balance = account_changes.balance_changes.last().map(|change| change.post_balance);
708        let nonce = account_changes.nonce_changes.last().map(|change| change.new_nonce);
709        let code_hash = account_changes.code_changes.last().map(|code_change| {
710            if code_change.new_code.is_empty() {
711                alloy_consensus::constants::KECCAK_EMPTY
712            } else {
713                keccak256(&code_change.new_code)
714            }
715        });
716
717        if balance.is_none() &&
718            nonce.is_none() &&
719            code_hash.is_none() &&
720            account_changes.storage_changes.is_empty()
721        {
722            return;
723        }
724
725        let account = reth_primitives_traits::Account {
726            balance: balance.unwrap_or_else(|| {
727                existing_account
728                    .as_ref()
729                    .map(|account| account.balance)
730                    .unwrap_or(alloy_primitives::U256::ZERO)
731            }),
732            nonce: nonce.unwrap_or_else(|| {
733                existing_account.as_ref().map(|account| account.nonce).unwrap_or(0)
734            }),
735            bytecode_hash: code_hash.or_else(|| {
736                existing_account
737                    .as_ref()
738                    .and_then(|account| account.bytecode_hash)
739                    .or(Some(alloy_consensus::constants::KECCAK_EMPTY))
740            }),
741        };
742
743        let hashed_address = hashed_address.unwrap_or_else(|| keccak256(address));
744        let mut hashed_state = reth_trie::HashedPostState::default();
745        hashed_state.accounts.insert(hashed_address, Some(account));
746
747        let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
748    }
749
750    /// Prefetches storage slots for a single BAL account into the cache.
751    ///
752    /// Account reads are handled separately by [`Self::send_bal_hashed_state`], so this method
753    /// only
754    /// warms storage.
755    ///
756    /// The `provider` is lazily initialized on first call and reused across accounts on the same
757    /// thread.
758    fn prefetch_bal_storage(
759        &self,
760        parent_span: &Span,
761        provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox, true>>,
762        account: &alloy_eip7928::AccountChanges,
763    ) {
764        if self.disable_bal_batch_io ||
765            (account.storage_changes.is_empty() && account.storage_reads.is_empty())
766        {
767            return;
768        }
769
770        let state_provider = match provider {
771            Some(p) => p,
772            slot @ None => {
773                let _span = debug_span!(
774                    target: "engine::tree::payload_processor::prewarm",
775                    parent: parent_span,
776                    "bal_prefetch_provider_init",
777                )
778                .entered();
779
780                let built = match self.provider.build() {
781                    Ok(p) => p,
782                    Err(err) => {
783                        trace!(
784                            target: "engine::tree::payload_processor::prewarm",
785                            %err,
786                            "Failed to build state provider in BAL prewarm thread"
787                        );
788                        return;
789                    }
790                };
791                let saved_cache =
792                    self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
793                let caches = saved_cache.cache().clone();
794                slot.insert(CachedStateProvider::new_prewarm(
795                    built,
796                    caches,
797                    self.cache_metrics.clone().unwrap_or_default(),
798                ))
799            }
800        };
801
802        let start = Instant::now();
803
804        for slot in &account.storage_changes {
805            let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
806        }
807        for &slot in &account.storage_reads {
808            let _ = state_provider.storage(account.address, StorageKey::from(slot));
809        }
810
811        self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
812    }
813}
814
815/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
816/// given state.
817fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
818    let mut targets = MultiProofTargetsV2::default();
819    let mut storage_target_count = 0;
820    for (addr, account) in state {
821        // if the account was not touched, or if the account was selfdestructed, do not
822        // fetch proofs for it
823        //
824        // Since selfdestruct can only happen in the same transaction, we can skip
825        // prefetching proofs for selfdestructed accounts
826        //
827        // See: https://eips.ethereum.org/EIPS/eip-6780
828        if !account.is_touched() || account.is_selfdestructed() {
829            continue
830        }
831
832        let hashed_address = keccak256(addr);
833        targets.account_targets.push(hashed_address.into());
834
835        let mut storage_slots = Vec::with_capacity(account.storage.len());
836        for (key, slot) in account.storage {
837            // do nothing if unchanged
838            if !slot.is_changed() {
839                continue
840            }
841
842            let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
843            storage_slots.push(ProofV2Target::from(hashed_slot));
844        }
845
846        storage_target_count += storage_slots.len();
847        if !storage_slots.is_empty() {
848            targets.storage_targets.insert(hashed_address, storage_slots);
849        }
850    }
851
852    (targets, storage_target_count)
853}
854
855/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
856///
857/// Withdrawals only modify account balances (no storage), so the targets contain
858/// only account-level entries with empty storage sets.
859fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
860    MultiProofTargetsV2 {
861        account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
862        ..Default::default()
863    }
864}
865
/// The events the pre-warm task can handle.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<BlockExecutionOutput<R>>` with the main
/// execution path without cloning the expensive `BundleState`.
#[derive(Debug)]
pub enum PrewarmTaskEvent<R> {
    /// Forcefully terminate all remaining transaction execution.
    TerminateTransactionExecution,
    /// Forcefully terminate the task on demand and update the shared cache with the given output
    /// before exiting.
    Terminate {
        /// The final execution output, if any. Using `Arc` allows sharing with the main
        /// execution path without cloning the expensive `BundleState`.
        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
        /// Receiver for the block validation result.
        ///
        /// Cache saving is racing the state root validation. We optimistically construct the
        /// updated cache but only save it once we know the block is valid.
        valid_block_rx: mpsc::Receiver<()>,
    },
    /// Finished executing all transactions
    FinishedTxExecution {
        /// Number of transactions executed
        executed_transactions: usize,
    },
}
892
/// Metrics for transactions prewarming.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub struct PrewarmMetrics {
    /// The number of transactions to prewarm
    pub(crate) transactions: Gauge,
    /// A histogram of the number of transactions to prewarm
    pub(crate) transactions_histogram: Histogram,
    /// A histogram of duration per transaction prewarming
    pub(crate) total_runtime: Histogram,
    /// A histogram of EVM execution duration per transaction prewarming
    pub(crate) execution_duration: Histogram,
    /// A histogram for prefetch targets per transaction prewarming
    pub(crate) prefetch_storage_targets: Histogram,
    /// A gauge of the duration of cache saving
    pub(crate) cache_saving_duration: Gauge,
    /// Counter for transaction execution errors during prewarming
    pub(crate) transaction_errors: Counter,
    /// A histogram of BAL slot iteration duration during prefetching
    pub(crate) bal_slot_iteration_duration: Histogram,
}