Skip to main content

reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2//!
3//! Prewarming executes transactions in parallel before the actual block execution
4//! to populate the execution cache with state that will likely be accessed during
5//! block processing.
6//!
7//! ## How Prewarming Works
8//!
9//! 1. Incoming transactions are split into two streams: one for prewarming (executed in parallel)
10//!    and one for actual execution (executed sequentially)
11//! 2. Prewarming tasks execute transactions in parallel using shared caches
12//! 3. When actual block execution happens, it benefits from the warmed cache
13
14use super::bal_prewarm_pool::BalPrewarmPool;
15use crate::tree::{
16    payload_processor::multiproof::StateRootMessage,
17    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
18    CachedStateCacheMetrics, CachedStateMetrics, CachedStateProvider, ExecutionEnv,
19    PayloadExecutionCache, SavedCache, StateProviderBuilder,
20};
21use alloy_consensus::transaction::TxHashRef;
22use alloy_eip7928::bal::DecodedBal;
23use alloy_eips::eip4895::Withdrawal;
24use alloy_primitives::{keccak256, B256, U256};
25use crossbeam_channel::Sender as CrossbeamSender;
26use metrics::{Counter, Gauge, Histogram};
27use rayon::prelude::*;
28use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
29use reth_metrics::Metrics;
30use reth_primitives_traits::{Account, FastInstant as Instant, NodePrimitives};
31use reth_provider::{
32    AccountReader, BlockExecutionOutput, BlockReader, StateProviderFactory, StateReader,
33};
34use reth_revm::database::StateProviderDatabase;
35use reth_tasks::{pool::WorkerPool, Runtime};
36use reth_trie_common::MultiProofTargetsV2;
37use std::sync::{
38    atomic::{AtomicBool, AtomicUsize, Ordering},
39    mpsc::{self, channel, Receiver, Sender},
40    Arc,
41};
42use tokio::sync::oneshot;
43use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
44
45/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
46#[derive(Debug)]
47pub enum PrewarmMode<Tx> {
48    /// Prewarm by executing transactions from a stream, each paired with its block index.
49    Transactions(Receiver<(usize, Tx)>),
50    /// Prewarm by prefetching slots from a Block Access List.
51    BlockAccessList(Arc<DecodedBal>),
52    /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
53    /// benefit). No workers are spawned.
54    Skipped,
55}
56
57/// A task that is responsible for caching and prewarming the cache by executing transactions
58/// individually in parallel.
59///
60/// Note: This task runs until cancelled externally.
61#[derive(Debug)]
62pub struct PrewarmCacheTask<N, P, Evm>
63where
64    N: NodePrimitives,
65    Evm: ConfigureEvm<Primitives = N>,
66{
67    /// The executor used to spawn execution tasks.
68    executor: Runtime,
69    /// Shared execution cache.
70    execution_cache: PayloadExecutionCache,
71    /// Context provided to execution tasks
72    ctx: PrewarmContext<N, P, Evm>,
73    /// Sender to emit evm state outcome messages to the sparse trie task, if any.
74    to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
75    /// Receiver for events produced by tx execution
76    actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
77    /// Parent span for tracing
78    parent_span: Span,
79}
80
81impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
82where
83    N: NodePrimitives,
84    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
85    Evm: ConfigureEvm<Primitives = N> + 'static,
86{
87    /// Initializes the task with the given transactions pending execution
88    pub fn new(
89        executor: Runtime,
90        execution_cache: PayloadExecutionCache,
91        ctx: PrewarmContext<N, P, Evm>,
92        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
93    ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
94        let (actions_tx, actions_rx) = channel();
95
96        trace!(
97            target: "engine::tree::payload_processor::prewarm",
98            prewarming_threads = executor.prewarming_pool().current_num_threads(),
99            transaction_count = ctx.env.transaction_count,
100            "Initialized prewarm task"
101        );
102
103        (
104            Self {
105                executor,
106                execution_cache,
107                ctx,
108                to_sparse_trie_task,
109                actions_rx,
110                parent_span: Span::current(),
111            },
112            actions_tx,
113        )
114    }
115
116    /// Streams pending transactions and executes them in parallel on the prewarming pool.
117    ///
118    /// Kicks off EVM init on every pool thread, then uses `in_place_scope` to dispatch
119    /// transactions as they arrive and wait for all spawned tasks to complete before
120    /// clearing per-thread state. Workers that start via work-stealing lazily initialise
121    /// their EVM state on first access via [`get_or_init`](reth_tasks::pool::Worker::get_or_init).
122    fn spawn_txs_prewarm<Tx>(
123        &self,
124        pending: mpsc::Receiver<(usize, Tx)>,
125        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
126        to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
127    ) where
128        Tx: ExecutableTxFor<Evm> + Send + 'static,
129    {
130        let executor = self.executor.clone();
131        let ctx = self.ctx.clone();
132        let span = Span::current();
133
134        self.executor.spawn_blocking_named("prewarm-txs", move || {
135            let _enter = debug_span!(
136                target: "engine::tree::payload_processor::prewarm",
137                parent: &span,
138                "prewarm_txs"
139            )
140            .entered();
141
142            let ctx = &ctx;
143            let pool = executor.prewarming_pool();
144
145            let mut tx_count = 0usize;
146            let to_sparse_trie_task = to_sparse_trie_task.as_ref();
147            pool.in_place_scope(|s| {
148                s.spawn(|_| {
149                    pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
150                });
151
152                while let Ok((index, tx)) = pending.recv() {
153                    if ctx.should_stop() {
154                        trace!(
155                            target: "engine::tree::payload_processor::prewarm",
156                            "Termination requested, stopping transaction distribution"
157                        );
158                        break;
159                    }
160
161                    // skip transactions already executed by the main loop
162                    if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
163                        continue;
164                    }
165
166                    tx_count += 1;
167                    let parent_span = Span::current();
168                    s.spawn(move |_| {
169                        let _enter = trace_span!(
170                            target: "engine::tree::payload_processor::prewarm",
171                            parent: parent_span,
172                            "prewarm_tx",
173                            i = index,
174                        )
175                        .entered();
176                        Self::transact_worker(ctx, index, tx, to_sparse_trie_task);
177                    });
178                }
179
180                // Send withdrawal prefetch targets after all transactions dispatched
181                if let Some(to_sparse_trie_task) = to_sparse_trie_task &&
182                    let Some(withdrawals) = &ctx.env.withdrawals &&
183                    !withdrawals.is_empty()
184                {
185                    let targets = multiproof_targets_from_withdrawals(withdrawals);
186                    let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
187                }
188            });
189
190            // All tasks are done — clear per-thread EVM state for the next block.
191            pool.clear();
192
193            let _ = actions_tx
194                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
195        });
196    }
197
198    /// Executes a single prewarm transaction on the current pool thread's EVM.
199    ///
200    /// Lazily initialises per-thread [`PrewarmEvmState`] via
201    /// [`get_or_init`](reth_tasks::pool::Worker::get_or_init) on first access.
202    fn transact_worker<Tx>(
203        ctx: &PrewarmContext<N, P, Evm>,
204        index: usize,
205        tx: Tx,
206        to_sparse_trie_task: Option<&CrossbeamSender<StateRootMessage>>,
207    ) where
208        Tx: ExecutableTxFor<Evm>,
209    {
210        WorkerPool::with_worker_mut(|worker| {
211            let Some(evm) =
212                worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
213            else {
214                return;
215            };
216
217            if ctx.should_stop() {
218                return;
219            }
220
221            // skip if main execution has already processed this transaction
222            if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
223                return;
224            }
225
226            let start = Instant::now();
227
228            let (tx_env, tx) = tx.into_parts();
229            let res = match evm.transact(tx_env) {
230                Ok(res) => res,
231                Err(err) => {
232                    trace!(
233                        target: "engine::tree::payload_processor::prewarm",
234                        %err,
235                        tx_hash=%tx.tx().tx_hash(),
236                        sender=%tx.signer(),
237                        "Error when executing prewarm transaction",
238                    );
239                    ctx.metrics.transaction_errors.increment(1);
240                    return;
241                }
242            };
243            ctx.metrics.execution_duration.record(start.elapsed());
244
245            if ctx.should_stop() {
246                return;
247            }
248
249            if index > 0 {
250                let (targets, storage_targets) = MultiProofTargetsV2::from_state(res.state);
251                ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
252                if let Some(to_sparse_trie_task) = to_sparse_trie_task {
253                    let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
254                }
255            }
256
257            ctx.metrics.total_runtime.record(start.elapsed());
258        });
259    }
260
261    /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
262    /// It should only be called after ensuring that:
263    /// 1. All prewarming tasks have completed execution
264    /// 2. No other concurrent operations are accessing the cache
265    ///
266    /// Saves the warmed caches back into the shared slot after prewarming completes.
267    ///
268    /// This consumes the `SavedCache` held by the task, which releases its cache handle and allows
269    /// the new, warmed cache to be inserted.
270    ///
271    /// This method is called from `run()` only after all execution tasks are complete.
272    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
273    fn save_cache(
274        self,
275        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
276        valid_block_rx: mpsc::Receiver<()>,
277    ) {
278        let start = Instant::now();
279
280        let Self {
281            execution_cache,
282            ctx: PrewarmContext { env, metrics, cache_state_metrics, saved_cache, .. },
283            ..
284        } = self;
285        let hash = env.hash;
286
287        if let Some(saved_cache) = saved_cache {
288            debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
289            execution_cache.update_with_guard(|cached| {
290                // consumes the `SavedCache` held by the prewarming task, which releases its cache
291                // handle
292                let caches = saved_cache.cache().clone();
293                let new_cache = SavedCache::new(hash, caches);
294
295                // Insert state into cache while holding the lock
296                // Access the BundleState through the shared ExecutionOutcome
297                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
298                    // Clear the cache on error to prevent having a polluted cache
299                    *cached = None;
300                    debug!(target: "engine::caching", "cleared execution cache on update error");
301                    return;
302                }
303
304                new_cache.update_metrics(cache_state_metrics.as_ref());
305
306                if valid_block_rx.recv().is_ok() {
307                    // Replace the shared cache with the new one; the previous cache (if any) is
308                    // dropped.
309                    *cached = Some(new_cache);
310                } else {
311                    // Block was invalid; caches were already mutated by insert_state above,
312                    // so we must clear to prevent using polluted state
313                    *cached = None;
314                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
315                }
316            });
317
318            let elapsed = start.elapsed();
319            debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
320
321            metrics.cache_saving_duration.set(elapsed.as_secs_f64());
322        }
323    }
324
325    /// Runs BAL-based prewarming and sparse-trie work inline.
326    ///
327    /// Spawns two halves concurrently on separate pools, then waits for both to complete:
328    /// 1. Hashed state streaming on the BAL streaming pool so storage updates can reach the sparse
329    ///    trie before account reads finish.
330    /// 2. Storage prefetch on the prewarming pool to populate the execution cache, unless BAL batch
331    ///    I/O is disabled.
332    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
333    fn run_bal_prewarm(
334        &self,
335        decoded_bal: Arc<DecodedBal>,
336        actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
337    ) {
338        let bal = decoded_bal.as_bal();
339        if bal.is_empty() {
340            if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
341                let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
342            }
343            let _ =
344                actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
345            return;
346        }
347
348        trace!(
349            target: "engine::tree::payload_processor::prewarm",
350            accounts = bal.len(),
351            "Starting BAL prewarm"
352        );
353
354        let ctx = self.ctx.clone();
355        let to_sparse_trie_task = self.to_sparse_trie_task.clone();
356        let executor = self.executor.clone();
357        let parent_span = Span::current();
358        let stream_parent_span = parent_span;
359        let prefetch_bal = Arc::clone(&decoded_bal);
360        let stream_bal = Arc::clone(&decoded_bal);
361        let (stream_tx, stream_rx) = oneshot::channel();
362
363        if let Some(to_sparse_trie_task) = to_sparse_trie_task {
364            let ctx = ctx.clone();
365            executor.bal_streaming_pool().spawn(move || {
366                let branch_span = debug_span!(
367                    target: "engine::tree::payload_processor::prewarm",
368                    parent: &stream_parent_span,
369                    "bal_hashed_state_stream",
370                    bal_accounts = stream_bal.as_bal().len(),
371                );
372                let parent_span = branch_span.clone();
373                let _span = branch_span.entered();
374
375                stream_bal.as_bal().par_iter().for_each(|account_changes| {
376                    WorkerPool::with_worker_mut(|worker| {
377                        let provider =
378                            worker.get_or_init::<Option<Box<dyn AccountReader>>>(|| None);
379                        ctx.send_bal_hashed_state(
380                            &parent_span,
381                            provider,
382                            account_changes,
383                            &to_sparse_trie_task,
384                        );
385                    });
386                });
387
388                let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
389                let _ = stream_tx.send(());
390            });
391        } else {
392            let _ = stream_tx.send(());
393        }
394
395        if let Some(saved_cache) = ctx.saved_cache &&
396            !ctx.disable_bal_batch_io &&
397            let Some(pool) = ctx.bal_prewarm_pool.as_ref()
398        {
399            // If
400            //
401            // - BAL path is enabled (and so bal_prewarm_pool is present),
402            // - dispatch_bal_batch_io is false
403            // - execution cache is not disabled
404            //
405            // we launch prewarming sequence of the BAL read set here. The BAL read-set consists
406            // of the accounts, their code if present, and declared storages (both storage_reads
407            // and storage_changes).
408            //
409            // This runs side-by-side with the parallel transaction execution reducing the time it
410            // spends blocking on the data.
411            let caches = saved_cache.cache().clone();
412            let provider_builder = ctx.provider.clone();
413            let build = Arc::new(move || provider_builder.build());
414
415            pool.begin_block(build, caches);
416            for account in prefetch_bal.as_bal() {
417                pool.warm_account(account.address);
418                for change in &account.storage_changes {
419                    pool.warm_storage(account.address, change.slot.into());
420                }
421                for &slot in &account.storage_reads {
422                    pool.warm_storage(account.address, slot.into());
423                }
424            }
425            pool.end_block();
426        }
427
428        stream_rx
429            .blocking_recv()
430            .expect("BAL hashed-state streaming task dropped without signaling completion");
431
432        // Drop the per-thread providers
433        executor.bal_streaming_pool().clear();
434        executor.prewarming_pool().clear();
435
436        let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
437    }
438
439    /// Executes the task.
440    ///
441    /// This will execute the transactions until all transactions have been processed or the task
442    /// was cancelled.
443    #[instrument(
444        parent = &self.parent_span,
445        level = "debug",
446        target = "engine::tree::payload_processor::prewarm",
447        name = "prewarm and caching",
448        skip_all
449    )]
450    pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
451    where
452        Tx: ExecutableTxFor<Evm> + Send + 'static,
453    {
454        // Spawn execution tasks based on mode
455        match mode {
456            PrewarmMode::Transactions(pending) => {
457                self.spawn_txs_prewarm(pending, actions_tx, self.to_sparse_trie_task.clone());
458            }
459            PrewarmMode::BlockAccessList(bal) => {
460                self.run_bal_prewarm(bal, actions_tx);
461            }
462            PrewarmMode::Skipped => {
463                let _ = actions_tx
464                    .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
465            }
466        }
467
468        let mut final_execution_outcome = None;
469        let mut finished_execution = false;
470        while let Ok(event) = self.actions_rx.recv() {
471            match event {
472                PrewarmTaskEvent::TerminateTransactionExecution => {
473                    // stop tx processing
474                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
475                    self.ctx.stop();
476                }
477                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
478                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
479                    final_execution_outcome =
480                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
481
482                    if finished_execution {
483                        // all tasks are done, we can exit, which will save caches and exit
484                        break
485                    }
486                }
487                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
488                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
489                    self.ctx.metrics.transactions.set(executed_transactions as f64);
490                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
491
492                    finished_execution = true;
493
494                    if final_execution_outcome.is_some() {
495                        // all tasks are done, we can exit, which will save caches and exit
496                        break
497                    }
498                }
499            }
500        }
501
502        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
503
504        // save caches and finish using the shared ExecutionOutcome
505        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
506            self.save_cache(execution_outcome, valid_block_rx);
507        }
508    }
509}
510
511/// Context required by tx execution tasks.
512#[derive(Debug, Clone)]
513pub struct PrewarmContext<N, P, Evm>
514where
515    N: NodePrimitives,
516    Evm: ConfigureEvm<Primitives = N>,
517{
518    /// The execution environment.
519    pub env: ExecutionEnv<Evm>,
520    /// The EVM configuration.
521    pub evm_config: Evm,
522    /// The saved cache.
523    pub saved_cache: Option<SavedCache>,
524    /// Provider to obtain the state
525    pub provider: StateProviderBuilder<N, P>,
526    /// Dedicated blocking pool for warming the BAL read-set. `Some` only on the BAL parallel
527    /// execution path; the pool is owned by the [`PayloadProcessor`](super::PayloadProcessor).
528    pub(crate) bal_prewarm_pool: Option<Arc<BalPrewarmPool>>,
529    /// The metrics for the prewarm task.
530    pub metrics: PrewarmMetrics,
531    /// Metrics for the execution cache.
532    /// Metrics for the execution cache. `None` disables metrics recording.
533    pub cache_metrics: Option<CachedStateMetrics>,
534    /// Metrics for shared execution cache state. `None` disables metrics recording.
535    pub cache_state_metrics: Option<CachedStateCacheMetrics>,
536    /// An atomic bool that tells prewarm tasks to not start any more execution.
537    pub terminate_execution: Arc<AtomicBool>,
538    /// Shared counter tracking the next transaction index to be executed by the main execution
539    /// loop. Prewarm workers skip transactions with `index < counter` since those have already
540    /// been executed.
541    pub executed_tx_index: Arc<AtomicUsize>,
542    /// Whether the precompile cache is disabled.
543    pub precompile_cache_disabled: bool,
544    /// The precompile cache map.
545    pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
546    /// Whether to disable BAL-driven parallel state root computation.
547    /// Only valid when BAL parallel execution is also disabled.
548    pub disable_bal_parallel_state_root: bool,
549    /// Whether BAL state prefetching during prewarm is disabled.
550    pub disable_bal_batch_io: bool,
551}
552
553/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
554/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
555type PrewarmEvmState<Evm> =
556    Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
557
558impl<N, P, Evm> PrewarmContext<N, P, Evm>
559where
560    N: NodePrimitives,
561    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
562    Evm: ConfigureEvm<Primitives = N> + 'static,
563{
564    /// Creates a per-thread EVM for prewarming.
565    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
566    fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
567        let mut state_provider = match self.provider.build() {
568            Ok(provider) => provider,
569            Err(err) => {
570                trace!(
571                    target: "engine::tree::payload_processor::prewarm",
572                    %err,
573                    "Failed to build state provider in prewarm thread"
574                );
575                return None
576            }
577        };
578
579        // Use the caches to create a new provider with caching
580        if let Some(saved_cache) = &self.saved_cache {
581            let caches = saved_cache.cache().clone();
582            state_provider = Box::new(CachedStateProvider::new_prewarm(state_provider, caches));
583        }
584
585        let state_provider = StateProviderDatabase::new(state_provider);
586
587        let mut evm_env = self.env.evm_env.clone();
588
589        // we must disable the nonce check so that we can execute the transaction even if the nonce
590        // doesn't match what's on chain.
591        evm_env.cfg_env.disable_nonce_check = true;
592
593        // disable the balance check so that transactions from senders who were funded by earlier
594        // transactions in the block can still be prewarmed
595        evm_env.cfg_env.disable_balance_check = true;
596
597        // create a new executor and disable nonce checks in the env
598        let spec_id = *evm_env.spec_id();
599        let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
600
601        if !self.precompile_cache_disabled {
602            // Only cache pure precompiles to avoid issues with stateful precompiles
603            evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
604                CachedPrecompile::wrap(
605                    precompile,
606                    self.precompile_cache_map.cache_for_address(*address),
607                    spec_id,
608                    None, // No metrics for prewarm
609                )
610            });
611        }
612
613        Some(evm)
614    }
615
616    /// Returns `true` if prewarming should stop.
617    #[inline]
618    pub fn should_stop(&self) -> bool {
619        self.terminate_execution.load(Ordering::Relaxed)
620    }
621
622    /// Signals all prewarm tasks to stop execution.
623    #[inline]
624    pub fn stop(&self) {
625        self.terminate_execution.store(true, Ordering::Relaxed);
626    }
627
628    /// Hashes and streams a single BAL account's state to the sparse trie task.
629    ///
630    /// For each changed account, storage slots are hashed and sent immediately, then the account
631    /// is sent as a separate update. The parent account is read only when the BAL did not provide
632    /// all account leaf fields needed by the sparse trie.
633    ///
634    /// The `provider` is lazily initialized on first call and reused across accounts on the same
635    /// thread.
636    fn send_bal_hashed_state(
637        &self,
638        parent_span: &Span,
639        provider: &mut Option<Box<dyn AccountReader>>,
640        account_changes: &alloy_eip7928::AccountChanges,
641        to_sparse_trie_task: &CrossbeamSender<StateRootMessage>,
642    ) {
643        if self.disable_bal_parallel_state_root {
644            return;
645        }
646        let address = account_changes.address;
647        let mut hashed_address = None;
648        let account_fields = BalAccountStateFields::from_changes(account_changes);
649
650        if !bal_account_changes_state_root(account_changes, account_fields) {
651            return;
652        }
653
654        if !account_changes.storage_changes.is_empty() {
655            let hashed_address = *hashed_address.get_or_insert_with(|| keccak256(address));
656            let mut storage_map = reth_trie::HashedStorage::new(false);
657
658            for slot_changes in &account_changes.storage_changes {
659                let hashed_slot = keccak256(slot_changes.slot.to_be_bytes::<32>());
660                if let Some(last_change) = slot_changes.changes.last() {
661                    storage_map.storage.insert(hashed_slot, last_change.new_value);
662                }
663            }
664
665            let mut hashed_state = reth_trie::HashedPostState::default();
666            hashed_state.storages.insert(hashed_address, storage_map);
667            let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
668        }
669
670        let existing_account = if account_fields.needs_parent_account() {
671            if provider.is_none() {
672                let _span = debug_span!(
673                    target: "engine::tree::payload_processor::prewarm",
674                    parent: parent_span,
675                    "bal_hashed_state_provider_init",
676                    has_saved_cache = !self.disable_bal_batch_io && self.saved_cache.is_some(),
677                )
678                .entered();
679
680                let inner = match self.provider.build() {
681                    Ok(p) => p,
682                    Err(err) => {
683                        warn!(
684                            target: "engine::tree::payload_processor::prewarm",
685                            ?err,
686                            "Failed to build provider for BAL account reads"
687                        );
688                        return;
689                    }
690                };
691                let boxed: Box<dyn AccountReader> =
692                    match (self.disable_bal_batch_io, &self.saved_cache) {
693                        (false, Some(saved)) => {
694                            let caches = saved.cache().clone();
695                            Box::new(CachedStateProvider::new_prewarm(inner, caches))
696                        }
697                        _ => Box::new(inner),
698                    };
699                *provider = Some(boxed);
700            }
701            let account_reader = provider.as_ref().expect("provider just initialized");
702            account_reader.basic_account(&address).ok().flatten()
703        } else {
704            None
705        };
706
707        let account = account_fields.into_account(existing_account);
708
709        let hashed_address = hashed_address.unwrap_or_else(|| keccak256(address));
710        let mut hashed_state = reth_trie::HashedPostState::default();
711        hashed_state.accounts.insert(hashed_address, Some(account));
712
713        let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
714    }
715}
716
717#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
718struct BalAccountStateFields {
719    balance: Option<U256>,
720    nonce: Option<u64>,
721    code_hash: Option<B256>,
722}
723
724impl BalAccountStateFields {
725    fn from_changes(account_changes: &alloy_eip7928::AccountChanges) -> Self {
726        Self {
727            balance: account_changes.balance_changes.last().map(|change| change.post_balance),
728            nonce: account_changes.nonce_changes.last().map(|change| change.new_nonce),
729            code_hash: account_changes.code_changes.last().map(|code_change| {
730                if code_change.new_code.is_empty() {
731                    alloy_consensus::constants::KECCAK_EMPTY
732                } else {
733                    keccak256(&code_change.new_code)
734                }
735            }),
736        }
737    }
738
739    const fn is_empty(self) -> bool {
740        self.balance.is_none() && self.nonce.is_none() && self.code_hash.is_none()
741    }
742
743    const fn needs_parent_account(self) -> bool {
744        self.balance.is_none() || self.nonce.is_none() || self.code_hash.is_none()
745    }
746
747    fn into_account(self, existing_account: Option<Account>) -> Account {
748        let existing_account = existing_account.as_ref();
749        Account {
750            balance: self.balance.unwrap_or_else(|| {
751                existing_account
752                    .map(|account| account.balance)
753                    .unwrap_or(alloy_primitives::U256::ZERO)
754            }),
755            nonce: self
756                .nonce
757                .unwrap_or_else(|| existing_account.map(|account| account.nonce).unwrap_or(0)),
758            bytecode_hash: self.code_hash.or_else(|| {
759                existing_account
760                    .and_then(|account| account.bytecode_hash)
761                    .or(Some(alloy_consensus::constants::KECCAK_EMPTY))
762            }),
763        }
764    }
765}
766
767const fn bal_account_changes_state_root(
768    account_changes: &alloy_eip7928::AccountChanges,
769    account_fields: BalAccountStateFields,
770) -> bool {
771    !account_fields.is_empty() || !account_changes.storage_changes.is_empty()
772}
773
774/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
775///
776/// Withdrawals only modify account balances (no storage), so the targets contain
777/// only account-level entries with empty storage sets.
778fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
779    MultiProofTargetsV2 {
780        account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
781        ..Default::default()
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use super::*;
788    use alloy_eip7928::{
789        AccountChanges, BalanceChange, BlockAccessIndex, CodeChange, NonceChange, SlotChanges,
790        StorageChange,
791    };
792    use alloy_primitives::{address, bytes};
793
794    #[test]
795    fn bal_read_only_account_does_not_change_state_root() {
796        let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
797            .with_storage_read(U256::from(1));
798        let fields = BalAccountStateFields::from_changes(&changes);
799
800        assert!(fields.is_empty());
801        assert!(!bal_account_changes_state_root(&changes, fields));
802    }
803
804    #[test]
805    fn bal_account_with_all_leaf_fields_does_not_need_parent_account() {
806        let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
807            .with_balance_change(BalanceChange::new(BlockAccessIndex::new(1), U256::from(10)))
808            .with_nonce_change(NonceChange::new(BlockAccessIndex::new(1), 7))
809            .with_code_change(CodeChange::new(BlockAccessIndex::new(1), bytes!("6001600155")));
810        let fields = BalAccountStateFields::from_changes(&changes);
811
812        assert!(bal_account_changes_state_root(&changes, fields));
813        assert!(!fields.needs_parent_account());
814    }
815
816    #[test]
817    fn bal_storage_change_needs_parent_account_when_leaf_fields_missing() {
818        let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
819            .with_storage_change(SlotChanges::new(
820                U256::from(1),
821                vec![StorageChange::new(BlockAccessIndex::new(1), U256::from(2))],
822            ));
823        let fields = BalAccountStateFields::from_changes(&changes);
824
825        assert!(bal_account_changes_state_root(&changes, fields));
826        assert!(fields.needs_parent_account());
827    }
828
829    #[test]
830    fn bal_account_uses_existing_fields_only_when_missing() {
831        let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
832            .with_balance_change(BalanceChange::new(BlockAccessIndex::new(1), U256::from(10)));
833        let fields = BalAccountStateFields::from_changes(&changes);
834        let account = fields.into_account(Some(Account {
835            balance: U256::from(1),
836            nonce: 3,
837            bytecode_hash: Some(B256::repeat_byte(0xaa)),
838        }));
839
840        assert_eq!(account.balance, U256::from(10));
841        assert_eq!(account.nonce, 3);
842        assert_eq!(account.bytecode_hash, Some(B256::repeat_byte(0xaa)));
843    }
844}
845
846/// The events the pre-warm task can handle.
847///
848/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
849/// execution path without cloning the expensive `BundleState`.
850#[derive(Debug)]
851pub enum PrewarmTaskEvent<R> {
852    /// Forcefully terminate all remaining transaction execution.
853    TerminateTransactionExecution,
854    /// Forcefully terminate the task on demand and update the shared cache with the given output
855    /// before exiting.
856    Terminate {
857        /// The final execution outcome. Using `Arc` allows sharing with the main execution
858        /// path without cloning the expensive `BundleState`.
859        execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
860        /// Receiver for the block validation result.
861        ///
862        /// Cache saving is racing the state root validation. We optimistically construct the
863        /// updated cache but only save it once we know the block is valid.
864        valid_block_rx: mpsc::Receiver<()>,
865    },
866    /// Finished executing all transactions
867    FinishedTxExecution {
868        /// Number of transactions executed
869        executed_transactions: usize,
870    },
871}
872
873/// Metrics for transactions prewarming.
874#[derive(Metrics, Clone)]
875#[metrics(scope = "sync.prewarm")]
876pub struct PrewarmMetrics {
877    /// The number of transactions to prewarm
878    pub(crate) transactions: Gauge,
879    /// A histogram of the number of transactions to prewarm
880    pub(crate) transactions_histogram: Histogram,
881    /// A histogram of duration per transaction prewarming
882    pub(crate) total_runtime: Histogram,
883    /// A histogram of EVM execution duration per transaction prewarming
884    pub(crate) execution_duration: Histogram,
885    /// A histogram for prefetch targets per transaction prewarming
886    pub(crate) prefetch_storage_targets: Histogram,
887    /// A histogram of duration for cache saving
888    pub(crate) cache_saving_duration: Gauge,
889    /// Counter for transaction execution errors during prewarming
890    pub(crate) transaction_errors: Counter,
891    /// A histogram of BAL slot iteration duration during prefetching
892    pub(crate) bal_slot_iteration_duration: Histogram,
893}