// reth_engine_tree/tree/payload_processor/prewarm.rs
//! Caching and prewarming related functionality.
//!
//! Prewarming executes transactions in parallel before the actual block execution
//! to populate the execution cache with state that will likely be accessed during
//! block processing.
//!
//! ## How Prewarming Works
//!
//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
//!    and one for actual execution (executed sequentially)
//! 2. Prewarming tasks execute transactions in parallel using shared caches
//! 3. When actual block execution happens, it benefits from the warmed cache

use crate::tree::{
    payload_processor::multiproof::StateRootMessage,
    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
    CachedStateMetrics, CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache,
    StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
    AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
    StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::{pool::WorkerPool, Runtime};
use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering},
    mpsc::{self, channel, Receiver, Sender},
    Arc,
};
use tokio::sync::oneshot;
use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
44
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
    /// Prewarm by executing transactions from a stream, each paired with its block index.
    Transactions(Receiver<(usize, Tx)>),
    /// Prewarm by prefetching slots from a Block Access List.
    BlockAccessList(Arc<BlockAccessList>),
    /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
    /// benefit). No workers are spawned.
    Skipped,
}
56
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
/// Note: This task runs until cancelled externally.
#[derive(Debug)]
pub struct PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// The executor used to spawn execution tasks.
    executor: Runtime,
    /// Shared execution cache.
    execution_cache: PayloadExecutionCache,
    /// Context provided to execution tasks.
    ctx: PrewarmContext<N, P, Evm>,
    /// Sender to emit evm state outcome messages to the sparse trie task, if any.
    to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
    /// Receiver for events produced by tx execution; drained by [`Self::run`].
    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
    /// Parent span for tracing, captured at construction time.
    parent_span: Span,
}
80
81impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
82where
83    N: NodePrimitives,
84    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
85    Evm: ConfigureEvm<Primitives = N> + 'static,
86{
87    /// Initializes the task with the given transactions pending execution
88    pub fn new(
89        executor: Runtime,
90        execution_cache: PayloadExecutionCache,
91        ctx: PrewarmContext<N, P, Evm>,
92        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
93    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
94        let (actions_tx, actions_rx) = channel();
95
96        trace!(
97            target: "engine::tree::payload_processor::prewarm",
98            prewarming_threads = executor.prewarming_pool().current_num_threads(),
99            transaction_count = ctx.env.transaction_count,
100            "Initialized prewarm task"
101        );
102
103        (
104            Self {
105                executor,
106                execution_cache,
107                ctx,
108                to_sparse_trie_task,
109                actions_rx,
110                parent_span: Span::current(),
111            },
112            actions_tx,
113        )
114    }
115
    /// Streams pending transactions and executes them in parallel on the prewarming pool.
    ///
    /// Kicks off EVM init on every pool thread, then uses `in_place_scope` to dispatch
    /// transactions as they arrive and wait for all spawned tasks to complete before
    /// clearing per-thread state. Workers that start via work-stealing lazily initialise
    /// their EVM state on first access via [`get_or_init`](reth_tasks::pool::Worker::get_or_init).
    fn spawn_txs_prewarm<Tx>(
        &self,
        pending: mpsc::Receiver<(usize, Tx)>,
        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
    ) where
        Tx: ExecutableTxFor<Evm> + Send + 'static,
    {
        let executor = self.executor.clone();
        let ctx = self.ctx.clone();
        let span = Span::current();

        // Dispatch runs on a blocking thread so that `pending.recv()` can block without
        // tying up the prewarming pool itself.
        self.executor.spawn_blocking_named("prewarm-txs", move || {
            let _enter = debug_span!(
                target: "engine::tree::payload_processor::prewarm",
                parent: &span,
                "prewarm_txs"
            )
            .entered();

            let ctx = &ctx;
            let pool = executor.prewarming_pool();

            // Counts dispatched transactions; workers may still skip some of them
            // (cancellation / already-executed checks happen inside `transact_worker`).
            let mut tx_count = 0usize;
            let to_sparse_trie_task = to_sparse_trie_task.as_ref();
            pool.in_place_scope(|s| {
                // Eagerly initialise EVM state on the pool threads so the first
                // transactions don't all pay the init cost.
                s.spawn(|_| {
                    pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
                });

                while let Ok((index, tx)) = pending.recv() {
                    if ctx.should_stop() {
                        trace!(
                            target: "engine::tree::payload_processor::prewarm",
                            "Termination requested, stopping transaction distribution"
                        );
                        break;
                    }

                    // skip transactions already executed by the main loop
                    if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
                        continue;
                    }

                    tx_count += 1;
                    let parent_span = Span::current();
                    s.spawn(move |_| {
                        let _enter = trace_span!(
                            target: "engine::tree::payload_processor::prewarm",
                            parent: parent_span,
                            "prewarm_tx",
                            i = index,
                        )
                        .entered();
                        Self::transact_worker(ctx, index, tx, to_sparse_trie_task);
                    });
                }

                // Send withdrawal prefetch targets after all transactions dispatched
                if let Some(to_sparse_trie_task) = to_sparse_trie_task &&
                    let Some(withdrawals) = &ctx.env.withdrawals &&
                    !withdrawals.is_empty()
                {
                    let targets = multiproof_targets_from_withdrawals(withdrawals);
                    let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
                }
            });

            // All tasks are done — clear per-thread EVM state for the next block.
            pool.clear();

            let _ = actions_tx
                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
        });
    }
