1use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{keccak256, map::HashSet, B256};
6use derive_more::derive::Deref;
7use metrics::Histogram;
8use reth_errors::ProviderError;
9use reth_metrics::Metrics;
10use reth_provider::{
11 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
12};
13use reth_revm::state::EvmState;
14use reth_trie::{
15 prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, HashedPostState,
16 HashedPostStateSorted, HashedStorage, MultiProof, MultiProofTargets, TrieInput,
17};
18use reth_trie_parallel::proof::ParallelProof;
19use std::{
20 collections::{BTreeMap, VecDeque},
21 ops::DerefMut,
22 sync::{
23 mpsc::{channel, Receiver, Sender},
24 Arc,
25 },
26 time::{Duration, Instant},
27};
28use tracing::{debug, error, trace};
29
30const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
32
33#[derive(Default, Debug)]
36pub struct SparseTrieUpdate {
37 pub(crate) state: HashedPostState,
39 pub(crate) multiproof: MultiProof,
41}
42
43impl SparseTrieUpdate {
44 pub(super) fn is_empty(&self) -> bool {
46 self.state.is_empty() && self.multiproof.is_empty()
47 }
48
49 #[cfg(test)]
51 pub(super) fn from_multiproof(multiproof: MultiProof) -> Self {
52 Self { multiproof, ..Default::default() }
53 }
54
55 pub(super) fn extend(&mut self, other: Self) {
57 self.state.extend(other.state);
58 self.multiproof.extend(other.multiproof);
59 }
60}
61
62#[derive(Debug, Clone)]
64pub(super) struct MultiProofConfig<Factory> {
65 pub consistent_view: ConsistentDbView<Factory>,
67 pub nodes_sorted: Arc<TrieUpdatesSorted>,
70 pub state_sorted: Arc<HashedPostStateSorted>,
72 pub prefix_sets: Arc<TriePrefixSetsMut>,
76}
77
78impl<Factory> MultiProofConfig<Factory> {
79 pub(super) fn new_from_input(
81 consistent_view: ConsistentDbView<Factory>,
82 input: TrieInput,
83 ) -> Self {
84 Self {
85 consistent_view,
86 nodes_sorted: Arc::new(input.nodes.into_sorted()),
87 state_sorted: Arc::new(input.state.into_sorted()),
88 prefix_sets: Arc::new(input.prefix_sets),
89 }
90 }
91}
92
93#[derive(Debug)]
95pub(super) enum MultiProofMessage {
96 PrefetchProofs(MultiProofTargets),
98 StateUpdate(StateChangeSource, EvmState),
100 EmptyProof {
105 sequence_number: u64,
107 state: HashedPostState,
109 },
110 ProofCalculated(Box<ProofCalculated>),
112 ProofCalculationError(ProviderError),
114 FinishedStateUpdates,
119}
120
121#[derive(Debug)]
123pub(super) struct ProofCalculated {
124 sequence_number: u64,
126 update: SparseTrieUpdate,
128 elapsed: Duration,
130}
131
132#[derive(Debug, Default)]
134struct ProofSequencer {
135 next_sequence: u64,
137 next_to_deliver: u64,
139 pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
141}
142
143impl ProofSequencer {
144 fn next_sequence(&mut self) -> u64 {
146 let seq = self.next_sequence;
147 self.next_sequence += 1;
148 seq
149 }
150
151 fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
154 if sequence >= self.next_to_deliver {
155 self.pending_proofs.insert(sequence, update);
156 }
157
158 if !self.pending_proofs.contains_key(&self.next_to_deliver) {
160 return Vec::new()
161 }
162
163 let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
164 let mut current_sequence = self.next_to_deliver;
165
166 while let Some(pending) = self.pending_proofs.remove(¤t_sequence) {
168 consecutive_proofs.push(pending);
169 current_sequence += 1;
170
171 if !self.pending_proofs.contains_key(¤t_sequence) {
173 break;
174 }
175 }
176
177 self.next_to_deliver += consecutive_proofs.len() as u64;
178
179 consecutive_proofs
180 }
181
182 pub(crate) fn has_pending(&self) -> bool {
184 !self.pending_proofs.is_empty()
185 }
186}
187
188#[derive(Deref, Debug)]
194pub(super) struct StateHookSender(Sender<MultiProofMessage>);
195
196impl StateHookSender {
197 pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
198 Self(inner)
199 }
200}
201
202impl Drop for StateHookSender {
203 fn drop(&mut self) {
204 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
206 }
207}
208
209pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
210 let mut hashed_state = HashedPostState::with_capacity(update.len());
211
212 for (address, account) in update {
213 if account.is_touched() {
214 let hashed_address = keccak256(address);
215 trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
216
217 let destroyed = account.is_selfdestructed();
218 let info = if destroyed { None } else { Some(account.info.into()) };
219 hashed_state.accounts.insert(hashed_address, info);
220
221 let mut changed_storage_iter = account
222 .storage
223 .into_iter()
224 .filter(|(_slot, value)| value.is_changed())
225 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
226 .peekable();
227
228 if destroyed {
229 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
230 } else if changed_storage_iter.peek().is_some() {
231 hashed_state
232 .storages
233 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
234 }
235 }
236 }
237
238 hashed_state
239}
240
241#[derive(Debug)]
243struct MultiproofInput<Factory> {
244 config: MultiProofConfig<Factory>,
245 source: Option<StateChangeSource>,
246 hashed_state_update: HashedPostState,
247 proof_targets: MultiProofTargets,
248 proof_sequence_number: u64,
249 state_root_message_sender: Sender<MultiProofMessage>,
250}
251
252#[derive(Debug)]
257pub struct MultiproofManager<Factory> {
258 max_concurrent: usize,
260 inflight: usize,
262 pending: VecDeque<MultiproofInput<Factory>>,
264 executor: WorkloadExecutor,
266 metrics: MultiProofTaskMetrics,
268}
269
270impl<Factory> MultiproofManager<Factory>
271where
272 Factory:
273 DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
274{
275 fn new(executor: WorkloadExecutor, metrics: MultiProofTaskMetrics) -> Self {
277 let max_concurrent = executor.rayon_pool().current_num_threads();
278 Self {
279 pending: VecDeque::with_capacity(max_concurrent),
280 max_concurrent,
281 executor,
282 inflight: 0,
283 metrics,
284 }
285 }
286
287 fn spawn_or_queue(&mut self, input: MultiproofInput<Factory>) {
290 if input.proof_targets.is_empty() {
292 debug!(
293 sequence_number = input.proof_sequence_number,
294 "No proof targets, sending empty multiproof back immediately"
295 );
296 let _ = input.state_root_message_sender.send(MultiProofMessage::EmptyProof {
297 sequence_number: input.proof_sequence_number,
298 state: input.hashed_state_update,
299 });
300 return
301 }
302
303 if self.inflight >= self.max_concurrent {
304 self.pending.push_back(input);
305 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
306 return;
307 }
308
309 self.spawn_multiproof(input);
310 }
311
312 fn on_calculation_complete(&mut self) {
315 self.inflight = self.inflight.saturating_sub(1);
316 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
317
318 if let Some(input) = self.pending.pop_front() {
319 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
320 self.spawn_multiproof(input);
321 }
322 }
323
324 fn spawn_multiproof(
326 &mut self,
327 MultiproofInput {
328 config,
329 source,
330 hashed_state_update,
331 proof_targets,
332 proof_sequence_number,
333 state_root_message_sender,
334 }: MultiproofInput<Factory>,
335 ) {
336 let executor = self.executor.clone();
337
338 self.executor.spawn_blocking(move || {
339 let account_targets = proof_targets.len();
340 let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
341
342 trace!(
343 target: "engine::root",
344 proof_sequence_number,
345 ?proof_targets,
346 account_targets,
347 storage_targets,
348 "Starting multiproof calculation",
349 );
350 let start = Instant::now();
351 let result = ParallelProof::new(
352 config.consistent_view,
353 config.nodes_sorted,
354 config.state_sorted,
355 config.prefix_sets,
356 executor.handle().clone(),
357 )
358 .with_branch_node_masks(true)
359 .multiproof(proof_targets);
360 let elapsed = start.elapsed();
361 trace!(
362 target: "engine::root",
363 proof_sequence_number,
364 ?elapsed,
365 ?source,
366 account_targets,
367 storage_targets,
368 "Multiproof calculated",
369 );
370
371 match result {
372 Ok(proof) => {
373 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
374 Box::new(ProofCalculated {
375 sequence_number: proof_sequence_number,
376 update: SparseTrieUpdate {
377 state: hashed_state_update,
378 multiproof: proof,
379 },
380 elapsed,
381 }),
382 ));
383 }
384 Err(error) => {
385 let _ = state_root_message_sender
386 .send(MultiProofMessage::ProofCalculationError(error.into()));
387 }
388 }
389 });
390
391 self.inflight += 1;
392 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
393 }
394}
395
396#[derive(Metrics, Clone)]
397#[metrics(scope = "tree.root")]
398pub(crate) struct MultiProofTaskMetrics {
399 pub inflight_multiproofs_histogram: Histogram,
401 pub pending_multiproofs_histogram: Histogram,
403
404 pub prefetch_proof_targets_accounts_histogram: Histogram,
406 pub prefetch_proof_targets_storages_histogram: Histogram,
408 pub prefetch_proof_chunks_histogram: Histogram,
410
411 pub state_update_proof_targets_accounts_histogram: Histogram,
413 pub state_update_proof_targets_storages_histogram: Histogram,
415 pub state_update_proof_chunks_histogram: Histogram,
417
418 pub proof_calculation_duration_histogram: Histogram,
420
421 pub sparse_trie_update_duration_histogram: Histogram,
423 pub sparse_trie_final_update_duration_histogram: Histogram,
425 pub sparse_trie_total_duration_histogram: Histogram,
427
428 pub state_updates_received_histogram: Histogram,
430 pub proofs_processed_histogram: Histogram,
432 pub multiproof_task_total_duration_histogram: Histogram,
434 pub first_update_wait_time_histogram: Histogram,
436}
437
438#[derive(Debug)]
448pub(super) struct MultiProofTask<Factory> {
449 config: MultiProofConfig<Factory>,
451 rx: Receiver<MultiProofMessage>,
453 tx: Sender<MultiProofMessage>,
455 to_sparse_trie: Sender<SparseTrieUpdate>,
457 fetched_proof_targets: MultiProofTargets,
459 proof_sequencer: ProofSequencer,
461 multiproof_manager: MultiproofManager<Factory>,
463 metrics: MultiProofTaskMetrics,
465}
466
467impl<Factory> MultiProofTask<Factory>
468where
469 Factory:
470 DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
471{
472 pub(super) fn new(
474 config: MultiProofConfig<Factory>,
475 executor: WorkloadExecutor,
476 to_sparse_trie: Sender<SparseTrieUpdate>,
477 ) -> Self {
478 let (tx, rx) = channel();
479 let metrics = MultiProofTaskMetrics::default();
480 Self {
481 config,
482 rx,
483 tx,
484 to_sparse_trie,
485 fetched_proof_targets: Default::default(),
486 proof_sequencer: ProofSequencer::default(),
487 multiproof_manager: MultiproofManager::new(executor, metrics.clone()),
488 metrics,
489 }
490 }
491
492 pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
494 self.tx.clone()
495 }
496
497 fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
501 let proof_targets = self.get_prefetch_proof_targets(targets);
502 self.fetched_proof_targets.extend_ref(&proof_targets);
503
504 self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
505 self.metrics
506 .prefetch_proof_targets_storages_histogram
507 .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
508
509 let mut chunks = 0;
511 for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
512 self.multiproof_manager.spawn_or_queue(MultiproofInput {
513 config: self.config.clone(),
514 source: None,
515 hashed_state_update: Default::default(),
516 proof_targets: proof_targets_chunk,
517 proof_sequence_number: self.proof_sequencer.next_sequence(),
518 state_root_message_sender: self.tx.clone(),
519 });
520 chunks += 1;
521 }
522 self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
523
524 chunks
525 }
526
527 fn is_done(
529 &self,
530 proofs_processed: u64,
531 state_update_proofs_requested: u64,
532 prefetch_proofs_requested: u64,
533 updates_finished: bool,
534 ) -> bool {
535 let all_proofs_processed =
536 proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
537 let no_pending = !self.proof_sequencer.has_pending();
538 debug!(
539 target: "engine::root",
540 proofs_processed,
541 state_update_proofs_requested,
542 prefetch_proofs_requested,
543 no_pending,
544 updates_finished,
545 "Checking end condition"
546 );
547 all_proofs_processed && no_pending && updates_finished
548 }
549
550 fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
552 let mut duplicates = 0;
556
557 targets.retain(|hashed_address, target_storage| {
559 let keep = self
560 .fetched_proof_targets
561 .get(hashed_address)
562 .is_none_or(|fetched_storage| {
564 !target_storage.is_subset(fetched_storage)
566 });
567
568 if !keep {
569 duplicates += target_storage.len();
570 }
571
572 keep
573 });
574
575 for (hashed_address, target_storage) in targets.deref_mut() {
577 let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
578 continue
581 };
582
583 let prev_target_storage_len = target_storage.len();
584
585 target_storage.retain(|slot| !fetched_storage.contains(slot));
589
590 duplicates += prev_target_storage_len - target_storage.len();
591 }
592
593 if duplicates > 0 {
594 trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
595 }
596
597 targets
598 }
599
600 fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
604 let hashed_state_update = evm_state_to_hashed_post_state(update);
605 let (fetched_state_update, not_fetched_state_update) =
608 hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
609
610 let mut state_updates = 0;
611 if !fetched_state_update.is_empty() {
614 let _ = self.tx.send(MultiProofMessage::EmptyProof {
615 sequence_number: self.proof_sequencer.next_sequence(),
616 state: fetched_state_update,
617 });
618 state_updates += 1;
619 }
620
621 let mut chunks = 0;
623 let mut spawned_proof_targets = MultiProofTargets::default();
624 for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
625 let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
626 spawned_proof_targets.extend_ref(&proof_targets);
627
628 self.multiproof_manager.spawn_or_queue(MultiproofInput {
629 config: self.config.clone(),
630 source: Some(source),
631 hashed_state_update: chunk,
632 proof_targets,
633 proof_sequence_number: self.proof_sequencer.next_sequence(),
634 state_root_message_sender: self.tx.clone(),
635 });
636 chunks += 1;
637 }
638
639 self.metrics
640 .state_update_proof_targets_accounts_histogram
641 .record(spawned_proof_targets.len() as f64);
642 self.metrics
643 .state_update_proof_targets_storages_histogram
644 .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
645 self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
646
647 self.fetched_proof_targets.extend(spawned_proof_targets);
648
649 state_updates + chunks
650 }
651
652 fn on_proof(
654 &mut self,
655 sequence_number: u64,
656 update: SparseTrieUpdate,
657 ) -> Option<SparseTrieUpdate> {
658 let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
659
660 ready_proofs
661 .into_iter()
662 .reduce(|mut acc_update, update| {
664 acc_update.extend(update);
665 acc_update
666 })
667 .filter(|proof| !proof.is_empty())
669 }
670
671 pub(crate) fn run(mut self) {
706 let mut prefetch_proofs_requested = 0;
708 let mut state_update_proofs_requested = 0;
709 let mut proofs_processed = 0;
710
711 let mut updates_finished = false;
712
713 let start = Instant::now();
715
716 let mut first_update_time = None;
718 let mut last_update_time = None;
720
721 loop {
722 trace!(target: "engine::root", "entering main channel receiving loop");
723 match self.rx.recv() {
724 Ok(message) => match message {
725 MultiProofMessage::PrefetchProofs(targets) => {
726 trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
727 if first_update_time.is_none() {
728 self.metrics
730 .first_update_wait_time_histogram
731 .record(start.elapsed().as_secs_f64());
732 first_update_time = Some(Instant::now());
733 debug!(target: "engine::root", "Started state root calculation");
734 }
735
736 let account_targets = targets.len();
737 let storage_targets =
738 targets.values().map(|slots| slots.len()).sum::<usize>();
739 prefetch_proofs_requested += self.on_prefetch_proof(targets);
740 debug!(
741 target: "engine::root",
742 account_targets,
743 storage_targets,
744 prefetch_proofs_requested,
745 "Prefetching proofs"
746 );
747 }
748 MultiProofMessage::StateUpdate(source, update) => {
749 trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
750 if first_update_time.is_none() {
751 self.metrics
753 .first_update_wait_time_histogram
754 .record(start.elapsed().as_secs_f64());
755 first_update_time = Some(Instant::now());
756 debug!(target: "engine::root", "Started state root calculation");
757 }
758 last_update_time = Some(Instant::now());
759
760 let len = update.len();
761 state_update_proofs_requested += self.on_state_update(source, update);
762 debug!(
763 target: "engine::root",
764 ?source,
765 len,
766 ?state_update_proofs_requested,
767 "Received new state update"
768 );
769 }
770 MultiProofMessage::FinishedStateUpdates => {
771 trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
772 updates_finished = true;
773 if self.is_done(
774 proofs_processed,
775 state_update_proofs_requested,
776 prefetch_proofs_requested,
777 updates_finished,
778 ) {
779 debug!(
780 target: "engine::root",
781 "State updates finished and all proofs processed, ending calculation"
782 );
783 break
784 }
785 }
786 MultiProofMessage::EmptyProof { sequence_number, state } => {
787 trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
788
789 proofs_processed += 1;
790
791 if let Some(combined_update) = self.on_proof(
792 sequence_number,
793 SparseTrieUpdate { state, multiproof: MultiProof::default() },
794 ) {
795 let _ = self.to_sparse_trie.send(combined_update);
796 }
797
798 if self.is_done(
799 proofs_processed,
800 state_update_proofs_requested,
801 prefetch_proofs_requested,
802 updates_finished,
803 ) {
804 debug!(
805 target: "engine::root",
806 "State updates finished and all proofs processed, ending calculation"
807 );
808 break
809 }
810 }
811 MultiProofMessage::ProofCalculated(proof_calculated) => {
812 trace!(target: "engine::root", "processing
813 MultiProofMessage::ProofCalculated");
814
815 proofs_processed += 1;
818
819 self.metrics
820 .proof_calculation_duration_histogram
821 .record(proof_calculated.elapsed);
822
823 debug!(
824 target: "engine::root",
825 sequence = proof_calculated.sequence_number,
826 total_proofs = proofs_processed,
827 "Processing calculated proof"
828 );
829
830 self.multiproof_manager.on_calculation_complete();
831
832 if let Some(combined_update) =
833 self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
834 {
835 let _ = self.to_sparse_trie.send(combined_update);
836 }
837
838 if self.is_done(
839 proofs_processed,
840 state_update_proofs_requested,
841 prefetch_proofs_requested,
842 updates_finished,
843 ) {
844 debug!(
845 target: "engine::root",
846 "State updates finished and all proofs processed, ending calculation");
847 break
848 }
849 }
850 MultiProofMessage::ProofCalculationError(err) => {
851 error!(
852 target: "engine::root",
853 ?err,
854 "proof calculation error"
855 );
856 return
857 }
858 },
859 Err(_) => {
860 error!(
863 target: "engine::root",
864 "Internal message channel closed unexpectedly"
865 );
866 }
867 }
868 }
869
870 debug!(
871 target: "engine::root",
872 total_updates = state_update_proofs_requested,
873 total_proofs = proofs_processed,
874 total_time = ?first_update_time.map(|t|t.elapsed()),
875 time_from_last_update = ?last_update_time.map(|t|t.elapsed()),
876 "All proofs processed, ending calculation"
877 );
878
879 self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
881 self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
882 if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
883 self.metrics.multiproof_task_total_duration_histogram.record(total_time);
884 }
885 }
886}
887
888fn get_proof_targets(
892 state_update: &HashedPostState,
893 fetched_proof_targets: &MultiProofTargets,
894) -> MultiProofTargets {
895 let mut targets = MultiProofTargets::default();
896
897 for &hashed_address in state_update.accounts.keys() {
899 if !fetched_proof_targets.contains_key(&hashed_address) {
900 targets.insert(hashed_address, HashSet::default());
901 }
902 }
903
904 for (hashed_address, storage) in &state_update.storages {
906 let fetched = fetched_proof_targets.get(hashed_address);
907 let mut changed_slots = storage
908 .storage
909 .keys()
910 .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
911 .peekable();
912
913 if changed_slots.peek().is_some() {
914 targets.entry(*hashed_address).or_default().extend(changed_slots);
915 }
916 }
917
918 targets
919}
920
921#[cfg(test)]
922mod tests {
923 use super::*;
924 use alloy_primitives::map::B256Set;
925 use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
926 use reth_trie::TrieInput;
927 use revm_primitives::{B256, U256};
928 use std::sync::Arc;
929
930 fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
931 where
932 F: DatabaseProviderFactory<Provider: BlockReader>
933 + StateCommitmentProvider
934 + Clone
935 + 'static,
936 {
937 let consistent_view = ConsistentDbView::new(factory, None);
938 let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
939 let state_sorted = Arc::new(input.state.clone().into_sorted());
940 let prefix_sets = Arc::new(input.prefix_sets);
941
942 MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
943 }
944
945 fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
946 where
947 F: DatabaseProviderFactory<Provider: BlockReader>
948 + StateCommitmentProvider
949 + Clone
950 + 'static,
951 {
952 let executor = WorkloadExecutor::with_num_cpu_threads(2);
953 let config = create_state_root_config(factory, TrieInput::default());
954 let channel = channel();
955
956 MultiProofTask::new(config, executor, channel.0)
957 }
958
959 #[test]
960 fn test_add_proof_in_sequence() {
961 let mut sequencer = ProofSequencer::default();
962 let proof1 = MultiProof::default();
963 let proof2 = MultiProof::default();
964 sequencer.next_sequence = 2;
965
966 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
967 assert_eq!(ready.len(), 1);
968 assert!(!sequencer.has_pending());
969
970 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2));
971 assert_eq!(ready.len(), 1);
972 assert!(!sequencer.has_pending());
973 }
974
975 #[test]
976 fn test_add_proof_out_of_order() {
977 let mut sequencer = ProofSequencer::default();
978 let proof1 = MultiProof::default();
979 let proof2 = MultiProof::default();
980 let proof3 = MultiProof::default();
981 sequencer.next_sequence = 3;
982
983 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3));
984 assert_eq!(ready.len(), 0);
985 assert!(sequencer.has_pending());
986
987 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
988 assert_eq!(ready.len(), 1);
989 assert!(sequencer.has_pending());
990
991 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2));
992 assert_eq!(ready.len(), 2);
993 assert!(!sequencer.has_pending());
994 }
995
996 #[test]
997 fn test_add_proof_with_gaps() {
998 let mut sequencer = ProofSequencer::default();
999 let proof1 = MultiProof::default();
1000 let proof3 = MultiProof::default();
1001 sequencer.next_sequence = 3;
1002
1003 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1004 assert_eq!(ready.len(), 1);
1005
1006 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3));
1007 assert_eq!(ready.len(), 0);
1008 assert!(sequencer.has_pending());
1009 }
1010
1011 #[test]
1012 fn test_add_proof_duplicate_sequence() {
1013 let mut sequencer = ProofSequencer::default();
1014 let proof1 = MultiProof::default();
1015 let proof2 = MultiProof::default();
1016
1017 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1018 assert_eq!(ready.len(), 1);
1019
1020 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2));
1021 assert_eq!(ready.len(), 0);
1022 assert!(!sequencer.has_pending());
1023 }
1024
1025 #[test]
1026 fn test_add_proof_batch_processing() {
1027 let mut sequencer = ProofSequencer::default();
1028 let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1029 sequencer.next_sequence = 5;
1030
1031 sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()));
1032 sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()));
1033 sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()));
1034 sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()));
1035
1036 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()));
1037 assert_eq!(ready.len(), 5);
1038 assert!(!sequencer.has_pending());
1039 }
1040
1041 fn create_get_proof_targets_state() -> HashedPostState {
1042 let mut state = HashedPostState::default();
1043
1044 let addr1 = B256::random();
1045 let addr2 = B256::random();
1046 state.accounts.insert(addr1, Some(Default::default()));
1047 state.accounts.insert(addr2, Some(Default::default()));
1048
1049 let mut storage = HashedStorage::default();
1050 let slot1 = B256::random();
1051 let slot2 = B256::random();
1052 storage.storage.insert(slot1, U256::ZERO);
1053 storage.storage.insert(slot2, U256::from(1));
1054 state.storages.insert(addr1, storage);
1055
1056 state
1057 }
1058
1059 #[test]
1060 fn test_get_proof_targets_new_account_targets() {
1061 let state = create_get_proof_targets_state();
1062 let fetched = MultiProofTargets::default();
1063
1064 let targets = get_proof_targets(&state, &fetched);
1065
1066 assert_eq!(targets.len(), state.accounts.len());
1068 for addr in state.accounts.keys() {
1069 assert!(targets.contains_key(addr));
1070 }
1071 }
1072
1073 #[test]
1074 fn test_get_proof_targets_new_storage_targets() {
1075 let state = create_get_proof_targets_state();
1076 let fetched = MultiProofTargets::default();
1077
1078 let targets = get_proof_targets(&state, &fetched);
1079
1080 for (addr, storage) in &state.storages {
1082 assert!(targets.contains_key(addr));
1083 let target_slots = &targets[addr];
1084 assert_eq!(target_slots.len(), storage.storage.len());
1085 for slot in storage.storage.keys() {
1086 assert!(target_slots.contains(slot));
1087 }
1088 }
1089 }
1090
1091 #[test]
1092 fn test_get_proof_targets_filter_already_fetched_accounts() {
1093 let state = create_get_proof_targets_state();
1094 let mut fetched = MultiProofTargets::default();
1095
1096 let fetched_addr = state
1098 .accounts
1099 .keys()
1100 .find(|&&addr| !state.storages.contains_key(&addr))
1101 .expect("Should have an account without storage");
1102
1103 fetched.insert(*fetched_addr, HashSet::default());
1105
1106 let targets = get_proof_targets(&state, &fetched);
1107
1108 assert!(!targets.contains_key(fetched_addr));
1110 assert_eq!(targets.len(), state.accounts.len() - 1);
1112 }
1113
1114 #[test]
1115 fn test_get_proof_targets_filter_already_fetched_storage() {
1116 let state = create_get_proof_targets_state();
1117 let mut fetched = MultiProofTargets::default();
1118
1119 let (addr, storage) = state.storages.iter().next().unwrap();
1121 let mut fetched_slots = HashSet::default();
1122 let fetched_slot = *storage.storage.keys().next().unwrap();
1123 fetched_slots.insert(fetched_slot);
1124 fetched.insert(*addr, fetched_slots);
1125
1126 let targets = get_proof_targets(&state, &fetched);
1127
1128 let target_slots = &targets[addr];
1130 assert!(!target_slots.contains(&fetched_slot));
1131 assert_eq!(target_slots.len(), storage.storage.len() - 1);
1132 }
1133
1134 #[test]
1135 fn test_get_proof_targets_empty_state() {
1136 let state = HashedPostState::default();
1137 let fetched = MultiProofTargets::default();
1138
1139 let targets = get_proof_targets(&state, &fetched);
1140
1141 assert!(targets.is_empty());
1142 }
1143
1144 #[test]
1145 fn test_get_proof_targets_mixed_fetched_state() {
1146 let mut state = HashedPostState::default();
1147 let mut fetched = MultiProofTargets::default();
1148
1149 let addr1 = B256::random();
1150 let addr2 = B256::random();
1151 let slot1 = B256::random();
1152 let slot2 = B256::random();
1153
1154 state.accounts.insert(addr1, Some(Default::default()));
1155 state.accounts.insert(addr2, Some(Default::default()));
1156
1157 let mut storage = HashedStorage::default();
1158 storage.storage.insert(slot1, U256::ZERO);
1159 storage.storage.insert(slot2, U256::from(1));
1160 state.storages.insert(addr1, storage);
1161
1162 let mut fetched_slots = HashSet::default();
1163 fetched_slots.insert(slot1);
1164 fetched.insert(addr1, fetched_slots);
1165
1166 let targets = get_proof_targets(&state, &fetched);
1167
1168 assert!(targets.contains_key(&addr2));
1169 assert!(!targets[&addr1].contains(&slot1));
1170 assert!(targets[&addr1].contains(&slot2));
1171 }
1172
1173 #[test]
1174 fn test_get_proof_targets_unmodified_account_with_storage() {
1175 let mut state = HashedPostState::default();
1176 let fetched = MultiProofTargets::default();
1177
1178 let addr = B256::random();
1179 let slot1 = B256::random();
1180 let slot2 = B256::random();
1181
1182 let mut storage = HashedStorage::default();
1185 storage.storage.insert(slot1, U256::from(1));
1186 storage.storage.insert(slot2, U256::from(2));
1187 state.storages.insert(addr, storage);
1188
1189 assert!(!state.accounts.contains_key(&addr));
1190 assert!(!fetched.contains_key(&addr));
1191
1192 let targets = get_proof_targets(&state, &fetched);
1193
1194 assert!(targets.contains_key(&addr));
1196
1197 let target_slots = &targets[&addr];
1198 assert_eq!(target_slots.len(), 2);
1199 assert!(target_slots.contains(&slot1));
1200 assert!(target_slots.contains(&slot2));
1201 }
1202
1203 #[test]
1204 fn test_get_prefetch_proof_targets_no_duplicates() {
1205 let test_provider_factory = create_test_provider_factory();
1206 let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1207
1208 let mut targets = MultiProofTargets::default();
1210 let addr1 = B256::random();
1211 let addr2 = B256::random();
1212 let slot1 = B256::random();
1213 let slot2 = B256::random();
1214 targets.insert(addr1, vec![slot1].into_iter().collect());
1215 targets.insert(addr2, vec![slot2].into_iter().collect());
1216
1217 let prefetch_proof_targets =
1218 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1219
1220 assert_eq!(prefetch_proof_targets, targets);
1223
1224 let addr3 = B256::random();
1226 let slot3 = B256::random();
1227 test_state_root_task.fetched_proof_targets.insert(addr3, vec![slot3].into_iter().collect());
1228
1229 let prefetch_proof_targets =
1230 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1231
1232 assert_eq!(prefetch_proof_targets, targets);
1235 }
1236
1237 #[test]
1238 fn test_get_prefetch_proof_targets_remove_subset() {
1239 let test_provider_factory = create_test_provider_factory();
1240 let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1241
1242 let mut targets = MultiProofTargets::default();
1244 let addr1 = B256::random();
1245 let addr2 = B256::random();
1246 let slot1 = B256::random();
1247 let slot2 = B256::random();
1248 targets.insert(addr1, vec![slot1].into_iter().collect());
1249 targets.insert(addr2, vec![slot2].into_iter().collect());
1250
1251 test_state_root_task.fetched_proof_targets.insert(addr1, vec![slot1].into_iter().collect());
1253
1254 let prefetch_proof_targets =
1255 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1256
1257 assert_eq!(prefetch_proof_targets.len(), 1);
1259 assert!(!prefetch_proof_targets.contains_key(&addr1));
1260 assert!(prefetch_proof_targets.contains_key(&addr2));
1261
1262 let slot3 = B256::random();
1264 targets.get_mut(&addr1).unwrap().insert(slot3);
1265
1266 let prefetch_proof_targets =
1267 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1268
1269 assert_eq!(prefetch_proof_targets.len(), 2);
1272 assert!(prefetch_proof_targets.contains_key(&addr1));
1273 assert_eq!(
1274 *prefetch_proof_targets.get(&addr1).unwrap(),
1275 vec![slot3].into_iter().collect::<B256Set>()
1276 );
1277 assert!(prefetch_proof_targets.contains_key(&addr2));
1278 assert_eq!(
1279 *prefetch_proof_targets.get(&addr2).unwrap(),
1280 vec![slot2].into_iter().collect::<B256Set>()
1281 );
1282 }
1283}