1use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{
6 keccak256,
7 map::{B256Set, HashSet},
8 B256,
9};
10use derive_more::derive::Deref;
11use metrics::Histogram;
12use reth_errors::ProviderError;
13use reth_metrics::Metrics;
14use reth_provider::{
15 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx,
16 StateCommitmentProvider,
17};
18use reth_revm::state::EvmState;
19use reth_trie::{
20 prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, HashedPostState,
21 HashedPostStateSorted, HashedStorage, MultiProof, MultiProofTargets, TrieInput,
22};
23use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
24use std::{
25 collections::{BTreeMap, VecDeque},
26 ops::DerefMut,
27 sync::{
28 mpsc::{channel, Receiver, Sender},
29 Arc,
30 },
31 time::{Duration, Instant},
32};
33use tracing::{debug, error, trace};
34
/// Chunk size used when splitting work into multiproof requests.
///
/// Both prefetch targets and not-yet-fetched state updates are split with
/// `chunks(MULTIPROOF_TARGETS_CHUNK_SIZE)`, and every resulting chunk is handed to the
/// multiproof manager as its own proof calculation.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
37
38#[derive(Default, Debug)]
41pub struct SparseTrieUpdate {
42 pub(crate) state: HashedPostState,
44 pub(crate) multiproof: MultiProof,
46}
47
48impl SparseTrieUpdate {
49 pub(super) fn is_empty(&self) -> bool {
51 self.state.is_empty() && self.multiproof.is_empty()
52 }
53
54 #[cfg(test)]
56 pub(super) fn from_multiproof(multiproof: MultiProof) -> Self {
57 Self { multiproof, ..Default::default() }
58 }
59
60 pub(super) fn extend(&mut self, other: Self) {
62 self.state.extend(other.state);
63 self.multiproof.extend(other.multiproof);
64 }
65}
66
67#[derive(Debug, Clone)]
69pub(super) struct MultiProofConfig<Factory> {
70 pub consistent_view: ConsistentDbView<Factory>,
72 pub nodes_sorted: Arc<TrieUpdatesSorted>,
75 pub state_sorted: Arc<HashedPostStateSorted>,
77 pub prefix_sets: Arc<TriePrefixSetsMut>,
81}
82
83impl<Factory> MultiProofConfig<Factory> {
84 pub(super) fn new_from_input(
86 consistent_view: ConsistentDbView<Factory>,
87 input: TrieInput,
88 ) -> Self {
89 Self {
90 consistent_view,
91 nodes_sorted: Arc::new(input.nodes.into_sorted()),
92 state_sorted: Arc::new(input.state.into_sorted()),
93 prefix_sets: Arc::new(input.prefix_sets),
94 }
95 }
96}
97
/// Messages consumed by the multiproof task's main loop.
#[derive(Debug)]
pub(super) enum MultiProofMessage {
    /// Request to prefetch proofs for the given targets.
    PrefetchProofs(MultiProofTargets),
    /// A new EVM state change, together with the source that produced it.
    StateUpdate(StateChangeSource, EvmState),
    /// A state update that did not require a proof calculation.
    ///
    /// It still carries a sequence number so that it is delivered in order relative to
    /// calculated proofs.
    EmptyProof {
        /// Position of this update in the delivery order.
        sequence_number: u64,
        /// The hashed state update itself.
        state: HashedPostState,
    },
    /// A proof calculation finished successfully. Boxed to keep the enum small.
    ProofCalculated(Box<ProofCalculated>),
    /// A proof calculation failed; the main loop logs the error and stops.
    ProofCalculationError(ProviderError),
    /// No further state updates will arrive.
    ///
    /// Sent when a `StateHookSender` is dropped.
    FinishedStateUpdates,
}
125
/// Result of a successfully completed proof calculation.
#[derive(Debug)]
pub(super) struct ProofCalculated {
    /// Sequence number that was assigned to the request when it was created.
    sequence_number: u64,
    /// The state update together with its calculated multiproof.
    update: SparseTrieUpdate,
    /// Time the proof calculation took, measured around the proof call itself.
    elapsed: Duration,
}
136
137#[derive(Debug, Default)]
139struct ProofSequencer {
140 next_sequence: u64,
142 next_to_deliver: u64,
144 pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
146}
147
148impl ProofSequencer {
149 const fn next_sequence(&mut self) -> u64 {
151 let seq = self.next_sequence;
152 self.next_sequence += 1;
153 seq
154 }
155
156 fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
159 if sequence >= self.next_to_deliver {
160 self.pending_proofs.insert(sequence, update);
161 }
162
163 if !self.pending_proofs.contains_key(&self.next_to_deliver) {
165 return Vec::new()
166 }
167
168 let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
169 let mut current_sequence = self.next_to_deliver;
170
171 while let Some(pending) = self.pending_proofs.remove(¤t_sequence) {
173 consecutive_proofs.push(pending);
174 current_sequence += 1;
175
176 if !self.pending_proofs.contains_key(¤t_sequence) {
178 break;
179 }
180 }
181
182 self.next_to_deliver += consecutive_proofs.len() as u64;
183
184 consecutive_proofs
185 }
186
187 pub(crate) fn has_pending(&self) -> bool {
189 !self.pending_proofs.is_empty()
190 }
191}
192
193#[derive(Deref, Debug)]
199pub(super) struct StateHookSender(Sender<MultiProofMessage>);
200
201impl StateHookSender {
202 pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
203 Self(inner)
204 }
205}
206
207impl Drop for StateHookSender {
208 fn drop(&mut self) {
209 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
211 }
212}
213
214pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
215 let mut hashed_state = HashedPostState::with_capacity(update.len());
216
217 for (address, account) in update {
218 if account.is_touched() {
219 let hashed_address = keccak256(address);
220 trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
221
222 let destroyed = account.is_selfdestructed();
223 let info = if destroyed { None } else { Some(account.info.into()) };
224 hashed_state.accounts.insert(hashed_address, info);
225
226 let mut changed_storage_iter = account
227 .storage
228 .into_iter()
229 .filter(|(_slot, value)| value.is_changed())
230 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
231 .peekable();
232
233 if destroyed {
234 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
235 } else if changed_storage_iter.peek().is_some() {
236 hashed_state
237 .storages
238 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
239 }
240 }
241 }
242
243 hashed_state
244}
245
246#[derive(Debug)]
248enum PendingMultiproofTask<Factory> {
249 Storage(StorageMultiproofInput<Factory>),
251 Regular(MultiproofInput<Factory>),
253}
254
255impl<Factory> PendingMultiproofTask<Factory> {
256 const fn proof_sequence_number(&self) -> u64 {
258 match self {
259 Self::Storage(input) => input.proof_sequence_number,
260 Self::Regular(input) => input.proof_sequence_number,
261 }
262 }
263
264 fn proof_targets_is_empty(&self) -> bool {
266 match self {
267 Self::Storage(input) => input.proof_targets.is_empty(),
268 Self::Regular(input) => input.proof_targets.is_empty(),
269 }
270 }
271
272 fn send_empty_proof(self) {
274 match self {
275 Self::Storage(input) => input.send_empty_proof(),
276 Self::Regular(input) => input.send_empty_proof(),
277 }
278 }
279}
280
281impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
282 fn from(input: StorageMultiproofInput<Factory>) -> Self {
283 Self::Storage(input)
284 }
285}
286
287impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
288 fn from(input: MultiproofInput<Factory>) -> Self {
289 Self::Regular(input)
290 }
291}
292
293#[derive(Debug)]
295struct StorageMultiproofInput<Factory> {
296 config: MultiProofConfig<Factory>,
297 source: Option<StateChangeSource>,
298 hashed_state_update: HashedPostState,
299 hashed_address: B256,
300 proof_targets: B256Set,
301 proof_sequence_number: u64,
302 state_root_message_sender: Sender<MultiProofMessage>,
303}
304
305impl<Factory> StorageMultiproofInput<Factory> {
306 fn send_empty_proof(self) {
308 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
309 sequence_number: self.proof_sequence_number,
310 state: self.hashed_state_update,
311 });
312 }
313}
314
315#[derive(Debug)]
317struct MultiproofInput<Factory> {
318 config: MultiProofConfig<Factory>,
319 source: Option<StateChangeSource>,
320 hashed_state_update: HashedPostState,
321 proof_targets: MultiProofTargets,
322 proof_sequence_number: u64,
323 state_root_message_sender: Sender<MultiProofMessage>,
324}
325
326impl<Factory> MultiproofInput<Factory> {
327 fn send_empty_proof(self) {
329 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
330 sequence_number: self.proof_sequence_number,
331 state: self.hashed_state_update,
332 });
333 }
334}
335
/// Spawns multiproof calculations while capping how many run at the same time.
///
/// Requests that arrive while the cap is reached are queued and started, in FIFO order, as
/// running calculations complete.
#[derive(Debug)]
pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
    /// Maximum number of calculations allowed to be in flight at once.
    max_concurrent: usize,
    /// Number of calculations currently in flight.
    inflight: usize,
    /// Requests waiting for a free slot.
    pending: VecDeque<PendingMultiproofTask<Factory>>,
    /// Executor on which the blocking proof calculations are spawned.
    executor: WorkloadExecutor,
    /// Handle passed to every proof calculation (cloned per spawned calculation).
    storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
    /// Metrics for the in-flight and pending counts.
    metrics: MultiProofTaskMetrics,
}
355
356impl<Factory> MultiproofManager<Factory>
357where
358 Factory:
359 DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
360{
361 fn new(
363 executor: WorkloadExecutor,
364 metrics: MultiProofTaskMetrics,
365 storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
366 max_concurrent: usize,
367 ) -> Self {
368 Self {
369 pending: VecDeque::with_capacity(max_concurrent),
370 max_concurrent,
371 executor,
372 inflight: 0,
373 metrics,
374 storage_proof_task_handle,
375 }
376 }
377
378 fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
381 if input.proof_targets_is_empty() {
383 debug!(
384 sequence_number = input.proof_sequence_number(),
385 "No proof targets, sending empty multiproof back immediately"
386 );
387 input.send_empty_proof();
388 return
389 }
390
391 if self.inflight >= self.max_concurrent {
392 self.pending.push_back(input);
393 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
394 return;
395 }
396
397 self.spawn_multiproof_task(input);
398 }
399
400 fn on_calculation_complete(&mut self) {
403 self.inflight = self.inflight.saturating_sub(1);
404 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
405
406 if let Some(input) = self.pending.pop_front() {
407 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
408 self.spawn_multiproof_task(input);
409 }
410 }
411
412 fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
415 match input {
416 PendingMultiproofTask::Storage(storage_input) => {
417 self.spawn_storage_proof(storage_input);
418 }
419 PendingMultiproofTask::Regular(multiproof_input) => {
420 self.spawn_multiproof(multiproof_input);
421 }
422 }
423 }
424
425 fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
427 let StorageMultiproofInput {
428 config,
429 source,
430 hashed_state_update,
431 hashed_address,
432 proof_targets,
433 proof_sequence_number,
434 state_root_message_sender,
435 } = storage_multiproof_input;
436
437 let storage_proof_task_handle = self.storage_proof_task_handle.clone();
438
439 self.executor.spawn_blocking(move || {
440 let storage_targets = proof_targets.len();
441
442 trace!(
443 target: "engine::root",
444 proof_sequence_number,
445 ?proof_targets,
446 storage_targets,
447 "Starting dedicated storage proof calculation",
448 );
449 let start = Instant::now();
450 let result = ParallelProof::new(
451 config.consistent_view,
452 config.nodes_sorted,
453 config.state_sorted,
454 config.prefix_sets,
455 storage_proof_task_handle.clone(),
456 )
457 .with_branch_node_masks(true)
458 .storage_proof(hashed_address, proof_targets);
459 let elapsed = start.elapsed();
460 trace!(
461 target: "engine::root",
462 proof_sequence_number,
463 ?elapsed,
464 ?source,
465 storage_targets,
466 "Storage multiproofs calculated",
467 );
468
469 match result {
470 Ok(proof) => {
471 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
472 Box::new(ProofCalculated {
473 sequence_number: proof_sequence_number,
474 update: SparseTrieUpdate {
475 state: hashed_state_update,
476 multiproof: MultiProof::from_storage_proof(hashed_address, proof),
477 },
478 elapsed,
479 }),
480 ));
481 }
482 Err(error) => {
483 let _ = state_root_message_sender
484 .send(MultiProofMessage::ProofCalculationError(error.into()));
485 }
486 }
487 });
488
489 self.inflight += 1;
490 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
491 }
492
493 fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
495 let MultiproofInput {
496 config,
497 source,
498 hashed_state_update,
499 proof_targets,
500 proof_sequence_number,
501 state_root_message_sender,
502 } = multiproof_input;
503 let storage_proof_task_handle = self.storage_proof_task_handle.clone();
504
505 self.executor.spawn_blocking(move || {
506 let account_targets = proof_targets.len();
507 let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
508
509 trace!(
510 target: "engine::root",
511 proof_sequence_number,
512 ?proof_targets,
513 account_targets,
514 storage_targets,
515 "Starting multiproof calculation",
516 );
517 let start = Instant::now();
518 let result = ParallelProof::new(
519 config.consistent_view,
520 config.nodes_sorted,
521 config.state_sorted,
522 config.prefix_sets,
523 storage_proof_task_handle.clone(),
524 )
525 .with_branch_node_masks(true)
526 .multiproof(proof_targets);
527 let elapsed = start.elapsed();
528 trace!(
529 target: "engine::root",
530 proof_sequence_number,
531 ?elapsed,
532 ?source,
533 account_targets,
534 storage_targets,
535 "Multiproof calculated",
536 );
537
538 match result {
539 Ok(proof) => {
540 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
541 Box::new(ProofCalculated {
542 sequence_number: proof_sequence_number,
543 update: SparseTrieUpdate {
544 state: hashed_state_update,
545 multiproof: proof,
546 },
547 elapsed,
548 }),
549 ));
550 }
551 Err(error) => {
552 let _ = state_root_message_sender
553 .send(MultiProofMessage::ProofCalculationError(error.into()));
554 }
555 }
556 });
557
558 self.inflight += 1;
559 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
560 }
561}
562
/// Metrics recorded by the multiproof task and its manager, under the `tree.root` scope.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of the number of inflight multiproofs.
    pub inflight_multiproofs_histogram: Histogram,
    /// Histogram of the number of pending (queued) multiproofs.
    pub pending_multiproofs_histogram: Histogram,

    /// Histogram of the number of accounts in deduplicated prefetch proof targets.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of storage slots in deduplicated prefetch proof targets.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of chunks a prefetch request was split into.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of accounts in the proof targets spawned for a state update.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of storage slots in the proof targets spawned for a state update.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of chunks a state update was split into.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations (not recorded in this part of the file).
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations (not recorded in this part of the file).
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations (not recorded in this part of the file).
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of the number of state update proofs requested during one task run.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of the number of proofs processed during one task run.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of the time between the first update and the end of the task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Histogram of the time between task start and the first prefetch or state update.
    pub first_update_wait_time_histogram: Histogram,
    /// Histogram of the time between the end of state updates and the end of the task.
    pub last_proof_wait_time_histogram: Histogram,
}
606
/// Task that turns prefetch requests and state updates into multiproof calculations and
/// forwards the results, in request order, to the sparse trie.
#[derive(Debug)]
pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
    /// Configuration cloned into every spawned proof calculation.
    config: MultiProofConfig<Factory>,
    /// Receiving end of the task's message channel.
    rx: Receiver<MultiProofMessage>,
    /// Sending end of the task's message channel; cloned for producers and spawned calculations.
    tx: Sender<MultiProofMessage>,
    /// Channel on which ordered, combined updates are sent to the sparse trie.
    to_sparse_trie: Sender<SparseTrieUpdate>,
    /// Proof targets that have already been requested, used to avoid requesting them again.
    fetched_proof_targets: MultiProofTargets,
    /// Assigns sequence numbers and restores request order for finished proofs.
    proof_sequencer: ProofSequencer,
    /// Spawns proof calculations with bounded concurrency.
    multiproof_manager: MultiproofManager<Factory>,
    /// Task metrics; a clone is also held by the manager.
    metrics: MultiProofTaskMetrics,
}
635
636impl<Factory> MultiProofTask<Factory>
637where
638 Factory:
639 DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
640{
641 pub(super) fn new(
643 config: MultiProofConfig<Factory>,
644 executor: WorkloadExecutor,
645 proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
646 to_sparse_trie: Sender<SparseTrieUpdate>,
647 max_concurrency: usize,
648 ) -> Self {
649 let (tx, rx) = channel();
650 let metrics = MultiProofTaskMetrics::default();
651
652 Self {
653 config,
654 rx,
655 tx,
656 to_sparse_trie,
657 fetched_proof_targets: Default::default(),
658 proof_sequencer: ProofSequencer::default(),
659 multiproof_manager: MultiproofManager::new(
660 executor,
661 metrics.clone(),
662 proof_task_handle,
663 max_concurrency,
664 ),
665 metrics,
666 }
667 }
668
669 pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
671 self.tx.clone()
672 }
673
674 fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
678 let proof_targets = self.get_prefetch_proof_targets(targets);
679 self.fetched_proof_targets.extend_ref(&proof_targets);
680
681 self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
682 self.metrics
683 .prefetch_proof_targets_storages_histogram
684 .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
685
686 let mut chunks = 0;
688 for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
689 self.multiproof_manager.spawn_or_queue(
690 MultiproofInput {
691 config: self.config.clone(),
692 source: None,
693 hashed_state_update: Default::default(),
694 proof_targets: proof_targets_chunk,
695 proof_sequence_number: self.proof_sequencer.next_sequence(),
696 state_root_message_sender: self.tx.clone(),
697 }
698 .into(),
699 );
700 chunks += 1;
701 }
702 self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
703
704 chunks
705 }
706
707 fn is_done(
709 &self,
710 proofs_processed: u64,
711 state_update_proofs_requested: u64,
712 prefetch_proofs_requested: u64,
713 updates_finished: bool,
714 ) -> bool {
715 let all_proofs_processed =
716 proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
717 let no_pending = !self.proof_sequencer.has_pending();
718 debug!(
719 target: "engine::root",
720 proofs_processed,
721 state_update_proofs_requested,
722 prefetch_proofs_requested,
723 no_pending,
724 updates_finished,
725 "Checking end condition"
726 );
727 all_proofs_processed && no_pending && updates_finished
728 }
729
730 fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
732 let mut duplicates = 0;
736
737 targets.retain(|hashed_address, target_storage| {
739 let keep = self
740 .fetched_proof_targets
741 .get(hashed_address)
742 .is_none_or(|fetched_storage| {
744 !target_storage.is_subset(fetched_storage)
746 });
747
748 if !keep {
749 duplicates += target_storage.len();
750 }
751
752 keep
753 });
754
755 for (hashed_address, target_storage) in targets.deref_mut() {
757 let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
758 continue
761 };
762
763 let prev_target_storage_len = target_storage.len();
764
765 target_storage.retain(|slot| !fetched_storage.contains(slot));
769
770 duplicates += prev_target_storage_len - target_storage.len();
771 }
772
773 if duplicates > 0 {
774 trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
775 }
776
777 targets
778 }
779
780 fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
784 let hashed_state_update = evm_state_to_hashed_post_state(update);
785 let (fetched_state_update, not_fetched_state_update) =
788 hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
789
790 let mut state_updates = 0;
791 if !fetched_state_update.is_empty() {
794 let _ = self.tx.send(MultiProofMessage::EmptyProof {
795 sequence_number: self.proof_sequencer.next_sequence(),
796 state: fetched_state_update,
797 });
798 state_updates += 1;
799 }
800
801 let mut chunks = 0;
803 let mut spawned_proof_targets = MultiProofTargets::default();
804 for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
805 let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
806 spawned_proof_targets.extend_ref(&proof_targets);
807
808 self.multiproof_manager.spawn_or_queue(
809 MultiproofInput {
810 config: self.config.clone(),
811 source: Some(source),
812 hashed_state_update: chunk,
813 proof_targets,
814 proof_sequence_number: self.proof_sequencer.next_sequence(),
815 state_root_message_sender: self.tx.clone(),
816 }
817 .into(),
818 );
819 chunks += 1;
820 }
821
822 self.metrics
823 .state_update_proof_targets_accounts_histogram
824 .record(spawned_proof_targets.len() as f64);
825 self.metrics
826 .state_update_proof_targets_storages_histogram
827 .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
828 self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
829
830 self.fetched_proof_targets.extend(spawned_proof_targets);
831
832 state_updates + chunks
833 }
834
835 fn on_proof(
837 &mut self,
838 sequence_number: u64,
839 update: SparseTrieUpdate,
840 ) -> Option<SparseTrieUpdate> {
841 let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
842
843 ready_proofs
844 .into_iter()
845 .reduce(|mut acc_update, update| {
847 acc_update.extend(update);
848 acc_update
849 })
850 .filter(|proof| !proof.is_empty())
852 }
853
854 pub(crate) fn run(mut self) {
889 let mut prefetch_proofs_requested = 0;
891 let mut state_update_proofs_requested = 0;
892 let mut proofs_processed = 0;
893
894 let mut updates_finished = false;
895
896 let start = Instant::now();
898
899 let mut first_update_time = None;
901 let mut updates_finished_time = None;
903
904 loop {
905 trace!(target: "engine::root", "entering main channel receiving loop");
906 match self.rx.recv() {
907 Ok(message) => match message {
908 MultiProofMessage::PrefetchProofs(targets) => {
909 trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
910 if first_update_time.is_none() {
911 self.metrics
913 .first_update_wait_time_histogram
914 .record(start.elapsed().as_secs_f64());
915 first_update_time = Some(Instant::now());
916 debug!(target: "engine::root", "Started state root calculation");
917 }
918
919 let account_targets = targets.len();
920 let storage_targets =
921 targets.values().map(|slots| slots.len()).sum::<usize>();
922 prefetch_proofs_requested += self.on_prefetch_proof(targets);
923 debug!(
924 target: "engine::root",
925 account_targets,
926 storage_targets,
927 prefetch_proofs_requested,
928 "Prefetching proofs"
929 );
930 }
931 MultiProofMessage::StateUpdate(source, update) => {
932 trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
933 if first_update_time.is_none() {
934 self.metrics
936 .first_update_wait_time_histogram
937 .record(start.elapsed().as_secs_f64());
938 first_update_time = Some(Instant::now());
939 debug!(target: "engine::root", "Started state root calculation");
940 }
941
942 let len = update.len();
943 state_update_proofs_requested += self.on_state_update(source, update);
944 debug!(
945 target: "engine::root",
946 ?source,
947 len,
948 ?state_update_proofs_requested,
949 "Received new state update"
950 );
951 }
952 MultiProofMessage::FinishedStateUpdates => {
953 trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
954 updates_finished = true;
955 updates_finished_time = Some(Instant::now());
956 if self.is_done(
957 proofs_processed,
958 state_update_proofs_requested,
959 prefetch_proofs_requested,
960 updates_finished,
961 ) {
962 debug!(
963 target: "engine::root",
964 "State updates finished and all proofs processed, ending calculation"
965 );
966 break
967 }
968 }
969 MultiProofMessage::EmptyProof { sequence_number, state } => {
970 trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
971
972 proofs_processed += 1;
973
974 if let Some(combined_update) = self.on_proof(
975 sequence_number,
976 SparseTrieUpdate { state, multiproof: MultiProof::default() },
977 ) {
978 let _ = self.to_sparse_trie.send(combined_update);
979 }
980
981 if self.is_done(
982 proofs_processed,
983 state_update_proofs_requested,
984 prefetch_proofs_requested,
985 updates_finished,
986 ) {
987 debug!(
988 target: "engine::root",
989 "State updates finished and all proofs processed, ending calculation"
990 );
991 break
992 }
993 }
994 MultiProofMessage::ProofCalculated(proof_calculated) => {
995 trace!(target: "engine::root", "processing
996 MultiProofMessage::ProofCalculated");
997
998 proofs_processed += 1;
1001
1002 self.metrics
1003 .proof_calculation_duration_histogram
1004 .record(proof_calculated.elapsed);
1005
1006 debug!(
1007 target: "engine::root",
1008 sequence = proof_calculated.sequence_number,
1009 total_proofs = proofs_processed,
1010 "Processing calculated proof"
1011 );
1012
1013 self.multiproof_manager.on_calculation_complete();
1014
1015 if let Some(combined_update) =
1016 self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
1017 {
1018 let _ = self.to_sparse_trie.send(combined_update);
1019 }
1020
1021 if self.is_done(
1022 proofs_processed,
1023 state_update_proofs_requested,
1024 prefetch_proofs_requested,
1025 updates_finished,
1026 ) {
1027 debug!(
1028 target: "engine::root",
1029 "State updates finished and all proofs processed, ending calculation");
1030 break
1031 }
1032 }
1033 MultiProofMessage::ProofCalculationError(err) => {
1034 error!(
1035 target: "engine::root",
1036 ?err,
1037 "proof calculation error"
1038 );
1039 return
1040 }
1041 },
1042 Err(_) => {
1043 error!(
1046 target: "engine::root",
1047 "Internal message channel closed unexpectedly"
1048 );
1049 }
1050 }
1051 }
1052
1053 debug!(
1054 target: "engine::root",
1055 total_updates = state_update_proofs_requested,
1056 total_proofs = proofs_processed,
1057 total_time = ?first_update_time.map(|t|t.elapsed()),
1058 time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1059 "All proofs processed, ending calculation"
1060 );
1061
1062 self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1064 self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1065 if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1066 self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1067 }
1068
1069 if let Some(updates_finished_time) = updates_finished_time {
1070 self.metrics
1071 .last_proof_wait_time_histogram
1072 .record(updates_finished_time.elapsed().as_secs_f64());
1073 }
1074 }
1075}
1076
1077fn get_proof_targets(
1081 state_update: &HashedPostState,
1082 fetched_proof_targets: &MultiProofTargets,
1083) -> MultiProofTargets {
1084 let mut targets = MultiProofTargets::default();
1085
1086 for &hashed_address in state_update.accounts.keys() {
1088 if !fetched_proof_targets.contains_key(&hashed_address) {
1089 targets.insert(hashed_address, HashSet::default());
1090 }
1091 }
1092
1093 for (hashed_address, storage) in &state_update.storages {
1095 let fetched = fetched_proof_targets.get(hashed_address);
1096 let mut changed_slots = storage
1097 .storage
1098 .keys()
1099 .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
1100 .peekable();
1101
1102 if storage.wiped && fetched.is_none() {
1104 targets.entry(*hashed_address).or_default();
1105 }
1106
1107 if changed_slots.peek().is_some() {
1108 targets.entry(*hashed_address).or_default().extend(changed_slots);
1109 }
1110 }
1111
1112 targets
1113}
1114
1115#[cfg(test)]
1116mod tests {
1117 use super::*;
1118 use alloy_primitives::map::B256Set;
1119 use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
1120 use reth_trie::TrieInput;
1121 use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1122 use revm_primitives::{B256, U256};
1123 use std::sync::Arc;
1124
1125 fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
1126 where
1127 F: DatabaseProviderFactory<Provider: BlockReader>
1128 + StateCommitmentProvider
1129 + Clone
1130 + 'static,
1131 {
1132 let consistent_view = ConsistentDbView::new(factory, None);
1133 let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
1134 let state_sorted = Arc::new(input.state.clone().into_sorted());
1135 let prefix_sets = Arc::new(input.prefix_sets);
1136
1137 MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
1138 }
1139
1140 fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
1141 where
1142 F: DatabaseProviderFactory<Provider: BlockReader>
1143 + StateCommitmentProvider
1144 + Clone
1145 + 'static,
1146 {
1147 let executor = WorkloadExecutor::default();
1148 let config = create_state_root_config(factory, TrieInput::default());
1149 let task_ctx = ProofTaskCtx::new(
1150 config.nodes_sorted.clone(),
1151 config.state_sorted.clone(),
1152 config.prefix_sets.clone(),
1153 );
1154 let proof_task = ProofTaskManager::new(
1155 executor.handle().clone(),
1156 config.consistent_view.clone(),
1157 task_ctx,
1158 1,
1159 );
1160 let channel = channel();
1161
1162 MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1)
1163 }
1164
1165 #[test]
1166 fn test_add_proof_in_sequence() {
1167 let mut sequencer = ProofSequencer::default();
1168 let proof1 = MultiProof::default();
1169 let proof2 = MultiProof::default();
1170 sequencer.next_sequence = 2;
1171
1172 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1173 assert_eq!(ready.len(), 1);
1174 assert!(!sequencer.has_pending());
1175
1176 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2));
1177 assert_eq!(ready.len(), 1);
1178 assert!(!sequencer.has_pending());
1179 }
1180
1181 #[test]
1182 fn test_add_proof_out_of_order() {
1183 let mut sequencer = ProofSequencer::default();
1184 let proof1 = MultiProof::default();
1185 let proof2 = MultiProof::default();
1186 let proof3 = MultiProof::default();
1187 sequencer.next_sequence = 3;
1188
1189 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3));
1190 assert_eq!(ready.len(), 0);
1191 assert!(sequencer.has_pending());
1192
1193 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1194 assert_eq!(ready.len(), 1);
1195 assert!(sequencer.has_pending());
1196
1197 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2));
1198 assert_eq!(ready.len(), 2);
1199 assert!(!sequencer.has_pending());
1200 }
1201
1202 #[test]
1203 fn test_add_proof_with_gaps() {
1204 let mut sequencer = ProofSequencer::default();
1205 let proof1 = MultiProof::default();
1206 let proof3 = MultiProof::default();
1207 sequencer.next_sequence = 3;
1208
1209 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1210 assert_eq!(ready.len(), 1);
1211
1212 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3));
1213 assert_eq!(ready.len(), 0);
1214 assert!(sequencer.has_pending());
1215 }
1216
1217 #[test]
1218 fn test_add_proof_duplicate_sequence() {
1219 let mut sequencer = ProofSequencer::default();
1220 let proof1 = MultiProof::default();
1221 let proof2 = MultiProof::default();
1222
1223 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1224 assert_eq!(ready.len(), 1);
1225
1226 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2));
1227 assert_eq!(ready.len(), 0);
1228 assert!(!sequencer.has_pending());
1229 }
1230
1231 #[test]
1232 fn test_add_proof_batch_processing() {
1233 let mut sequencer = ProofSequencer::default();
1234 let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1235 sequencer.next_sequence = 5;
1236
1237 sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()));
1238 sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()));
1239 sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()));
1240 sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()));
1241
1242 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()));
1243 assert_eq!(ready.len(), 5);
1244 assert!(!sequencer.has_pending());
1245 }
1246
1247 fn create_get_proof_targets_state() -> HashedPostState {
1248 let mut state = HashedPostState::default();
1249
1250 let addr1 = B256::random();
1251 let addr2 = B256::random();
1252 state.accounts.insert(addr1, Some(Default::default()));
1253 state.accounts.insert(addr2, Some(Default::default()));
1254
1255 let mut storage = HashedStorage::default();
1256 let slot1 = B256::random();
1257 let slot2 = B256::random();
1258 storage.storage.insert(slot1, U256::ZERO);
1259 storage.storage.insert(slot2, U256::from(1));
1260 state.storages.insert(addr1, storage);
1261
1262 state
1263 }
1264
/// With nothing fetched yet, every account in the state becomes a proof target.
#[test]
fn test_get_proof_targets_new_account_targets() {
    let state = create_get_proof_targets_state();
    let already_fetched = MultiProofTargets::default();

    let targets = get_proof_targets(&state, &already_fetched);

    // Same number of entries, and every account key is present.
    assert_eq!(targets.len(), state.accounts.len());
    assert!(state.accounts.keys().all(|hashed_address| targets.contains_key(hashed_address)));
}
1278
/// With nothing fetched yet, every storage slot in the state is requested
/// under its owning account.
#[test]
fn test_get_proof_targets_new_storage_targets() {
    let state = create_get_proof_targets_state();
    let already_fetched = MultiProofTargets::default();

    let targets = get_proof_targets(&state, &already_fetched);

    for (hashed_address, hashed_storage) in &state.storages {
        assert!(targets.contains_key(hashed_address));

        // The requested slot set matches the account's storage exactly.
        let requested_slots = &targets[hashed_address];
        assert_eq!(requested_slots.len(), hashed_storage.storage.len());
        assert!(hashed_storage.storage.keys().all(|slot| requested_slots.contains(slot)));
    }
}
1296
/// An account that is already recorded as fetched is left out of the new targets.
#[test]
fn test_get_proof_targets_filter_already_fetched_accounts() {
    let state = create_get_proof_targets_state();

    // Use the account without a storage entry — presumably so it cannot
    // re-enter the targets through unfetched storage slots.
    let storage_free_account = *state
        .accounts
        .keys()
        .find(|hashed_address| !state.storages.contains_key(*hashed_address))
        .expect("Should have an account without storage");

    // Mark that account as fetched, with an empty slot set.
    let mut already_fetched = MultiProofTargets::default();
    already_fetched.insert(storage_free_account, HashSet::default());

    let targets = get_proof_targets(&state, &already_fetched);

    assert!(!targets.contains_key(&storage_free_account));
    assert_eq!(targets.len(), state.accounts.len() - 1);
}
1319
/// A storage slot that is already recorded as fetched is left out of the
/// account's new targets, while its other slots are still requested.
#[test]
fn test_get_proof_targets_filter_already_fetched_storage() {
    let state = create_get_proof_targets_state();

    // Take the first storage entry of the fixture and one of its slots.
    let (hashed_address, hashed_storage) = state.storages.iter().next().unwrap();
    let known_slot = *hashed_storage.storage.keys().next().unwrap();

    // Record just that one slot as already fetched for the account.
    let mut already_fetched = MultiProofTargets::default();
    already_fetched.insert(*hashed_address, std::iter::once(known_slot).collect());

    let targets = get_proof_targets(&state, &already_fetched);

    let remaining_slots = &targets[hashed_address];
    assert!(!remaining_slots.contains(&known_slot));
    assert_eq!(remaining_slots.len(), hashed_storage.storage.len() - 1);
}
1339
/// An empty state combined with an empty fetched set produces no targets.
#[test]
fn test_get_proof_targets_empty_state() {
    let empty_state = HashedPostState::default();
    let nothing_fetched = MultiProofTargets::default();

    let targets = get_proof_targets(&empty_state, &nothing_fetched);

    assert!(targets.is_empty());
}
1349
/// Mixed case: one account has one slot already fetched and one slot not yet
/// fetched, and a second account has never been fetched. Only the unfetched
/// slot and the second account must show up in the targets.
#[test]
fn test_get_proof_targets_mixed_fetched_state() {
    let account_with_storage = B256::random();
    let plain_account = B256::random();
    let fetched_slot = B256::random();
    let unfetched_slot = B256::random();

    // Storage for the first account: one slot per fetched/unfetched case.
    let mut storage = HashedStorage::default();
    storage.storage.insert(fetched_slot, U256::ZERO);
    storage.storage.insert(unfetched_slot, U256::from(1));

    let mut state = HashedPostState::default();
    state.accounts.insert(account_with_storage, Some(Default::default()));
    state.accounts.insert(plain_account, Some(Default::default()));
    state.storages.insert(account_with_storage, storage);

    // Only `fetched_slot` of the first account is recorded as fetched.
    let mut already_fetched = MultiProofTargets::default();
    already_fetched.insert(account_with_storage, std::iter::once(fetched_slot).collect());

    let targets = get_proof_targets(&state, &already_fetched);

    assert!(targets.contains_key(&plain_account));
    let remaining_slots = &targets[&account_with_storage];
    assert!(!remaining_slots.contains(&fetched_slot));
    assert!(remaining_slots.contains(&unfetched_slot));
}
1378
/// An address that appears only in `storages` (no entry in `accounts`, nothing
/// fetched) is still targeted, together with both of its storage slots.
#[test]
fn test_get_proof_targets_unmodified_account_with_storage() {
    let hashed_address = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();

    // Storage changes exist for the address, but no account entry is added.
    let mut storage = HashedStorage::default();
    for (slot, value) in [(first_slot, U256::from(1)), (second_slot, U256::from(2))] {
        storage.storage.insert(slot, value);
    }
    let mut state = HashedPostState::default();
    state.storages.insert(hashed_address, storage);

    let already_fetched = MultiProofTargets::default();

    // Preconditions: the address is neither a changed account nor fetched.
    assert!(!state.accounts.contains_key(&hashed_address));
    assert!(!already_fetched.contains_key(&hashed_address));

    let targets = get_proof_targets(&state, &already_fetched);

    assert!(targets.contains_key(&hashed_address));

    let requested_slots = &targets[&hashed_address];
    assert_eq!(requested_slots.len(), 2);
    assert!([first_slot, second_slot].iter().all(|slot| requested_slots.contains(slot)));
}
1408
/// When none of the requested prefetch targets overlap with the task's
/// `fetched_proof_targets`, the targets are returned unchanged.
#[test]
fn test_get_prefetch_proof_targets_no_duplicates() {
    let mut state_root_task = create_test_state_root_task(create_test_provider_factory());

    // Two accounts, one slot each.
    let first_account = B256::random();
    let second_account = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();
    let mut targets = MultiProofTargets::default();
    targets.insert(first_account, std::iter::once(first_slot).collect());
    targets.insert(second_account, std::iter::once(second_slot).collect());

    // First call: nothing has been inserted into the fetched set by this test.
    let with_empty_fetched = state_root_task.get_prefetch_proof_targets(targets.clone());
    assert_eq!(with_empty_fetched, targets);

    // Record an unrelated account and slot as fetched.
    let unrelated_account = B256::random();
    let unrelated_slot = B256::random();
    state_root_task
        .fetched_proof_targets
        .insert(unrelated_account, std::iter::once(unrelated_slot).collect());

    // Second call: the unrelated entry does not change the result.
    let with_unrelated_fetched = state_root_task.get_prefetch_proof_targets(targets.clone());
    assert_eq!(with_unrelated_fetched, targets);
}
1442
/// Prefetch targets already present in the task's `fetched_proof_targets` are
/// removed: a fully fetched account disappears, and a partially fetched
/// account keeps only its slots that have not been fetched.
#[test]
fn test_get_prefetch_proof_targets_remove_subset() {
    let mut state_root_task = create_test_state_root_task(create_test_provider_factory());

    // Two accounts, one slot each.
    let first_account = B256::random();
    let second_account = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();
    let mut targets = MultiProofTargets::default();
    targets.insert(first_account, std::iter::once(first_slot).collect());
    targets.insert(second_account, std::iter::once(second_slot).collect());

    // Mark the first account's only slot as already fetched.
    state_root_task
        .fetched_proof_targets
        .insert(first_account, std::iter::once(first_slot).collect());

    let after_full_overlap = state_root_task.get_prefetch_proof_targets(targets.clone());

    // The first account is dropped; the second remains.
    assert_eq!(after_full_overlap.len(), 1);
    assert!(!after_full_overlap.contains_key(&first_account));
    assert!(after_full_overlap.contains_key(&second_account));

    // Request one more slot for the first account that has not been fetched.
    let extra_slot = B256::random();
    targets.get_mut(&first_account).unwrap().insert(extra_slot);

    // `targets` is not used after this call, so it is moved in.
    let after_partial_overlap = state_root_task.get_prefetch_proof_targets(targets);

    // Both accounts are present; the first one only with the new slot.
    assert_eq!(after_partial_overlap.len(), 2);
    assert!(after_partial_overlap.contains_key(&first_account));
    assert_eq!(
        after_partial_overlap.get(&first_account),
        Some(&std::iter::once(extra_slot).collect::<B256Set>())
    );
    assert!(after_partial_overlap.contains_key(&second_account));
    assert_eq!(
        after_partial_overlap.get(&second_account),
        Some(&std::iter::once(second_slot).collect::<B256Set>())
    );
}
1489}