1use crate::tree::{
15    cached_state::{CachedStateProvider, SavedCache},
16    payload_processor::{
17        executor::WorkloadExecutor, multiproof::MultiProofMessage,
18        ExecutionCache as PayloadExecutionCache,
19    },
20    precompile_cache::{CachedPrecompile, PrecompileCacheMap},
21    ExecutionEnv, StateProviderBuilder,
22};
23use alloy_consensus::transaction::TxHashRef;
24use alloy_eips::Typed2718;
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use crossbeam_channel::Sender as CrossbeamSender;
28use metrics::{Counter, Gauge, Histogram};
29use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
30use reth_metrics::Metrics;
31use reth_primitives_traits::NodePrimitives;
32use reth_provider::{BlockReader, StateProviderFactory, StateReader};
33use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
34use reth_trie::MultiProofTargets;
35use std::{
36    sync::{
37        atomic::{AtomicBool, Ordering},
38        mpsc::{self, channel, Receiver, Sender},
39        Arc,
40    },
41    time::Instant,
42};
43use tracing::{debug, debug_span, instrument, trace, warn};
44
/// A transaction bundled with the position at which it was received.
///
/// `spawn_all` assigns `index` from a counter that starts at zero and grows by one for every
/// transaction it hands to the workers.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// Zero-based position of the transaction in the stream of pending transactions.
    index: usize,
    /// The transaction itself.
    tx: Tx,
}
53
/// Highest transaction type value that is not classified as a system transaction.
///
/// A transaction whose type is strictly greater than this value is treated as a system
/// transaction when work is distributed to the prewarm workers.
// NOTE(review): 4 presumably matches the newest standard EIP-2718 transaction type
// (EIP-7702) — confirm when new standard types are introduced.
const MAX_STANDARD_TX_TYPE: u8 = 4;
66
/// Task that executes pending transactions on a pool of workers, forwards the resulting proof
/// targets, and finally stores the block's output state in the shared execution cache.
///
/// The task is driven by [`PrewarmTaskEvent`]s received on `actions_rx`.
pub(super) struct PrewarmCacheTask<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Executor on which the distributor and the workers are spawned.
    executor: WorkloadExecutor,
    /// Shared execution cache that is updated once the final block output is known.
    execution_cache: PayloadExecutionCache,
    /// Context that is cloned into every worker.
    ctx: PrewarmContext<N, P, Evm>,
    /// Upper bound on the number of workers that are spawned.
    max_concurrency: usize,
    /// Expected number of transactions. When `0`, the worker count is derived from
    /// `max_concurrency` alone.
    transaction_count_hint: usize,
    /// Optional channel on which proof targets are forwarded as prefetch requests.
    to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
    /// Receiving half of the event channel consumed by the task's event loop.
    actions_rx: Receiver<PrewarmTaskEvent>,
}
91
92impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
93where
94    N: NodePrimitives,
95    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
96    Evm: ConfigureEvm<Primitives = N> + 'static,
97{
98    pub(super) fn new(
100        executor: WorkloadExecutor,
101        execution_cache: PayloadExecutionCache,
102        ctx: PrewarmContext<N, P, Evm>,
103        to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
104        transaction_count_hint: usize,
105        max_concurrency: usize,
106    ) -> (Self, Sender<PrewarmTaskEvent>) {
107        let (actions_tx, actions_rx) = channel();
108
109        trace!(
110            target: "engine::tree::payload_processor::prewarm",
111            max_concurrency,
112            transaction_count_hint,
113            "Initialized prewarm task"
114        );
115
116        (
117            Self {
118                executor,
119                execution_cache,
120                ctx,
121                max_concurrency,
122                transaction_count_hint,
123                to_multi_proof,
124                actions_rx,
125            },
126            actions_tx,
127        )
128    }
129
130    fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
136    where
137        Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
138    {
139        let executor = self.executor.clone();
140        let ctx = self.ctx.clone();
141        let max_concurrency = self.max_concurrency;
142        let transaction_count_hint = self.transaction_count_hint;
143        let span = tracing::Span::current();
144
145        self.executor.spawn_blocking(move || {
146            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
147
148            let (done_tx, done_rx) = mpsc::channel();
149
150            let workers_needed = if transaction_count_hint == 0 {
154                max_concurrency
155            } else {
156                transaction_count_hint.min(max_concurrency)
157            };
158
159            let mut handles = Vec::with_capacity(workers_needed);
161
162            for i in 0..workers_needed {
164                handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
165            }
166
167            let mut tx_index = 0usize;
169            while let Ok(tx) = pending.recv() {
170                if ctx.terminate_execution.load(Ordering::Relaxed) {
172                    trace!(
173                        target: "engine::tree::payload_processor::prewarm",
174                        "Termination requested, stopping transaction distribution"
175                    );
176                    break;
177                }
178
179                let indexed_tx = IndexedTransaction { index: tx_index, tx };
180                let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
181
182                if tx_index == 0 && is_system_tx {
188                    for handle in &handles {
189                        let _ = handle.send(indexed_tx.clone());
194                    }
195                } else {
196                    let worker_idx = tx_index % workers_needed;
198                    let _ = handles[worker_idx].send(indexed_tx);
203                }
204
205                tx_index += 1;
206            }
207
208            drop(done_tx);
210            drop(handles);
211            while done_rx.recv().is_ok() {}
212
213            let _ = actions_tx
214                .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
215        });
216    }
217
218    fn is_execution_terminated(&self) -> bool {
220        self.ctx.terminate_execution.load(Ordering::Relaxed)
221    }
222
223    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
225        if self.is_execution_terminated() {
226            return
229        }
230
231        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
232            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
233        }
234    }
235
236    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
248    fn save_cache(self, state: BundleState) {
249        let start = Instant::now();
250
251        let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
252            self;
253        let hash = env.hash;
254
255        debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
256        execution_cache.update_with_guard(|cached| {
258            let (caches, cache_metrics) = saved_cache.split();
260            let new_cache = SavedCache::new(hash, caches, cache_metrics);
261
262            if new_cache.cache().insert_state(&state).is_err() {
264                *cached = None;
266                debug!(target: "engine::caching", "cleared execution cache on update error");
267                return;
268            }
269
270            new_cache.update_metrics();
271
272            *cached = Some(new_cache);
274        });
275
276        let elapsed = start.elapsed();
277        debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
278
279        metrics.cache_saving_duration.set(elapsed.as_secs_f64());
280    }
281
282    #[instrument(
287        level = "debug",
288        target = "engine::tree::payload_processor::prewarm",
289        name = "prewarm",
290        skip_all
291    )]
292    pub(super) fn run(
293        self,
294        pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
295        actions_tx: Sender<PrewarmTaskEvent>,
296    ) {
297        self.spawn_all(pending, actions_tx);
299
300        let mut final_block_output = None;
301        let mut finished_execution = false;
302        while let Ok(event) = self.actions_rx.recv() {
303            match event {
304                PrewarmTaskEvent::TerminateTransactionExecution => {
305                    debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
307                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
308                }
309                PrewarmTaskEvent::Outcome { proof_targets } => {
310                    self.send_multi_proof_targets(proof_targets);
312                }
313                PrewarmTaskEvent::Terminate { block_output } => {
314                    trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
315                    final_block_output = Some(block_output);
316
317                    if finished_execution {
318                        break
320                    }
321                }
322                PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
323                    trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
324                    self.ctx.metrics.transactions.set(executed_transactions as f64);
325                    self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
326
327                    finished_execution = true;
328
329                    if final_block_output.is_some() {
330                        break
332                    }
333                }
334            }
335        }
336
337        debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
338
339        if let Some(Some(state)) = final_block_output {
341            self.save_cache(state);
342        }
343    }
344}
345
/// Everything a prewarm worker needs to build an EVM and execute transactions.
///
/// The context is cloned once per worker.
#[derive(Debug, Clone)]
pub(super) struct PrewarmContext<N, P, Evm>
where
    N: NodePrimitives,
    Evm: ConfigureEvm<Primitives = N>,
{
    /// Execution environment; its `evm_env` configures the EVM and its `hash` keys the saved
    /// cache.
    pub(super) env: ExecutionEnv<Evm>,
    /// EVM configuration used to create the EVM instance.
    pub(super) evm_config: Evm,
    /// Caches and cache metrics that back the worker's state provider and are reused when the
    /// execution cache is saved.
    pub(super) saved_cache: SavedCache,
    /// Builder for the state provider the EVM database reads from.
    pub(super) provider: StateProviderBuilder<N, P>,
    /// Metrics recorded by the prewarm task and its workers.
    pub(super) metrics: PrewarmMetrics,
    /// Flag that, once set, makes the distributor and the workers stop.
    pub(super) terminate_execution: Arc<AtomicBool>,
    /// When `true`, precompiles are not wrapped in a [`CachedPrecompile`].
    pub(super) precompile_cache_disabled: bool,
    /// Source of the per-address caches handed to [`CachedPrecompile::wrap`].
    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
364
365impl<N, P, Evm> PrewarmContext<N, P, Evm>
366where
367    N: NodePrimitives,
368    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
369    Evm: ConfigureEvm<Primitives = N> + 'static,
370{
371    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
374    fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
375        let Self {
376            env,
377            evm_config,
378            saved_cache,
379            provider,
380            metrics,
381            terminate_execution,
382            precompile_cache_disabled,
383            mut precompile_cache_map,
384        } = self;
385
386        let state_provider = match provider.build() {
387            Ok(provider) => provider,
388            Err(err) => {
389                trace!(
390                    target: "engine::tree::payload_processor::prewarm",
391                    %err,
392                    "Failed to build state provider in prewarm thread"
393                );
394                return None
395            }
396        };
397
398        let caches = saved_cache.cache().clone();
400        let cache_metrics = saved_cache.metrics().clone();
401        let state_provider =
402            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
403
404        let state_provider = StateProviderDatabase::new(state_provider);
405
406        let mut evm_env = env.evm_env;
407
408        evm_env.cfg_env.disable_nonce_check = true;
411
412        let spec_id = *evm_env.spec_id();
414        let mut evm = evm_config.evm_with_env(state_provider, evm_env);
415
416        if !precompile_cache_disabled {
417            evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
419                CachedPrecompile::wrap(
420                    precompile,
421                    precompile_cache_map.cache_for_address(*address),
422                    spec_id,
423                    None, )
425            });
426        }
427
428        Some((evm, metrics, terminate_execution))
429    }
430
431    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
440    fn transact_batch<Tx>(
441        self,
442        txs: mpsc::Receiver<IndexedTransaction<Tx>>,
443        sender: Sender<PrewarmTaskEvent>,
444        done_tx: Sender<()>,
445    ) where
446        Tx: ExecutableTxFor<Evm>,
447    {
448        let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
449
450        while let Ok(IndexedTransaction { index, tx }) = {
451            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
452                .entered();
453            txs.recv()
454        } {
455            let _enter =
456                debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
457                    .entered();
458
459            let start = Instant::now();
461
462            if terminate_execution.load(Ordering::Relaxed) {
465                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
466                break
467            }
468
469            let res = match evm.transact(&tx) {
470                Ok(res) => res,
471                Err(err) => {
472                    trace!(
473                        target: "engine::tree::payload_processor::prewarm",
474                        %err,
475                        tx_hash=%tx.tx().tx_hash(),
476                        sender=%tx.signer(),
477                        "Error when executing prewarm transaction",
478                    );
479                    metrics.transaction_errors.increment(1);
481                    continue
483                }
484            };
485            metrics.execution_duration.record(start.elapsed());
486
487            drop(_enter);
488
489            if terminate_execution.load(Ordering::Relaxed) {
492                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
493                break
494            }
495
496            if index > 0 {
499                let _enter =
500                    debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
501                        .entered();
502                let (targets, storage_targets) = multiproof_targets_from_state(res.state);
503                metrics.prefetch_storage_targets.record(storage_targets as f64);
504                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
505                drop(_enter);
506            }
507
508            metrics.total_runtime.record(start.elapsed());
509        }
510
511        let _ = done_tx.send(());
513    }
514
515    fn spawn_worker<Tx>(
517        &self,
518        idx: usize,
519        executor: &WorkloadExecutor,
520        actions_tx: Sender<PrewarmTaskEvent>,
521        done_tx: Sender<()>,
522    ) -> mpsc::Sender<IndexedTransaction<Tx>>
523    where
524        Tx: ExecutableTxFor<Evm> + Send + 'static,
525    {
526        let (tx, rx) = mpsc::channel();
527        let ctx = self.clone();
528        let span =
529            debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
530
531        executor.spawn_blocking(move || {
532            let _enter = span.entered();
533            ctx.transact_batch(rx, actions_tx, done_tx);
534        });
535
536        tx
537    }
538}
539
540fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
543    let mut targets = MultiProofTargets::with_capacity(state.len());
544    let mut storage_targets = 0;
545    for (addr, account) in state {
546        if !account.is_touched() || account.is_selfdestructed() {
554            continue
555        }
556
557        let mut storage_set =
558            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
559        for (key, slot) in account.storage {
560            if !slot.is_changed() {
562                continue
563            }
564
565            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
566        }
567
568        storage_targets += storage_set.len();
569        targets.insert(keccak256(addr), storage_set);
570    }
571
572    (targets, storage_targets)
573}
574
/// Events processed by the event loop of [`PrewarmCacheTask`].
pub(super) enum PrewarmTaskEvent {
    /// Requests that transaction execution stops; the event loop sets the shared termination
    /// flag in response.
    TerminateTransactionExecution,
    /// Asks the task to finish. The event loop ends once this event and
    /// [`Self::FinishedTxExecution`] have both been received.
    Terminate {
        /// Output state of the block. When present, it is saved to the execution cache after
        /// the event loop has ended.
        block_output: Option<BundleState>,
    },
    /// Result reported by a worker for a single transaction.
    Outcome {
        /// Proof targets derived from the transaction's state changes; `None` when the worker
        /// stopped because termination was requested.
        proof_targets: Option<MultiProofTargets>,
    },
    /// Signals that all workers have finished.
    FinishedTxExecution {
        /// Number of transactions that were distributed to the workers.
        executed_transactions: usize,
    },
}
596
/// Metrics recorded by the prewarm task and its workers.
// NOTE(review): the `Metrics` derive may use the field doc comments below as metric
// descriptions — confirm before rewording them.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub(crate) struct PrewarmMetrics {
    /// The number of transactions distributed to the prewarm workers
    pub(crate) transactions: Gauge,
    /// A histogram of the number of transactions distributed to the prewarm workers
    pub(crate) transactions_histogram: Histogram,
    /// A histogram of the total time spent per successfully prewarmed transaction
    pub(crate) total_runtime: Histogram,
    /// A histogram of the EVM execution duration per prewarmed transaction
    pub(crate) execution_duration: Histogram,
    /// A histogram of the number of storage proof targets per prewarmed transaction
    pub(crate) prefetch_storage_targets: Histogram,
    /// The duration of the last execution cache save, in seconds
    pub(crate) cache_saving_duration: Gauge,
    /// The number of transactions that failed to execute during prewarming
    pub(crate) transaction_errors: Counter,
}