1use super::bal_prewarm_pool::BalPrewarmPool;
15use crate::tree::{
16 payload_processor::multiproof::StateRootMessage,
17 precompile_cache::{CachedPrecompile, PrecompileCacheMap},
18 CachedStateCacheMetrics, CachedStateMetrics, CachedStateProvider, ExecutionEnv,
19 PayloadExecutionCache, SavedCache, StateProviderBuilder,
20};
21use alloy_consensus::transaction::TxHashRef;
22use alloy_eip7928::bal::DecodedBal;
23use alloy_eips::eip4895::Withdrawal;
24use alloy_primitives::{keccak256, B256, U256};
25use crossbeam_channel::Sender as CrossbeamSender;
26use metrics::{Counter, Gauge, Histogram};
27use rayon::prelude::*;
28use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
29use reth_metrics::Metrics;
30use reth_primitives_traits::{Account, FastInstant as Instant, NodePrimitives};
31use reth_provider::{
32 AccountReader, BlockExecutionOutput, BlockReader, StateProviderFactory, StateReader,
33};
34use reth_revm::database::StateProviderDatabase;
35use reth_tasks::{pool::WorkerPool, Runtime};
36use reth_trie_common::MultiProofTargetsV2;
37use std::sync::{
38 atomic::{AtomicBool, AtomicUsize, Ordering},
39 mpsc::{self, channel, Receiver, Sender},
40 Arc,
41};
42use tokio::sync::oneshot;
43use tracing::{debug, debug_span, instrument, trace, trace_span, warn, Span};
44
45#[derive(Debug)]
47pub enum PrewarmMode<Tx> {
48 Transactions(Receiver<(usize, Tx)>),
50 BlockAccessList(Arc<DecodedBal>),
52 Skipped,
55}
56
57#[derive(Debug)]
62pub struct PrewarmCacheTask<N, P, Evm>
63where
64 N: NodePrimitives,
65 Evm: ConfigureEvm<Primitives = N>,
66{
67 executor: Runtime,
69 execution_cache: PayloadExecutionCache,
71 ctx: PrewarmContext<N, P, Evm>,
73 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
75 actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
77 parent_span: Span,
79}
80
81impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
82where
83 N: NodePrimitives,
84 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
85 Evm: ConfigureEvm<Primitives = N> + 'static,
86{
87 pub fn new(
89 executor: Runtime,
90 execution_cache: PayloadExecutionCache,
91 ctx: PrewarmContext<N, P, Evm>,
92 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
93 ) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
94 let (actions_tx, actions_rx) = channel();
95
96 trace!(
97 target: "engine::tree::payload_processor::prewarm",
98 prewarming_threads = executor.prewarming_pool().current_num_threads(),
99 transaction_count = ctx.env.transaction_count,
100 "Initialized prewarm task"
101 );
102
103 (
104 Self {
105 executor,
106 execution_cache,
107 ctx,
108 to_sparse_trie_task,
109 actions_rx,
110 parent_span: Span::current(),
111 },
112 actions_tx,
113 )
114 }
115
116 fn spawn_txs_prewarm<Tx>(
123 &self,
124 pending: mpsc::Receiver<(usize, Tx)>,
125 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
126 to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
127 ) where
128 Tx: ExecutableTxFor<Evm> + Send + 'static,
129 {
130 let executor = self.executor.clone();
131 let ctx = self.ctx.clone();
132 let span = Span::current();
133
134 self.executor.spawn_blocking_named("prewarm-txs", move || {
135 let _enter = debug_span!(
136 target: "engine::tree::payload_processor::prewarm",
137 parent: &span,
138 "prewarm_txs"
139 )
140 .entered();
141
142 let ctx = &ctx;
143 let pool = executor.prewarming_pool();
144
145 let mut tx_count = 0usize;
146 let to_sparse_trie_task = to_sparse_trie_task.as_ref();
147 pool.in_place_scope(|s| {
148 s.spawn(|_| {
149 pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
150 });
151
152 while let Ok((index, tx)) = pending.recv() {
153 if ctx.should_stop() {
154 trace!(
155 target: "engine::tree::payload_processor::prewarm",
156 "Termination requested, stopping transaction distribution"
157 );
158 break;
159 }
160
161 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
163 continue;
164 }
165
166 tx_count += 1;
167 let parent_span = Span::current();
168 s.spawn(move |_| {
169 let _enter = trace_span!(
170 target: "engine::tree::payload_processor::prewarm",
171 parent: parent_span,
172 "prewarm_tx",
173 i = index,
174 )
175 .entered();
176 Self::transact_worker(ctx, index, tx, to_sparse_trie_task);
177 });
178 }
179
180 if let Some(to_sparse_trie_task) = to_sparse_trie_task &&
182 let Some(withdrawals) = &ctx.env.withdrawals &&
183 !withdrawals.is_empty()
184 {
185 let targets = multiproof_targets_from_withdrawals(withdrawals);
186 let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
187 }
188 });
189
190 pool.clear();
192
193 let _ = actions_tx
194 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
195 });
196 }
197
198 fn transact_worker<Tx>(
203 ctx: &PrewarmContext<N, P, Evm>,
204 index: usize,
205 tx: Tx,
206 to_sparse_trie_task: Option<&CrossbeamSender<StateRootMessage>>,
207 ) where
208 Tx: ExecutableTxFor<Evm>,
209 {
210 WorkerPool::with_worker_mut(|worker| {
211 let Some(evm) =
212 worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
213 else {
214 return;
215 };
216
217 if ctx.should_stop() {
218 return;
219 }
220
221 if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
223 return;
224 }
225
226 let start = Instant::now();
227
228 let (tx_env, tx) = tx.into_parts();
229 let res = match evm.transact(tx_env) {
230 Ok(res) => res,
231 Err(err) => {
232 trace!(
233 target: "engine::tree::payload_processor::prewarm",
234 %err,
235 tx_hash=%tx.tx().tx_hash(),
236 sender=%tx.signer(),
237 "Error when executing prewarm transaction",
238 );
239 ctx.metrics.transaction_errors.increment(1);
240 return;
241 }
242 };
243 ctx.metrics.execution_duration.record(start.elapsed());
244
245 if ctx.should_stop() {
246 return;
247 }
248
249 if index > 0 {
250 let (targets, storage_targets) = MultiProofTargetsV2::from_state(res.state);
251 ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
252 if let Some(to_sparse_trie_task) = to_sparse_trie_task {
253 let _ = to_sparse_trie_task.send(StateRootMessage::PrefetchProofs(targets));
254 }
255 }
256
257 ctx.metrics.total_runtime.record(start.elapsed());
258 });
259 }
260
261 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
273 fn save_cache(
274 self,
275 execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
276 valid_block_rx: mpsc::Receiver<()>,
277 ) {
278 let start = Instant::now();
279
280 let Self {
281 execution_cache,
282 ctx: PrewarmContext { env, metrics, cache_state_metrics, saved_cache, .. },
283 ..
284 } = self;
285 let hash = env.hash;
286
287 if let Some(saved_cache) = saved_cache {
288 debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
289 execution_cache.update_with_guard(|cached| {
290 let caches = saved_cache.cache().clone();
293 let new_cache = SavedCache::new(hash, caches);
294
295 if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
298 *cached = None;
300 debug!(target: "engine::caching", "cleared execution cache on update error");
301 return;
302 }
303
304 new_cache.update_metrics(cache_state_metrics.as_ref());
305
306 if valid_block_rx.recv().is_ok() {
307 *cached = Some(new_cache);
310 } else {
311 *cached = None;
314 debug!(target: "engine::caching", "cleared execution cache on invalid block");
315 }
316 });
317
318 let elapsed = start.elapsed();
319 debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
320
321 metrics.cache_saving_duration.set(elapsed.as_secs_f64());
322 }
323 }
324
325 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
333 fn run_bal_prewarm(
334 &self,
335 decoded_bal: Arc<DecodedBal>,
336 actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
337 ) {
338 let bal = decoded_bal.as_bal();
339 if bal.is_empty() {
340 if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
341 let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
342 }
343 let _ =
344 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
345 return;
346 }
347
348 trace!(
349 target: "engine::tree::payload_processor::prewarm",
350 accounts = bal.len(),
351 "Starting BAL prewarm"
352 );
353
354 let ctx = self.ctx.clone();
355 let to_sparse_trie_task = self.to_sparse_trie_task.clone();
356 let executor = self.executor.clone();
357 let parent_span = Span::current();
358 let stream_parent_span = parent_span;
359 let prefetch_bal = Arc::clone(&decoded_bal);
360 let stream_bal = Arc::clone(&decoded_bal);
361 let (stream_tx, stream_rx) = oneshot::channel();
362
363 if let Some(to_sparse_trie_task) = to_sparse_trie_task {
364 let ctx = ctx.clone();
365 executor.bal_streaming_pool().spawn(move || {
366 let branch_span = debug_span!(
367 target: "engine::tree::payload_processor::prewarm",
368 parent: &stream_parent_span,
369 "bal_hashed_state_stream",
370 bal_accounts = stream_bal.as_bal().len(),
371 );
372 let parent_span = branch_span.clone();
373 let _span = branch_span.entered();
374
375 stream_bal.as_bal().par_iter().for_each(|account_changes| {
376 WorkerPool::with_worker_mut(|worker| {
377 let provider =
378 worker.get_or_init::<Option<Box<dyn AccountReader>>>(|| None);
379 ctx.send_bal_hashed_state(
380 &parent_span,
381 provider,
382 account_changes,
383 &to_sparse_trie_task,
384 );
385 });
386 });
387
388 let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
389 let _ = stream_tx.send(());
390 });
391 } else {
392 let _ = stream_tx.send(());
393 }
394
395 if let Some(saved_cache) = ctx.saved_cache &&
396 !ctx.disable_bal_batch_io &&
397 let Some(pool) = ctx.bal_prewarm_pool.as_ref()
398 {
399 let caches = saved_cache.cache().clone();
412 let provider_builder = ctx.provider.clone();
413 let build = Arc::new(move || provider_builder.build());
414
415 pool.begin_block(build, caches);
416 for account in prefetch_bal.as_bal() {
417 pool.warm_account(account.address);
418 for change in &account.storage_changes {
419 pool.warm_storage(account.address, change.slot.into());
420 }
421 for &slot in &account.storage_reads {
422 pool.warm_storage(account.address, slot.into());
423 }
424 }
425 pool.end_block();
426 }
427
428 stream_rx
429 .blocking_recv()
430 .expect("BAL hashed-state streaming task dropped without signaling completion");
431
432 executor.bal_streaming_pool().clear();
434 executor.prewarming_pool().clear();
435
436 let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
437 }
438
439 #[instrument(
444 parent = &self.parent_span,
445 level = "debug",
446 target = "engine::tree::payload_processor::prewarm",
447 name = "prewarm and caching",
448 skip_all
449 )]
450 pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
451 where
452 Tx: ExecutableTxFor<Evm> + Send + 'static,
453 {
454 match mode {
456 PrewarmMode::Transactions(pending) => {
457 self.spawn_txs_prewarm(pending, actions_tx, self.to_sparse_trie_task.clone());
458 }
459 PrewarmMode::BlockAccessList(bal) => {
460 self.run_bal_prewarm(bal, actions_tx);
461 }
462 PrewarmMode::Skipped => {
463 let _ = actions_tx
464 .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
465 }
466 }
467
468 let mut final_execution_outcome = None;
469 let mut finished_execution = false;
470 while let Ok(event) = self.actions_rx.recv() {
471 match event {
472 PrewarmTaskEvent::TerminateTransactionExecution => {
473 debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
475 self.ctx.stop();
476 }
477 PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
478 trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
479 final_execution_outcome =
480 Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
481
482 if finished_execution {
483 break
485 }
486 }
487 PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
488 trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
489 self.ctx.metrics.transactions.set(executed_transactions as f64);
490 self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
491
492 finished_execution = true;
493
494 if final_execution_outcome.is_some() {
495 break
497 }
498 }
499 }
500 }
501
502 debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
503
504 if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
506 self.save_cache(execution_outcome, valid_block_rx);
507 }
508 }
509}
510
511#[derive(Debug, Clone)]
513pub struct PrewarmContext<N, P, Evm>
514where
515 N: NodePrimitives,
516 Evm: ConfigureEvm<Primitives = N>,
517{
518 pub env: ExecutionEnv<Evm>,
520 pub evm_config: Evm,
522 pub saved_cache: Option<SavedCache>,
524 pub provider: StateProviderBuilder<N, P>,
526 pub(crate) bal_prewarm_pool: Option<Arc<BalPrewarmPool>>,
529 pub metrics: PrewarmMetrics,
531 pub cache_metrics: Option<CachedStateMetrics>,
534 pub cache_state_metrics: Option<CachedStateCacheMetrics>,
536 pub terminate_execution: Arc<AtomicBool>,
538 pub executed_tx_index: Arc<AtomicUsize>,
542 pub precompile_cache_disabled: bool,
544 pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
546 pub disable_bal_parallel_state_root: bool,
549 pub disable_bal_batch_io: bool,
551}
552
553type PrewarmEvmState<Evm> =
556 Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
557
558impl<N, P, Evm> PrewarmContext<N, P, Evm>
559where
560 N: NodePrimitives,
561 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
562 Evm: ConfigureEvm<Primitives = N> + 'static,
563{
564 #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
566 fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
567 let mut state_provider = match self.provider.build() {
568 Ok(provider) => provider,
569 Err(err) => {
570 trace!(
571 target: "engine::tree::payload_processor::prewarm",
572 %err,
573 "Failed to build state provider in prewarm thread"
574 );
575 return None
576 }
577 };
578
579 if let Some(saved_cache) = &self.saved_cache {
581 let caches = saved_cache.cache().clone();
582 state_provider = Box::new(CachedStateProvider::new_prewarm(state_provider, caches));
583 }
584
585 let state_provider = StateProviderDatabase::new(state_provider);
586
587 let mut evm_env = self.env.evm_env.clone();
588
589 evm_env.cfg_env.disable_nonce_check = true;
592
593 evm_env.cfg_env.disable_balance_check = true;
596
597 let spec_id = *evm_env.spec_id();
599 let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
600
601 if !self.precompile_cache_disabled {
602 evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
604 CachedPrecompile::wrap(
605 precompile,
606 self.precompile_cache_map.cache_for_address(*address),
607 spec_id,
608 None, )
610 });
611 }
612
613 Some(evm)
614 }
615
616 #[inline]
618 pub fn should_stop(&self) -> bool {
619 self.terminate_execution.load(Ordering::Relaxed)
620 }
621
622 #[inline]
624 pub fn stop(&self) {
625 self.terminate_execution.store(true, Ordering::Relaxed);
626 }
627
628 fn send_bal_hashed_state(
637 &self,
638 parent_span: &Span,
639 provider: &mut Option<Box<dyn AccountReader>>,
640 account_changes: &alloy_eip7928::AccountChanges,
641 to_sparse_trie_task: &CrossbeamSender<StateRootMessage>,
642 ) {
643 if self.disable_bal_parallel_state_root {
644 return;
645 }
646 let address = account_changes.address;
647 let mut hashed_address = None;
648 let account_fields = BalAccountStateFields::from_changes(account_changes);
649
650 if !bal_account_changes_state_root(account_changes, account_fields) {
651 return;
652 }
653
654 if !account_changes.storage_changes.is_empty() {
655 let hashed_address = *hashed_address.get_or_insert_with(|| keccak256(address));
656 let mut storage_map = reth_trie::HashedStorage::new(false);
657
658 for slot_changes in &account_changes.storage_changes {
659 let hashed_slot = keccak256(slot_changes.slot.to_be_bytes::<32>());
660 if let Some(last_change) = slot_changes.changes.last() {
661 storage_map.storage.insert(hashed_slot, last_change.new_value);
662 }
663 }
664
665 let mut hashed_state = reth_trie::HashedPostState::default();
666 hashed_state.storages.insert(hashed_address, storage_map);
667 let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
668 }
669
670 let existing_account = if account_fields.needs_parent_account() {
671 if provider.is_none() {
672 let _span = debug_span!(
673 target: "engine::tree::payload_processor::prewarm",
674 parent: parent_span,
675 "bal_hashed_state_provider_init",
676 has_saved_cache = !self.disable_bal_batch_io && self.saved_cache.is_some(),
677 )
678 .entered();
679
680 let inner = match self.provider.build() {
681 Ok(p) => p,
682 Err(err) => {
683 warn!(
684 target: "engine::tree::payload_processor::prewarm",
685 ?err,
686 "Failed to build provider for BAL account reads"
687 );
688 return;
689 }
690 };
691 let boxed: Box<dyn AccountReader> =
692 match (self.disable_bal_batch_io, &self.saved_cache) {
693 (false, Some(saved)) => {
694 let caches = saved.cache().clone();
695 Box::new(CachedStateProvider::new_prewarm(inner, caches))
696 }
697 _ => Box::new(inner),
698 };
699 *provider = Some(boxed);
700 }
701 let account_reader = provider.as_ref().expect("provider just initialized");
702 account_reader.basic_account(&address).ok().flatten()
703 } else {
704 None
705 };
706
707 let account = account_fields.into_account(existing_account);
708
709 let hashed_address = hashed_address.unwrap_or_else(|| keccak256(address));
710 let mut hashed_state = reth_trie::HashedPostState::default();
711 hashed_state.accounts.insert(hashed_address, Some(account));
712
713 let _ = to_sparse_trie_task.send(StateRootMessage::HashedStateUpdate(hashed_state));
714 }
715}
716
717#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
718struct BalAccountStateFields {
719 balance: Option<U256>,
720 nonce: Option<u64>,
721 code_hash: Option<B256>,
722}
723
724impl BalAccountStateFields {
725 fn from_changes(account_changes: &alloy_eip7928::AccountChanges) -> Self {
726 Self {
727 balance: account_changes.balance_changes.last().map(|change| change.post_balance),
728 nonce: account_changes.nonce_changes.last().map(|change| change.new_nonce),
729 code_hash: account_changes.code_changes.last().map(|code_change| {
730 if code_change.new_code.is_empty() {
731 alloy_consensus::constants::KECCAK_EMPTY
732 } else {
733 keccak256(&code_change.new_code)
734 }
735 }),
736 }
737 }
738
739 const fn is_empty(self) -> bool {
740 self.balance.is_none() && self.nonce.is_none() && self.code_hash.is_none()
741 }
742
743 const fn needs_parent_account(self) -> bool {
744 self.balance.is_none() || self.nonce.is_none() || self.code_hash.is_none()
745 }
746
747 fn into_account(self, existing_account: Option<Account>) -> Account {
748 let existing_account = existing_account.as_ref();
749 Account {
750 balance: self.balance.unwrap_or_else(|| {
751 existing_account
752 .map(|account| account.balance)
753 .unwrap_or(alloy_primitives::U256::ZERO)
754 }),
755 nonce: self
756 .nonce
757 .unwrap_or_else(|| existing_account.map(|account| account.nonce).unwrap_or(0)),
758 bytecode_hash: self.code_hash.or_else(|| {
759 existing_account
760 .and_then(|account| account.bytecode_hash)
761 .or(Some(alloy_consensus::constants::KECCAK_EMPTY))
762 }),
763 }
764 }
765}
766
767const fn bal_account_changes_state_root(
768 account_changes: &alloy_eip7928::AccountChanges,
769 account_fields: BalAccountStateFields,
770) -> bool {
771 !account_fields.is_empty() || !account_changes.storage_changes.is_empty()
772}
773
774fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
779 MultiProofTargetsV2 {
780 account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
781 ..Default::default()
782 }
783}
784
785#[cfg(test)]
786mod tests {
787 use super::*;
788 use alloy_eip7928::{
789 AccountChanges, BalanceChange, BlockAccessIndex, CodeChange, NonceChange, SlotChanges,
790 StorageChange,
791 };
792 use alloy_primitives::{address, bytes};
793
794 #[test]
795 fn bal_read_only_account_does_not_change_state_root() {
796 let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
797 .with_storage_read(U256::from(1));
798 let fields = BalAccountStateFields::from_changes(&changes);
799
800 assert!(fields.is_empty());
801 assert!(!bal_account_changes_state_root(&changes, fields));
802 }
803
804 #[test]
805 fn bal_account_with_all_leaf_fields_does_not_need_parent_account() {
806 let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
807 .with_balance_change(BalanceChange::new(BlockAccessIndex::new(1), U256::from(10)))
808 .with_nonce_change(NonceChange::new(BlockAccessIndex::new(1), 7))
809 .with_code_change(CodeChange::new(BlockAccessIndex::new(1), bytes!("6001600155")));
810 let fields = BalAccountStateFields::from_changes(&changes);
811
812 assert!(bal_account_changes_state_root(&changes, fields));
813 assert!(!fields.needs_parent_account());
814 }
815
816 #[test]
817 fn bal_storage_change_needs_parent_account_when_leaf_fields_missing() {
818 let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
819 .with_storage_change(SlotChanges::new(
820 U256::from(1),
821 vec![StorageChange::new(BlockAccessIndex::new(1), U256::from(2))],
822 ));
823 let fields = BalAccountStateFields::from_changes(&changes);
824
825 assert!(bal_account_changes_state_root(&changes, fields));
826 assert!(fields.needs_parent_account());
827 }
828
829 #[test]
830 fn bal_account_uses_existing_fields_only_when_missing() {
831 let changes = AccountChanges::new(address!("0000000000000000000000000000000000000001"))
832 .with_balance_change(BalanceChange::new(BlockAccessIndex::new(1), U256::from(10)));
833 let fields = BalAccountStateFields::from_changes(&changes);
834 let account = fields.into_account(Some(Account {
835 balance: U256::from(1),
836 nonce: 3,
837 bytecode_hash: Some(B256::repeat_byte(0xaa)),
838 }));
839
840 assert_eq!(account.balance, U256::from(10));
841 assert_eq!(account.nonce, 3);
842 assert_eq!(account.bytecode_hash, Some(B256::repeat_byte(0xaa)));
843 }
844}
845
846#[derive(Debug)]
851pub enum PrewarmTaskEvent<R> {
852 TerminateTransactionExecution,
854 Terminate {
857 execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
860 valid_block_rx: mpsc::Receiver<()>,
865 },
866 FinishedTxExecution {
868 executed_transactions: usize,
870 },
871}
872
873#[derive(Metrics, Clone)]
875#[metrics(scope = "sync.prewarm")]
876pub struct PrewarmMetrics {
877 pub(crate) transactions: Gauge,
879 pub(crate) transactions_histogram: Histogram,
881 pub(crate) total_runtime: Histogram,
883 pub(crate) execution_duration: Histogram,
885 pub(crate) prefetch_storage_targets: Histogram,
887 pub(crate) cache_saving_duration: Gauge,
889 pub(crate) transaction_errors: Counter,
891 pub(crate) bal_slot_iteration_duration: Histogram,
893}