197
    /// Executes a single prewarm transaction on the current pool thread's EVM.
    ///
    /// Lazily initialises per-thread [`PrewarmEvmState`] via
    /// [`get_or_init`](reth_tasks::pool::Worker::get_or_init) on first access.
    ///
    /// Prewarm execution is best-effort: errors are counted in metrics but never
    /// propagated, since canonical execution happens elsewhere.
    fn transact_worker<Tx>(
        ctx: &PrewarmContext<N, P, Evm>,
        index: usize,
        tx: Tx,
        to_sparse_trie_task: Option<&CrossbeamSender<StateRootMessage>>,
    ) where
        Tx: ExecutableTxFor<Evm>,
    {
        WorkerPool::with_worker_mut(|worker| {
            // `evm_for_ctx` may fail (state provider build error); the worker then holds
            // `None` and all transactions on this thread are silently skipped.
            let Some(evm) =
                worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
            else {
                return;
            };

            // Re-check cancellation after the (potentially expensive) lazy EVM init.
            if ctx.should_stop() {
                return;
            }

            // skip if main execution has already processed this transaction
            if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
                return;
            }

            let start = Instant::now();

            let (tx_env, tx) = tx.into_parts();
            let res = match evm.transact(tx_env) {
                Ok(res) => res,
                Err(err) => {
                    trace!(
                        target: "engine::tree::payload_processor::prewarm",
                        %err,
                        tx_hash=%tx.tx().tx_hash(),
                        sender=%tx.signer(),
                        "Error when executing prewarm transaction",
                    );
                    ctx.metrics.transaction_errors.increment(1);
                    return;
                }
            };
            ctx.metrics.execution_duration.record(start.elapsed());

            if ctx.should_stop() {
                return;
            }

            // Forward touched accounts/slots as proof-prefetch targets. Index 0 is
            // skipped — presumably because the first transaction is handled directly by
            // the main execution path; TODO confirm against the caller.
            if index > 0 {
                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
                ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
                if let Some(to_sparse_trie_task) = to_sparse_trie_task {
                    let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
                }
            }

            // Total runtime includes execution plus target extraction/sending.
            ctx.metrics.total_runtime.record(start.elapsed());
        });
    }
