reth_engine_tree/tree/payload_processor/
prewarm.rs1use crate::tree::{
15 cached_state::{CachedStateProvider, SavedCache},
16 payload_processor::{
17 executor::WorkloadExecutor, multiproof::MultiProofMessage,
18 ExecutionCache as PayloadExecutionCache,
19 },
20 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
21 ExecutionEnv, StateProviderBuilder,
22};
23use alloy_consensus::transaction::TxHashRef;
24use alloy_eips::Typed2718;
25use alloy_evm::Database;
26use alloy_primitives::{keccak256, map::B256Set, B256};
27use metrics::{Counter, Gauge, Histogram};
28use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
29use reth_metrics::Metrics;
30use reth_primitives_traits::NodePrimitives;
31use reth_provider::{BlockReader, StateProviderFactory, StateReader};
32use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
33use reth_trie::MultiProofTargets;
34use std::{
35 sync::{
36 atomic::{AtomicBool, Ordering},
37 mpsc::{self, channel, Receiver, Sender},
38 Arc,
39 },
40 time::Instant,
41};
42use tracing::{debug, trace, warn};
43
/// A transaction paired with the position at which it was received.
///
/// Workers use the index to tell the first transaction (index `0`) apart from
/// the ones that follow it.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// Zero-based position of the transaction in arrival order.
    index: usize,
    /// The transaction handed to a worker for execution.
    tx: Tx,
}
52
/// Highest transaction type value that is treated as a standard transaction.
///
/// A first transaction whose type is greater than this value is considered a
/// system transaction and is broadcast to every prewarm worker instead of
/// being handed to a single one.
const MAX_STANDARD_TX_TYPE: u8 = 4;
65
66pub(super) struct PrewarmCacheTask<N, P, Evm>
71where
72 N: NodePrimitives,
73 Evm: ConfigureEvm<Primitives = N>,
74{
75 executor: WorkloadExecutor,
77 execution_cache: PayloadExecutionCache,
79 ctx: PrewarmContext<N, P, Evm>,
81 max_concurrency: usize,
83 transaction_count_hint: usize,
85 to_multi_proof: Option<Sender<MultiProofMessage>>,
87 actions_rx: Receiver<PrewarmTaskEvent>,
89}
90
91impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
92where
93 N: NodePrimitives,
94 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
95 Evm: ConfigureEvm<Primitives = N> + 'static,
96{
97 pub(super) fn new(
99 executor: WorkloadExecutor,
100 execution_cache: PayloadExecutionCache,
101 ctx: PrewarmContext<N, P, Evm>,
102 to_multi_proof: Option<Sender<MultiProofMessage>>,
103 transaction_count_hint: usize,
104 max_concurrency: usize,
105 ) -> (Self, Sender<PrewarmTaskEvent>) {
106 let (actions_tx, actions_rx) = channel();
107
108 trace!(
109 target: "engine::tree::prewarm",
110 max_concurrency,
111 transaction_count_hint,
112 "Initialized prewarm task"
113 );
114
115 (
116 Self {
117 executor,
118 execution_cache,
119 ctx,
120 max_concurrency,
121 transaction_count_hint,
122 to_multi_proof,
123 actions_rx,
124 },
125 actions_tx,
126 )
127 }
128
129 fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
135 where
136 Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
137 {
138 let executor = self.executor.clone();
139 let ctx = self.ctx.clone();
140 let max_concurrency = self.max_concurrency;
141 let transaction_count_hint = self.transaction_count_hint;
142
143 self.executor.spawn_blocking(move || {
144 let (done_tx, done_rx) = mpsc::channel();
145 let mut executing = 0usize;
146
147 let mut handles = Vec::with_capacity(max_concurrency);
149
150 let workers_needed = if transaction_count_hint == 0 {
154 max_concurrency
155 } else {
156 transaction_count_hint.min(max_concurrency)
157 };
158
159 for _ in 0..workers_needed {
161 handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
162 }
163
164 let mut tx_index = 0usize;
165
166 if let Ok(first_tx) = pending.recv() {
168 let indexed_tx = IndexedTransaction { index: tx_index, tx: first_tx };
170 let tx_ref = indexed_tx.tx.tx();
172 let is_system_tx = tx_ref.ty() > MAX_STANDARD_TX_TYPE;
173 let first_tx_hash = tx_ref.tx_hash();
174
175 if is_system_tx {
179 for handle in &handles {
183 if let Err(err) = handle.send(indexed_tx.clone()) {
184 warn!(
185 target: "engine::tree::prewarm",
186 tx_hash = %first_tx_hash,
187 error = %err,
188 "Failed to send deposit transaction to worker"
189 );
190 }
191 }
192 } else {
193 if let Err(err) = handles[0].send(indexed_tx) {
195 warn!(
196 target: "engine::tree::prewarm",
197 task_idx = 0,
198 error = %err,
199 "Failed to send transaction to worker"
200 );
201 }
202 }
203 executing += 1;
204 tx_index += 1;
205 }
206
207 while let Ok(executable) = pending.recv() {
209 let indexed_tx = IndexedTransaction { index: tx_index, tx: executable };
210 let task_idx = executing % workers_needed;
211 if let Err(err) = handles[task_idx].send(indexed_tx) {
212 warn!(
213 target: "engine::tree::prewarm",
214 task_idx,
215 error = %err,
216 "Failed to send transaction to worker"
217 );
218 }
219 executing += 1;
220 tx_index += 1;
221 }
222
223 drop(done_tx);
225 drop(handles);
226 while done_rx.recv().is_ok() {}
227
228 let _ = actions_tx
229 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: executing });
230 });
231 }
232
233 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
235 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
236 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
237 }
238 }
239
240 fn save_cache(self, state: BundleState) {
252 let start = Instant::now();
253
254 let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
255 self;
256 let hash = env.hash;
257
258 execution_cache.update_with_guard(|cached| {
260
261 let (caches, cache_metrics) = saved_cache.split();
263 let new_cache = SavedCache::new(hash, caches, cache_metrics);
264
265 if new_cache.cache().insert_state(&state).is_err() {
267 *cached = None;
269 debug!(target: "engine::caching", "cleared execution cache on update error");
270 return;
271 }
272
273 new_cache.update_metrics();
274 debug!(target: "engine::caching", parent_hash=?new_cache.executed_block_hash(), "Updated execution cache");
275
276 *cached = Some(new_cache);
278 });
279
280 metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
281 }
282
283 pub(super) fn run(
288 self,
289 pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
290 actions_tx: Sender<PrewarmTaskEvent>,
291 ) {
292 self.spawn_all(pending, actions_tx);
294
295 let mut final_block_output = None;
296 let mut finished_execution = false;
297 while let Ok(event) = self.actions_rx.recv() {
298 match event {
299 PrewarmTaskEvent::TerminateTransactionExecution => {
300 self.ctx.terminate_execution.store(true, Ordering::Relaxed);
302 }
303 PrewarmTaskEvent::Outcome { proof_targets } => {
304 self.send_multi_proof_targets(proof_targets);
306 }
307 PrewarmTaskEvent::Terminate { block_output } => {
308 trace!(target: "engine::tree::prewarm", "Received termination signal");
309 final_block_output = Some(block_output);
310
311 if finished_execution {
312 break
314 }
315 }
316 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
317 trace!(target: "engine::tree::prewarm", "Finished prewarm execution signal");
318 self.ctx.metrics.transactions.set(executed_transactions as f64);
319 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
320
321 finished_execution = true;
322
323 if final_block_output.is_some() {
324 break
326 }
327 }
328 }
329 }
330
331 trace!(target: "engine::tree::prewarm", "Completed prewarm execution");
332
333 if let Some(Some(state)) = final_block_output {
335 self.save_cache(state);
336 }
337 }
338}
339
340#[derive(Debug, Clone)]
342pub(super) struct PrewarmContext<N, P, Evm>
343where
344 N: NodePrimitives,
345 Evm: ConfigureEvm<Primitives = N>,
346{
347 pub(super) env: ExecutionEnv<Evm>,
348 pub(super) evm_config: Evm,
349 pub(super) saved_cache: SavedCache,
350 pub(super) provider: StateProviderBuilder<N, P>,
352 pub(super) metrics: PrewarmMetrics,
353 pub(super) terminate_execution: Arc<AtomicBool>,
355 pub(super) precompile_cache_disabled: bool,
356 pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
357}
358
359impl<N, P, Evm> PrewarmContext<N, P, Evm>
360where
361 N: NodePrimitives,
362 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
363 Evm: ConfigureEvm<Primitives = N> + 'static,
364{
365 fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
368 let Self {
369 env,
370 evm_config,
371 saved_cache,
372 provider,
373 metrics,
374 terminate_execution,
375 precompile_cache_disabled,
376 mut precompile_cache_map,
377 } = self;
378
379 let state_provider = match provider.build() {
380 Ok(provider) => provider,
381 Err(err) => {
382 trace!(
383 target: "engine::tree",
384 %err,
385 "Failed to build state provider in prewarm thread"
386 );
387 return None
388 }
389 };
390
391 let caches = saved_cache.cache().clone();
393 let cache_metrics = saved_cache.metrics().clone();
394 let state_provider =
395 CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
396
397 let state_provider = StateProviderDatabase::new(state_provider);
398
399 let mut evm_env = env.evm_env;
400
401 evm_env.cfg_env.disable_nonce_check = true;
404
405 let spec_id = *evm_env.spec_id();
407 let mut evm = evm_config.evm_with_env(state_provider, evm_env);
408
409 if !precompile_cache_disabled {
410 evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
412 CachedPrecompile::wrap(
413 precompile,
414 precompile_cache_map.cache_for_address(*address),
415 spec_id,
416 None, )
418 });
419 }
420
421 Some((evm, metrics, terminate_execution))
422 }
423
424 fn transact_batch<Tx>(
433 self,
434 txs: mpsc::Receiver<IndexedTransaction<Tx>>,
435 sender: Sender<PrewarmTaskEvent>,
436 done_tx: Sender<()>,
437 ) where
438 Tx: ExecutableTxFor<Evm>,
439 {
440 let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
441
442 while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
443 if terminate_execution.load(Ordering::Relaxed) {
446 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
447 break
448 }
449
450 let start = Instant::now();
452 let res = match evm.transact(&tx) {
453 Ok(res) => res,
454 Err(err) => {
455 trace!(
456 target: "engine::tree::prewarm",
457 %err,
458 tx_hash=%tx.tx().tx_hash(),
459 sender=%tx.signer(),
460 "Error when executing prewarm transaction",
461 );
462 metrics.transaction_errors.increment(1);
464 continue
466 }
467 };
468 metrics.execution_duration.record(start.elapsed());
469
470 if index > 0 {
473 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
474 metrics.prefetch_storage_targets.record(storage_targets as f64);
475 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
476 }
477
478 metrics.total_runtime.record(start.elapsed());
479 }
480
481 let _ = done_tx.send(());
483 }
484
485 fn spawn_worker<Tx>(
487 &self,
488 executor: &WorkloadExecutor,
489 actions_tx: Sender<PrewarmTaskEvent>,
490 done_tx: Sender<()>,
491 ) -> mpsc::Sender<IndexedTransaction<Tx>>
492 where
493 Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
494 {
495 let (tx, rx) = mpsc::channel();
496 let ctx = self.clone();
497
498 executor.spawn_blocking(move || {
499 ctx.transact_batch(rx, actions_tx, done_tx);
500 });
501
502 tx
503 }
504}
505
506fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
509 let mut targets = MultiProofTargets::with_capacity(state.len());
510 let mut storage_targets = 0;
511 for (addr, account) in state {
512 if !account.is_touched() || account.is_selfdestructed() {
520 continue
521 }
522
523 let mut storage_set =
524 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
525 for (key, slot) in account.storage {
526 if !slot.is_changed() {
528 continue
529 }
530
531 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
532 }
533
534 storage_targets += storage_set.len();
535 targets.insert(keccak256(addr), storage_set);
536 }
537
538 (targets, storage_targets)
539}
540
541pub(super) enum PrewarmTaskEvent {
543 TerminateTransactionExecution,
545 Terminate {
548 block_output: Option<BundleState>,
550 },
551 Outcome {
553 proof_targets: Option<MultiProofTargets>,
555 },
556 FinishedTxExecution {
558 executed_transactions: usize,
560 },
561}
562
563#[derive(Metrics, Clone)]
565#[metrics(scope = "sync.prewarm")]
566pub(crate) struct PrewarmMetrics {
567 pub(crate) transactions: Gauge,
569 pub(crate) transactions_histogram: Histogram,
571 pub(crate) total_runtime: Histogram,
573 pub(crate) execution_duration: Histogram,
575 pub(crate) prefetch_storage_targets: Histogram,
577 pub(crate) cache_saving_duration: Gauge,
579 pub(crate) transaction_errors: Counter,
581}