1use crate::tree::{
15 payload_processor::multiproof::StateRootMessage,
16 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
17 CachedStateMetrics, CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache,
18 StateProviderBuilder,
19};
20use alloy_consensus::transaction::TxHashRef;
21use alloy_eip7928::bal::DecodedBal;
22use alloy_eips::eip4895::Withdrawal;
23use alloy_primitives::{keccak256, StorageKey, B256};
24use crossbeam_channel::Sender as CrossbeamSender;
25use metrics::{Counter, Gauge, Histogram};
26use rayon::prelude::*;
27use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
28use reth_metrics::Metrics;
29use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
30use reth_provider::{
31 AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
32 StateReader,
33};
34use reth_revm::{database::StateProviderDatabase, state::EvmState};
35use reth_tasks::{pool::WorkerPool, Runtime};
36use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
37use std::sync::{
38 atomic::{AtomicBool, AtomicUsize, Ordering},
39 mpsc::{self, channel, Receiver, Sender},
40 Arc,
41};
42use tokio::sync::oneshot;
43use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
44
45#[derive(Debug)]
47pub enum PrewarmMode<Tx> {
48 Transactions(Receiver<(usize, Tx)>),
50 BlockAccessList(Arc<DecodedBal>),
52 Skipped,
55}
56
57#[derive(Debug)]
62pub struct PrewarmCacheTask<N, P, Evm>
63where
64 N: NodePrimitives,
65 Evm: ConfigureEvm<Primitives = N>,
66{
67 executor: Runtime,
69 execution_cache: PayloadExecutionCache,
71 ctx: PrewarmContext<N, P, Evm>,
73 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
75 actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
77 parent_span: Span,
79}
80
81impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
82where
83 N: NodePrimitives,
84 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
85 Evm: ConfigureEvm<Primitives = N> + 'static,
86{
87 pub fn new(
89 executor: Runtime,
90 execution_cache: PayloadExecutionCache,
91 ctx: PrewarmContext<N, P, Evm>,
92 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
93 ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
94 let (actions_tx, actions_rx) = channel();
95
96 trace!(
97 target: "engine::tree::payload_processor::prewarm",
98 prewarming_threads = executor.prewarming_pool().current_num_threads(),
99 transaction_count = ctx.env.transaction_count,
100 "Initialized prewarm task"
101 );
102
103 (
104 Self {
105 executor,
106 execution_cache,
107 ctx,
108 to_sparse_trie_task,
109 actions_rx,
110 parent_span: Span::current(),
111 },
112 actions_tx,
113 )
114 }
115
116 fn spawn_txs_prewarm<Tx>(
123 &self,
124 pending: mpsc::Receiver<(usize, Tx)>,
125 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
126 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
127 ) where
128 Tx: ExecutableTxFor<Evm> + Send + 'static,
129 {
130 let executor = self.executor.clone();
131 let ctx = self.ctx.clone();
132 let span = Span::current();
133
134 self.executor.spawn_blocking_named("prewarm-txs", move || {
135 let _enter = debug_span!(
136 target: "engine::tree::payload_processor::prewarm",
137 parent: &span,
138 "prewarm_txs"
139 )
140 .entered();
141
142 let ctx = &ctx;
143 let pool = executor.prewarming_pool();
144
145 let mut tx_count = 0usize;
146 let to_sparse_trie_task = to_sparse_trie_task.as_ref();
147 pool.in_place_scope(|s| {
148 s.spawn(|_| {
149 pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
150 });
151
152 while let Ok((index, tx)) = pending.recv() {
153 if ctx.should_stop() {
154 trace!(
155 target: "engine::tree::payload_processor::prewarm",
156 "Termination requested, stopping transaction distribution"
157 );
158 break;
159 }
160
161 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
163 continue;
164 }
165
166 tx_count += 1;
167 let parent_span = Span::current();
168 s.spawn(move |_| {
169 let _enter = trace_span!(
170 target: "engine::tree::payload_processor::prewarm",
171 parent: parent_span,
172 "prewarm_tx",
173 i = index,
174 )
175 .entered();
176 Self::transact_worker(ctx, index, tx, to_sparse_trie_task);
177 });
178 }
179
180 if let Some(to_sparse_trie_task) = to_sparse_trie_task &&
182 let Some(withdrawals) = &ctx.env.withdrawals &&
183 !withdrawals.is_empty()
184 {
185 let targets = multiproof_targets_from_withdrawals(withdrawals);
186 let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
187 }
188 });
189
190 pool.clear();
192
193 let _ = actions_tx
194 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
195 });
196 }
197
198 fn transact_worker<Tx>(
203 ctx: &PrewarmContext<N, P, Evm>,
204 index: usize,
205 tx: Tx,
206 to_sparse_trie_task: Option<&CrossbeamSender<StateRootMessage>>,
207 ) where
208 Tx: ExecutableTxFor<Evm>,
209 {
210 WorkerPool::with_worker_mut(|worker| {
211 let Some(evm) =
212 worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
213 else {
214 return;
215 };
216
217 if ctx.should_stop() {
218 return;
219 }
220
221 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
223 return;
224 }
225
226 let start = Instant::now();
227
228 let (tx_env, tx) = tx.into_parts();
229 let res = match evm.transact(tx_env) {
230 Ok(res) => res,
231 Err(err) => {
232 trace!(
233 target: "engine::tree::payload_processor::prewarm",
234 %err,
235 tx_hash=%tx.tx().tx_hash(),
236 sender=%tx.signer(),
237 "Error when executing prewarm transaction",
238 );
239 ctx.metrics.transaction_errors.increment(1);
240 return;
241 }
242 };
243 ctx.metrics.execution_duration.record(start.elapsed());
244
245 if ctx.should_stop() {
246 return;
247 }
248
249 if index > 0 {
250 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
251 ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
252 if let Some(to_sparse_trie_task) = to_sparse_trie_task {
253 let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
254 }
255 }
256
257 ctx.metrics.total_runtime.record(start.elapsed());
258 });
259 }
260
261 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
273 fn save_cache(
274 self,
275 execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
276 valid_block_rx: mpsc::Receiver<()>,
277 ) {
278 let start = Instant::now();
279
280 let Self {
281 execution_cache,
282 ctx: PrewarmContext { env, metrics, cache_metrics, saved_cache, .. },
283 ..
284 } = self;
285 let hash = env.hash;
286
287 if let Some(saved_cache) = saved_cache {
288 debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
289 execution_cache.update_with_guard(|cached| {
290 let caches = saved_cache.cache().clone();
293 let new_cache = SavedCache::new(hash, caches);
294
295 if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
298 *cached = None;
300 debug!(target: "engine::caching", "cleared execution cache on update error");
301 return;
302 }
303
304 new_cache.update_metrics(cache_metrics.as_ref());
305
306 if valid_block_rx.recv().is_ok() {
307 *cached = Some(new_cache);
310 } else {
311 *cached = None;
314 debug!(target: "engine::caching", "cleared execution cache on invalid block");
315 }
316 });
317
318 let elapsed = start.elapsed();
319 debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
320
321 metrics.cache_saving_duration.set(elapsed.as_secs_f64());
322 }
323 }
324
325 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
333 fn run_bal_prewarm(
334 &self,
335 decoded_bal: Arc<DecodedBal>,
336 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
337 ) {
338 let bal = decoded_bal.as_bal();
339 if bal.is_empty() {
340 if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
341 let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
342 }
343 let _ =
344 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
345 return;
346 }
347
348 trace!(
349 target: "engine::tree::payload_processor::prewarm",
350 accounts = bal.len(),
351 "Starting BAL prewarm"
352 );
353
354 let ctx = self.ctx.clone();
355 let to_sparse_trie_task = self.to_sparse_trie_task.clone();
356 let executor = self.executor.clone();
357 let parent_span = Span::current();
358 let prefetch_parent_span = parent_span.clone();
359 let stream_parent_span = parent_span;
360 let prefetch_bal = Arc::clone(&decoded_bal);
361 let stream_bal = Arc::clone(&decoded_bal);
362 let (prefetch_tx, prefetch_rx) = oneshot::channel();
363 let (stream_tx, stream_rx) = oneshot::channel();
364
365 if let Some(to_sparse_trie_task) = to_sparse_trie_task {
366 let stream_ctx = ctx.clone();
367 executor.bal_streaming_pool().spawn(move || {
368 let branch_span = debug_span!(
369 target: "engine::tree::payload_processor::prewarm",
370 parent: &stream_parent_span,
371 "bal_hashed_state_stream",
372 bal_accounts = stream_bal.as_bal().len(),
373 );
374 let provider_parent_span = branch_span.clone();
375 let _span = branch_span.entered();
376
377 stream_bal.as_bal().par_iter().for_each_init(
378 || {
379 (
380 stream_ctx.clone(),
381 None::<Box<dyn AccountReader>>,
382 provider_parent_span.clone(),
383 )
384 },
385 |(ctx, provider, parent_span), account_changes| {
386 ctx.send_bal_hashed_state(
387 parent_span,
388 provider,
389 account_changes,
390 &to_sparse_trie_task,
391 );
392 },
393 );
394
395 let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
396 let _ = stream_tx.send(());
397 });
398 } else {
399 let _ = stream_tx.send(());
400 }
401
402 if ctx.saved_cache.is_some() && !ctx.disable_bal_batch_io {
403 executor.prewarming_pool().spawn(move || {
404 let branch_span = debug_span!(
405 target: "engine::tree::payload_processor::prewarm",
406 parent: &prefetch_parent_span,
407 "bal_prefetch_storage",
408 bal_accounts = prefetch_bal.as_bal().len(),
409 );
410 let provider_parent_span = branch_span.clone();
411 let _span = branch_span.entered();
412
413 prefetch_bal.as_bal().par_iter().for_each_init(
414 || {
415 (
416 ctx.clone(),
417 None::<CachedStateProvider<reth_provider::StateProviderBox, true>>,
418 provider_parent_span.clone(),
419 )
420 },
421 |(ctx, provider, parent_span), account| {
422 if ctx.should_stop() {
423 return;
424 }
425 ctx.prefetch_bal_storage(parent_span, provider, account);
426 },
427 );
428
429 let _ = prefetch_tx.send(());
430 });
431 } else {
432 let _ = prefetch_tx.send(());
433 }
434
435 prefetch_rx
436 .blocking_recv()
437 .expect("BAL prefetch task dropped without signaling completion");
438 stream_rx
439 .blocking_recv()
440 .expect("BAL hashed-state streaming task dropped without signaling completion");
441
442 let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
443 }
444
445 #[instrument(
450 parent = &self.parent_span,
451 level = "debug",
452 target = "engine::tree::payload_processor::prewarm",
453 name = "prewarm and caching",
454 skip_all
455 )]
456 pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
457 where
458 Tx: ExecutableTxFor<Evm> + Send + 'static,
459 {
460 match mode {
462 PrewarmMode::Transactions(pending) => {
463 self.spawn_txs_prewarm(pending, actions_tx, self.to_sparse_trie_task.clone());
464 }
465 PrewarmMode::BlockAccessList(bal) => {
466 self.run_bal_prewarm(bal, actions_tx);
467 }
468 PrewarmMode::Skipped => {
469 let _ = actions_tx
470 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
471 }
472 }
473
474 let mut final_execution_outcome = None;
475 let mut finished_execution = false;
476 while let Ok(event) = self.actions_rx.recv() {
477 match event {
478 PrewarmTaskEvent::TerminateTransactionExecution => {
479 debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
481 self.ctx.stop();
482 }
483 PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
484 trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
485 final_execution_outcome =
486 Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
487
488 if finished_execution {
489 break
491 }
492 }
493 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
494 trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
495 self.ctx.metrics.transactions.set(executed_transactions as f64);
496 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
497
498 finished_execution = true;
499
500 if final_execution_outcome.is_some() {
501 break
503 }
504 }
505 }
506 }
507
508 debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
509
510 if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
512 self.save_cache(execution_outcome, valid_block_rx);
513 }
514 }
515}
516
517#[derive(Debug, Clone)]
519pub struct PrewarmContext<N, P, Evm>
520where
521 N: NodePrimitives,
522 Evm: ConfigureEvm<Primitives = N>,
523{
524 pub env: ExecutionEnv<Evm>,
526 pub evm_config: Evm,
528 pub saved_cache: Option<SavedCache>,
530 pub provider: StateProviderBuilder<N, P>,
532 pub metrics: PrewarmMetrics,
534 pub cache_metrics: Option<CachedStateMetrics>,
537 pub terminate_execution: Arc<AtomicBool>,
539 pub executed_tx_index: Arc<AtomicUsize>,
543 pub precompile_cache_disabled: bool,
545 pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
547 pub disable_bal_parallel_state_root: bool,
550 pub disable_bal_batch_io: bool,
552}
553
554type PrewarmEvmState<Evm> =
557 Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
558
559impl<N, P, Evm> PrewarmContext<N, P, Evm>
560where
561 N: NodePrimitives,
562 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
563 Evm: ConfigureEvm<Primitives = N> + 'static,
564{
565 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
567 fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
568 let mut state_provider = match self.provider.build() {
569 Ok(provider) => provider,
570 Err(err) => {
571 trace!(
572 target: "engine::tree::payload_processor::prewarm",
573 %err,
574 "Failed to build state provider in prewarm thread"
575 );
576 return None
577 }
578 };
579
580 if let Some(saved_cache) = &self.saved_cache {
582 let caches = saved_cache.cache().clone();
583 state_provider = Box::new(CachedStateProvider::new_prewarm(
584 state_provider,
585 caches,
586 self.cache_metrics.clone().unwrap_or_default(),
587 ));
588 }
589
590 let state_provider = StateProviderDatabase::new(state_provider);
591
592 let mut evm_env = self.env.evm_env.clone();
593
594 evm_env.cfg_env.disable_nonce_check = true;
597
598 evm_env.cfg_env.disable_balance_check = true;
601
602 let spec_id = *evm_env.spec_id();
604 let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
605
606 if !self.precompile_cache_disabled {
607 evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
609 CachedPrecompile::wrap(
610 precompile,
611 self.precompile_cache_map.cache_for_address(*address),
612 spec_id,
613 None, )
615 });
616 }
617
618 Some(evm)
619 }
620
621 #[inline]
623 pub fn should_stop(&self) -> bool {
624 self.terminate_execution.load(Ordering::Relaxed)
625 }
626
627 #[inline]
629 pub fn stop(&self) {
630 self.terminate_execution.store(true, Ordering::Relaxed);
631 }
632
633 fn send_bal_hashed_state(
641 &self,
642 parent_span: &Span,
643 provider: &mut Option<Box<dyn AccountReader>>,
644 account_changes: &alloy_eip7928::AccountChanges,
645 to_sparse_trie_task: &CrossbeamSender<StateRootMessage>,
646 ) {
647 if self.disable_bal_parallel_state_root {
648 return;
649 }
650 let address = account_changes.address;
651 let mut hashed_address = None;
652
653 if !account_changes.storage_changes.is_empty() {
654 let hashed_address = *hashed_address.get_or_insert_with(|| keccak256(address));
655 let mut storage_map = reth_trie::HashedStorage::new(false);
656
657 for slot_changes in &account_changes.storage_changes {
658 let hashed_slot = keccak256(slot_changes.slot.to_be_bytes::<32>());
659 if let Some(last_change) = slot_changes.changes.last() {
660 storage_map.storage.insert(hashed_slot, last_change.new_value);
661 }
662 }
663
664 let mut hashed_state = reth_trie::HashedPostState::default();
665 hashed_state.storages.insert(hashed_address, storage_map);
666 let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
667 }
668
669 if provider.is_none() {
670 let _span = debug_span!(
671 target: "engine::tree::payload_processor::prewarm",
672 parent: parent_span,
673 "bal_hashed_state_provider_init",
674 has_saved_cache = !self.disable_bal_batch_io && self.saved_cache.is_some(),
675 )
676 .entered();
677
678 let inner = match self.provider.build() {
679 Ok(p) => p,
680 Err(err) => {
681 warn!(
682 target: "engine::tree::payload_processor::prewarm",
683 ?err,
684 "Failed to build provider for BAL account reads"
685 );
686 return;
687 }
688 };
689 let boxed: Box<dyn AccountReader> = match (self.disable_bal_batch_io, &self.saved_cache)
690 {
691 (false, Some(saved)) => {
692 let caches = saved.cache().clone();
693 Box::new(CachedStateProvider::new_prewarm(
694 inner,
695 caches,
696 self.cache_metrics.clone().unwrap_or_default(),
697 ))
698 }
699 _ => Box::new(inner),
700 };
701 *provider = Some(boxed);
702 }
703 let account_reader = provider.as_ref().expect("provider just initialized");
704
705 let existing_account = account_reader.basic_account(&address).ok().flatten();
706
707 let balance = account_changes.balance_changes.last().map(|change| change.post_balance);
708 let nonce = account_changes.nonce_changes.last().map(|change| change.new_nonce);
709 let code_hash = account_changes.code_changes.last().map(|code_change| {
710 if code_change.new_code.is_empty() {
711 alloy_consensus::constants::KECCAK_EMPTY
712 } else {
713 keccak256(&code_change.new_code)
714 }
715 });
716
717 if balance.is_none() &&
718 nonce.is_none() &&
719 code_hash.is_none() &&
720 account_changes.storage_changes.is_empty()
721 {
722 return;
723 }
724
725 let account = reth_primitives_traits::Account {
726 balance: balance.unwrap_or_else(|| {
727 existing_account
728 .as_ref()
729 .map(|account| account.balance)
730 .unwrap_or(alloy_primitives::U256::ZERO)
731 }),
732 nonce: nonce.unwrap_or_else(|| {
733 existing_account.as_ref().map(|account| account.nonce).unwrap_or(0)
734 }),
735 bytecode_hash: code_hash.or_else(|| {
736 existing_account
737 .as_ref()
738 .and_then(|account| account.bytecode_hash)
739 .or(Some(alloy_consensus::constants::KECCAK_EMPTY))
740 }),
741 };
742
743 let hashed_address = hashed_address.unwrap_or_else(|| keccak256(address));
744 let mut hashed_state = reth_trie::HashedPostState::default();
745 hashed_state.accounts.insert(hashed_address, Some(account));
746
747 let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
748 }
749
750 fn prefetch_bal_storage(
759 &self,
760 parent_span: &Span,
761 provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox, true>>,
762 account: &alloy_eip7928::AccountChanges,
763 ) {
764 if self.disable_bal_batch_io ||
765 (account.storage_changes.is_empty() && account.storage_reads.is_empty())
766 {
767 return;
768 }
769
770 let state_provider = match provider {
771 Some(p) => p,
772 slot @ None => {
773 let _span = debug_span!(
774 target: "engine::tree::payload_processor::prewarm",
775 parent: parent_span,
776 "bal_prefetch_provider_init",
777 )
778 .entered();
779
780 let built = match self.provider.build() {
781 Ok(p) => p,
782 Err(err) => {
783 trace!(
784 target: "engine::tree::payload_processor::prewarm",
785 %err,
786 "Failed to build state provider in BAL prewarm thread"
787 );
788 return;
789 }
790 };
791 let saved_cache =
792 self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
793 let caches = saved_cache.cache().clone();
794 slot.insert(CachedStateProvider::new_prewarm(
795 built,
796 caches,
797 self.cache_metrics.clone().unwrap_or_default(),
798 ))
799 }
800 };
801
802 let start = Instant::now();
803
804 for slot in &account.storage_changes {
805 let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
806 }
807 for &slot in &account.storage_reads {
808 let _ = state_provider.storage(account.address, StorageKey::from(slot));
809 }
810
811 self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
812 }
813}
814
815fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
818 let mut targets = MultiProofTargetsV2::default();
819 let mut storage_target_count = 0;
820 for (addr, account) in state {
821 if !account.is_touched() || account.is_selfdestructed() {
829 continue
830 }
831
832 let hashed_address = keccak256(addr);
833 targets.account_targets.push(hashed_address.into());
834
835 let mut storage_slots = Vec::with_capacity(account.storage.len());
836 for (key, slot) in account.storage {
837 if !slot.is_changed() {
839 continue
840 }
841
842 let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
843 storage_slots.push(ProofV2Target::from(hashed_slot));
844 }
845
846 storage_target_count += storage_slots.len();
847 if !storage_slots.is_empty() {
848 targets.storage_targets.insert(hashed_address, storage_slots);
849 }
850 }
851
852 (targets, storage_target_count)
853}
854
855fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
860 MultiProofTargetsV2 {
861 account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
862 ..Default::default()
863 }
864}
865
866#[derive(Debug)]
871pub enum PrewarmTaskEvent<R> {
872 TerminateTransactionExecution,
874 Terminate {
877 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
880 valid_block_rx: mpsc::Receiver<()>,
885 },
886 FinishedTxExecution {
888 executed_transactions: usize,
890 },
891}
892
893#[derive(Metrics, Clone)]
895#[metrics(scope = "sync.prewarm")]
896pub struct PrewarmMetrics {
897 pub(crate) transactions: Gauge,
899 pub(crate) transactions_histogram: Histogram,
901 pub(crate) total_runtime: Histogram,
903 pub(crate) execution_duration: Histogram,
905 pub(crate) prefetch_storage_targets: Histogram,
907 pub(crate) cache_saving_duration: Gauge,
909 pub(crate) transaction_errors: Counter,
911 pub(crate) bal_slot_iteration_duration: Histogram,
913}