260
    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
    /// It should only be called after ensuring that:
    /// 1. All prewarming tasks have completed execution
    /// 2. No other concurrent operations are accessing the cache
    ///
    /// Saves the warmed caches back into the shared slot after prewarming completes.
    ///
    /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
    /// the new, warmed cache to be inserted.
    ///
    /// This method is called from `run()` only after all execution tasks are complete.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn save_cache(
        self,
        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
        valid_block_rx: mpsc::Receiver<()>,
    ) {
        let start = Instant::now();

        // Destructure to keep only what cache saving needs; everything else
        // (executor handle, channels, span) is dropped here.
        let Self {
            execution_cache,
            ctx: PrewarmContext { env, metrics, cache_metrics, saved_cache, .. },
            ..
        } = self;
        let hash = env.hash;

        // Without a saved cache there is nothing to warm and save.
        if let Some(saved_cache) = saved_cache {
            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
            execution_cache.update_with_guard(|cached| {
                // consumes the `SavedCache` held by the prewarming task, which releases its usage
                // guard
                let caches = saved_cache.cache().clone();
                let new_cache = SavedCache::new(hash, caches);

                // Insert state into cache while holding the lock
                // Access the BundleState through the shared ExecutionOutcome
                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
                    // Clear the cache on error to prevent having a polluted cache
                    *cached = None;
                    debug!(target: "engine::caching", "cleared execution cache on update error");
                    return;
                }

                new_cache.update_metrics(cache_metrics.as_ref());

                // `Ok(())` means the sender confirmed the block as valid; an `Err` means
                // the sender was dropped without confirmation (invalid block).
                if valid_block_rx.recv().is_ok() {
                    // Replace the shared cache with the new one; the previous cache (if any) is
                    // dropped.
                    *cached = Some(new_cache);
                } else {
                    // Block was invalid; caches were already mutated by insert_state above,
                    // so we must clear to prevent using polluted state
                    *cached = None;
                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
                }
            });

            let elapsed = start.elapsed();
            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");

            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
        }
    }
324
    /// Runs BAL-based prewarming and sparse-trie work inline.
    ///
    /// Spawns two halves concurrently on separate pools, then waits for both to complete:
    /// 1. Storage prefetch on the prewarming pool to populate the execution cache.
    /// 2. Hashed state streaming on the BAL streaming pool so storage updates can reach the sparse
    ///    trie before account reads finish.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn run_bal_prewarm(
        &self,
        bal: Arc<BlockAccessList>,
        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
    ) {
        // Empty BAL: nothing to prefetch or stream — signal completion immediately.
        if bal.is_empty() {
            if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
                let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
            }
            let _ =
                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
            return;
        }

        trace!(
            target: "engine::tree::payload_processor::prewarm",
            accounts = bal.len(),
            "Starting BAL prewarm"
        );

        let ctx = self.ctx.clone();
        let to_sparse_trie_task = self.to_sparse_trie_task.clone();
        let executor = self.executor.clone();
        let parent_span = Span::current();
        let prefetch_parent_span = parent_span.clone();
        let stream_parent_span = parent_span;
        let prefetch_bal = Arc::clone(&bal);
        let stream_bal = Arc::clone(&bal);
        // Completion signals for the two halves; each branch sends exactly once,
        // including the branches that are skipped entirely.
        let (prefetch_tx, prefetch_rx) = oneshot::channel();
        let (stream_tx, stream_rx) = oneshot::channel();

        // Storage prefetch only makes sense when there is a saved cache to warm.
        if ctx.saved_cache.is_some() {
            let prefetch_ctx = ctx.clone();
            executor.prewarming_pool().spawn(move || {
                let branch_span = debug_span!(
                    target: "engine::tree::payload_processor::prewarm",
                    parent: &prefetch_parent_span,
                    "bal_prefetch_storage",
                    bal_accounts = prefetch_bal.len(),
                );
                let provider_parent_span = branch_span.clone();
                let _span = branch_span.entered();

                // `for_each_init` gives each rayon worker its own lazily-built provider
                // that is reused across the accounts it processes.
                prefetch_bal.par_iter().for_each_init(
                    || {
                        (
                            prefetch_ctx.clone(),
                            None::<CachedStateProvider<reth_provider::StateProviderBox, true>>,
                            provider_parent_span.clone(),
                        )
                    },
                    |(ctx, provider, parent_span), account| {
                        if ctx.should_stop() {
                            return;
                        }
                        ctx.prefetch_bal_storage(parent_span, provider, account);
                    },
                );

                let _ = prefetch_tx.send(());
            });
        } else {
            let _ = prefetch_tx.send(());
        }

        if let Some(to_sparse_trie_task) = to_sparse_trie_task {
            executor.bal_streaming_pool().spawn(move || {
                let branch_span = debug_span!(
                    target: "engine::tree::payload_processor::prewarm",
                    parent: &stream_parent_span,
                    "bal_hashed_state_stream",
                    bal_accounts = stream_bal.len(),
                );
                let provider_parent_span = branch_span.clone();
                let _span = branch_span.entered();

                stream_bal.par_iter().for_each_init(
                    || (ctx.clone(), None::<Box<dyn AccountReader>>, provider_parent_span.clone()),
                    |(ctx, provider, parent_span), account_changes| {
                        ctx.send_bal_hashed_state(
                            parent_span,
                            provider,
                            account_changes,
                            &to_sparse_trie_task,
                        );
                    },
                );

                // All hashed-state updates are in flight; let the sparse trie task finish.
                let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
                let _ = stream_tx.send(());
            });
        } else {
            let _ = stream_tx.send(());
        }

        // Block until both halves have completed (or were skipped).
        prefetch_rx
            .blocking_recv()
            .expect("BAL prefetch task dropped without signaling completion");
        stream_rx
            .blocking_recv()
            .expect("BAL hashed-state streaming task dropped without signaling completion");

        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
    }
436
    /// Executes the task.
    ///
    /// This will execute the transactions until all transactions have been processed or the task
    /// was cancelled.
    ///
    /// The event loop exits only after BOTH a `Terminate` event (delivering the final
    /// execution outcome) AND a `FinishedTxExecution` event have arrived, in either order;
    /// only then are the warmed caches saved.
    #[instrument(
        parent = &self.parent_span,
        level = "debug",
        target = "engine::tree::payload_processor::prewarm",
        name = "prewarm and caching",
        skip_all
    )]
    pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
    where
        Tx: ExecutableTxFor<Evm> + Send + 'static,
    {
        // Spawn execution tasks based on mode
        match mode {
            PrewarmMode::Transactions(pending) => {
                self.spawn_txs_prewarm(pending, actions_tx, self.to_sparse_trie_task.clone());
            }
            PrewarmMode::BlockAccessList(bal) => {
                self.run_bal_prewarm(bal, actions_tx);
            }
            PrewarmMode::Skipped => {
                // Nothing to execute; report completion immediately so the event loop
                // below can terminate as soon as the outcome arrives.
                let _ = actions_tx
                    .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
            }
        }

        let mut final_execution_outcome = None;
        let mut finished_execution = false;
        while let Ok(event) = self.actions_rx.recv() {
            match event {
                PrewarmTaskEvent::TerminateTransactionExecution => {
                    // stop tx processing
                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
                    self.ctx.stop();
                }
                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
                    final_execution_outcome =
                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));

                    if finished_execution {
                        // all tasks are done, we can exit, which will save caches and exit
                        break
                    }
                }
                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
                    self.ctx.metrics.transactions.set(executed_transactions as f64);
                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);

                    finished_execution = true;

                    if final_execution_outcome.is_some() {
                        // all tasks are done, we can exit, which will save caches and exit
                        break
                    }
                }
            }
        }

        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");

        // save caches and finish using the shared ExecutionOutcome
        // (outer Option: did a Terminate arrive; inner Option: did it carry an outcome)
        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
            self.save_cache(execution_outcome, valid_block_rx);
        }
    }
507}
508
/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
pub struct PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// The execution environment.
    pub env: ExecutionEnv<Evm>,
    /// The EVM configuration.
    pub evm_config: Evm,
    /// The saved cache, if any; when present, prewarm state providers are wrapped in a
    /// [`CachedStateProvider`] backed by it.
    pub saved_cache: Option<SavedCache>,
    /// Provider to obtain the state.
    pub provider: StateProviderBuilder<N, P>,
    /// The metrics for the prewarm task.
    pub metrics: PrewarmMetrics,
    /// Metrics for the execution cache. `None` disables metrics recording.
    pub cache_metrics: Option<CachedStateMetrics>,
    /// An atomic bool that tells prewarm tasks to not start any more execution.
    pub terminate_execution: Arc<AtomicBool>,
    /// Shared counter tracking the next transaction index to be executed by the main execution
    /// loop. Prewarm workers skip transactions with `index < counter` since those have already
    /// been executed.
    pub executed_tx_index: Arc<AtomicUsize>,
    /// Whether the precompile cache is disabled.
    pub precompile_cache_disabled: bool,
    /// The precompile cache map.
    pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
540
/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
///
/// `None` means EVM construction failed on that thread (e.g. the state provider could not
/// be built), in which case workers skip execution.
type PrewarmEvmState<Evm> =
    Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
545
546impl<N, P, Evm> PrewarmContext<N, P, Evm>
547where
548    N: NodePrimitives,
549    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
550    Evm: ConfigureEvm<Primitives = N> + 'static,
551{
    /// Creates a per-thread EVM for prewarming.
    ///
    /// Returns `None` when the state provider cannot be built; the error is traced and the
    /// calling worker will skip execution.
    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
    fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
        let mut state_provider = match self.provider.build() {
            Ok(provider) => provider,
            Err(err) => {
                trace!(
                    target: "engine::tree::payload_processor::prewarm",
                    %err,
                    "Failed to build state provider in prewarm thread"
                );
                return None
            }
        };

        // Use the caches to create a new provider with caching
        if let Some(saved_cache) = &self.saved_cache {
            let caches = saved_cache.cache().clone();
            state_provider = Box::new(CachedStateProvider::new_prewarm(
                state_provider,
                caches,
                self.cache_metrics.clone().unwrap_or_default(),
            ));
        }

        let state_provider = StateProviderDatabase::new(state_provider);

        let mut evm_env = self.env.evm_env.clone();

        // we must disable the nonce check so that we can execute the transaction even if the nonce
        // doesn't match what's on chain.
        evm_env.cfg_env.disable_nonce_check = true;

        // disable the balance check so that transactions from senders who were funded by earlier
        // transactions in the block can still be prewarmed
        evm_env.cfg_env.disable_balance_check = true;

        // create a new executor and disable nonce checks in the env
        let spec_id = *evm_env.spec_id();
        let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);

        if !self.precompile_cache_disabled {
            // Only cache pure precompiles to avoid issues with stateful precompiles
            evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
                CachedPrecompile::wrap(
                    precompile,
                    self.precompile_cache_map.cache_for_address(*address),
                    spec_id,
                    None, // No metrics for prewarm
                )
            });
        }

        Some(evm)
    }
607
    /// Returns `true` if prewarming should stop.
    ///
    /// Relaxed ordering suffices here: this is a best-effort cancellation flag, not a
    /// synchronization point.
    #[inline]
    pub fn should_stop(&self) -> bool {
        self.terminate_execution.load(Ordering::Relaxed)
    }
613
    /// Signals all prewarm tasks to stop execution.
    ///
    /// The flag is observed cooperatively via [`Self::should_stop`]; in-flight work is
    /// not interrupted, only further work is skipped.
    #[inline]
    pub fn stop(&self) {
        self.terminate_execution.store(true, Ordering::Relaxed);
    }
619
    /// Hashes and streams a single BAL account's state to the sparse trie task.
    ///
    /// For each account, storage slots are hashed and sent immediately, then the account is read
    /// from the database and sent as a separate update.
    ///
    /// The `provider` is lazily initialized on first call and reused across accounts on the same
    /// thread.
    fn send_bal_hashed_state(
        &self,
        parent_span: &Span,
        provider: &mut Option<Box<dyn AccountReader>>,
        account_changes: &alloy_eip7928::AccountChanges,
        to_sparse_trie_task: &CrossbeamSender<StateRootMessage>,
    ) {
        let address = account_changes.address;
        // Memoizes keccak256(address) so it is computed at most once per account.
        let mut hashed_address = None;

        // Storage updates are sent first so the sparse trie can start on them while the
        // account read below is still in flight.
        if !account_changes.storage_changes.is_empty() {
            let hashed_address = *hashed_address.get_or_insert_with(|| keccak256(address));
            let mut storage_map = reth_trie::HashedStorage::new(false);

            for slot_changes in &account_changes.storage_changes {
                let hashed_slot = keccak256(slot_changes.slot.to_be_bytes::<32>());
                // Only the final value of each slot matters for the post-state.
                if let Some(last_change) = slot_changes.changes.last() {
                    storage_map.storage.insert(hashed_slot, last_change.new_value);
                }
            }

            let mut hashed_state = reth_trie::HashedPostState::default();
            hashed_state.storages.insert(hashed_address, storage_map);
            let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
        }

        // Lazily build the per-thread account reader, wrapping it in the cached provider
        // when a saved cache is available.
        if provider.is_none() {
            let _span = debug_span!(
                target: "engine::tree::payload_processor::prewarm",
                parent: parent_span,
                "bal_hashed_state_provider_init",
                has_saved_cache = self.saved_cache.is_some(),
            )
            .entered();

            let inner = match self.provider.build() {
                Ok(p) => p,
                Err(err) => {
                    warn!(
                        target: "engine::tree::payload_processor::prewarm",
                        ?err,
                        "Failed to build provider for BAL account reads"
                    );
                    return;
                }
            };
            let boxed: Box<dyn AccountReader> = if let Some(saved) = &self.saved_cache {
                let caches = saved.cache().clone();
                Box::new(CachedStateProvider::new_prewarm(
                    inner,
                    caches,
                    self.cache_metrics.clone().unwrap_or_default(),
                ))
            } else {
                Box::new(inner)
            };
            *provider = Some(boxed);
        }
        let account_reader = provider.as_ref().expect("provider just initialized");

        // Existing on-chain account, used below to fill in fields the BAL didn't change.
        let existing_account = account_reader.basic_account(&address).ok().flatten();

        let balance = account_changes.balance_changes.last().map(|change| change.post_balance);
        let nonce = account_changes.nonce_changes.last().map(|change| change.new_nonce);
        let code_hash = account_changes.code_changes.last().map(|code_change| {
            if code_change.new_code.is_empty() {
                alloy_consensus::constants::KECCAK_EMPTY
            } else {
                keccak256(&code_change.new_code)
            }
        });

        // No account-level or storage changes at all: nothing to stream for this account.
        // NOTE(review): this check depends only on `account_changes` yet runs after the
        // provider init and `basic_account` read above; hoisting it earlier would avoid a
        // wasted DB read for read-only accounts — confirm the read isn't relied on for
        // cache warming before moving it.
        if balance.is_none() &&
            nonce.is_none() &&
            code_hash.is_none() &&
            account_changes.storage_changes.is_empty()
        {
            return;
        }

        // Merge BAL post-values with the existing account for any untouched fields.
        let account = reth_primitives_traits::Account {
            balance: balance.unwrap_or_else(|| {
                existing_account
                    .as_ref()
                    .map(|account| account.balance)
                    .unwrap_or(alloy_primitives::U256::ZERO)
            }),
            nonce: nonce.unwrap_or_else(|| {
                existing_account.as_ref().map(|account| account.nonce).unwrap_or(0)
            }),
            bytecode_hash: code_hash.or_else(|| {
                existing_account
                    .as_ref()
                    .and_then(|account| account.bytecode_hash)
                    .or(Some(alloy_consensus::constants::KECCAK_EMPTY))
            }),
        };

        let hashed_address = hashed_address.unwrap_or_else(|| keccak256(address));
        let mut hashed_state = reth_trie::HashedPostState::default();
        hashed_state.accounts.insert(hashed_address, Some(account));

        let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
    }
731
732    /// Prefetches storage slots for a single BAL account into the cache.
733    ///
734    /// Account reads are handled separately by [`Self::send_bal_hashed_state`], so this method
735    /// only
736    /// warms storage.
737    ///
738    /// The `provider` is lazily initialized on first call and reused across accounts on the same
739    /// thread.
740    fn prefetch_bal_storage(
741        &self,
742        parent_span: &Span,
743        provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox, true>>,
744        account: &alloy_eip7928::AccountChanges,
745    ) {
746        if account.storage_changes.is_empty() && account.storage_reads.is_empty() {
747            return;
748        }
749
750        let state_provider = match provider {
751            Some(p) => p,
752            slot @ None => {
753                let _span = debug_span!(
754                    target: "engine::tree::payload_processor::prewarm",
755                    parent: parent_span,
756                    "bal_prefetch_provider_init",
757                )
758                .entered();
759
760                let built = match self.provider.build() {
761                    Ok(p) => p,
762                    Err(err) => {
763                        trace!(
764                            target: "engine::tree::payload_processor::prewarm",
765                            %err,
766                            "Failed to build state provider in BAL prewarm thread"
767                        );
768                        return;
769                    }
770                };
771                let saved_cache =
772                    self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
773                let caches = saved_cache.cache().clone();
774                slot.insert(CachedStateProvider::new_prewarm(
775                    built,
776                    caches,
777                    self.cache_metrics.clone().unwrap_or_default(),
778                ))
779            }
780        };
781
782        let start = Instant::now();
783
784        for slot in &account.storage_changes {
785            let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
786        }
787        for &slot in &account.storage_reads {
788            let _ = state_provider.storage(account.address, StorageKey::from(slot));
789        }
790
791        self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
792    }
793}
794
795/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
796/// given state.
797fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
798    let mut targets = MultiProofTargetsV2::default();
799    let mut storage_target_count = 0;
800    for (addr, account) in state {
801        // if the account was not touched, or if the account was selfdestructed, do not
802        // fetch proofs for it
803        //
804        // Since selfdestruct can only happen in the same transaction, we can skip
805        // prefetching proofs for selfdestructed accounts
806        //
807        // See: https://eips.ethereum.org/EIPS/eip-6780
808        if !account.is_touched() || account.is_selfdestructed() {
809            continue
810        }
811
812        let hashed_address = keccak256(addr);
813        targets.account_targets.push(hashed_address.into());
814
815        let mut storage_slots = Vec::with_capacity(account.storage.len());
816        for (key, slot) in account.storage {
817            // do nothing if unchanged
818            if !slot.is_changed() {
819                continue
820            }
821
822            let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
823            storage_slots.push(ProofV2Target::from(hashed_slot));
824        }
825
826        storage_target_count += storage_slots.len();
827        if !storage_slots.is_empty() {
828            targets.storage_targets.insert(hashed_address, storage_slots);
829        }
830    }
831
832    (targets, storage_target_count)
833}
834
835/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
836///
837/// Withdrawals only modify account balances (no storage), so the targets contain
838/// only account-level entries with empty storage sets.
839fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
840    MultiProofTargetsV2 {
841        account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
842        ..Default::default()
843    }
844}
845
/// The events the pre-warm task can handle.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<BlockExecutionOutput<R>>` with the
/// main execution path without cloning the expensive `BundleState`.
#[derive(Debug)]
pub enum PrewarmTaskEvent<R> {
    /// Forcefully terminate all remaining transaction execution.
    ///
    /// Unlike [`Self::Terminate`], this only cancels outstanding transaction prewarming and
    /// does not terminate the task itself.
    TerminateTransactionExecution,
    /// Forcefully terminate the task on demand and update the shared cache with the given output
    /// before exiting.
    Terminate {
        /// The final execution outcome. Using `Arc` allows sharing with the main execution
        /// path without cloning the expensive `BundleState`.
        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
        /// Receiver for the block validation result.
        ///
        /// Cache saving is racing the state root validation. We optimistically construct the
        /// updated cache but only save it once we know the block is valid.
        valid_block_rx: mpsc::Receiver<()>,
    },
    /// Finished executing all transactions.
    FinishedTxExecution {
        /// Number of transactions executed
        executed_transactions: usize,
    },
}
872
/// Metrics for transactions prewarming.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub struct PrewarmMetrics {
    /// The number of transactions to prewarm
    pub(crate) transactions: Gauge,
    /// A histogram of the number of transactions to prewarm
    pub(crate) transactions_histogram: Histogram,
    /// A histogram of duration per transaction prewarming
    pub(crate) total_runtime: Histogram,
    /// A histogram of EVM execution duration per transaction prewarming
    pub(crate) execution_duration: Histogram,
    /// A histogram for prefetch targets per transaction prewarming
    pub(crate) prefetch_storage_targets: Histogram,
    /// The duration of the most recent cache save
    pub(crate) cache_saving_duration: Gauge,
    /// Counter for transaction execution errors during prewarming
    pub(crate) transaction_errors: Counter,
    /// A histogram of BAL slot iteration duration during prefetching
    pub(crate) bal_slot_iteration_duration: Histogram,
}