use crate::{
    root::ParallelStateRootError,
    stats::{ParallelTrieStats, ParallelTrieTracker},
    targets_v2::MultiProofTargetsV2,
    value_encoder::AsyncAccountValueEncoder,
    StorageRootTargets,
};
use alloy_primitives::{
    map::{B256Map, B256Set},
    B256,
};
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
    node_iter::{TrieElement, TrieNodeIter},
    prefix_set::TriePrefixSets,
    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
    proof_v2,
    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
    walker::TrieWalker,
    DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState,
    MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
    added_removed_keys::MultiAddedRemovedKeys,
    prefix_set::{PrefixSet, PrefixSetMut},
    proof::{DecodedProofNodes, ProofRetainer},
    BranchNodeMasks, BranchNodeMasksMap,
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};

#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};

type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

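/// Handle to the proof worker pools.
///
/// Cloneable; used to dispatch storage proofs, account multiproofs and blinded trie node
/// requests to the spawned workers.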
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    storage_available_workers: Arc<AtomicUsize>,
    account_available_workers: Arc<AtomicUsize>,
    storage_worker_count: usize,
    account_worker_count: usize,
    v2_proofs_enabled: bool,
}

impl ProofWorkerHandle {
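    /// Spawns `storage_worker_count` storage workers and `account_worker_count` account workers
    /// on the given executor and returns a handle for dispatching jobs to them.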
    pub fn new<Factory>(
        executor: Handle,
        task_ctx: ProofTaskCtx<Factory>,
        storage_worker_count: usize,
        account_worker_count: usize,
        v2_proofs_enabled: bool,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        let storage_available_workers = Arc::new(AtomicUsize::new(0));
        let account_available_workers = Arc::new(AtomicUsize::new(0));

        let cached_storage_roots = Arc::new(DashMap::new());

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            ?v2_proofs_enabled,
            "Spawning proof worker pools"
        );

        let parent_span =
            debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
                .entered();
        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();
            let cached_storage_roots = cached_storage_roots.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    cached_storage_roots,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                )
                .with_v2_proofs(v2_proofs_enabled);
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        let parent_span =
            debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
                .entered();
        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();
            let cached_storage_roots = cached_storage_roots.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    cached_storage_roots,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                )
                .with_v2_proofs(v2_proofs_enabled);
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
            v2_proofs_enabled,
        }
    }

    pub const fn v2_proofs_enabled(&self) -> bool {
        self.v2_proofs_enabled
    }

    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

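    /// Queues a storage proof job on the storage worker pool.
    ///
    /// The result is delivered through `proof_result_sender`; if the pool is unavailable, an
    /// error is sent on that channel and also returned to the caller.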
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    ) -> Result<(), ProviderError> {
        let hashed_address = input.hashed_address();
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let _ = proof_result_sender.send(StorageProofResultMessage {
                        hashed_address,
                        result: Err(DatabaseError::Other(
                            "storage workers unavailable".to_string(),
                        )
                        .into()),
                    });
                }

                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })
    }

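    /// Queues an account multiproof job on the account worker pool.
    ///
    /// If the pool is unavailable, the error is also forwarded through the result channel
    /// carried inside the input.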
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = input.into_proof_result_sender();

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}

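/// Shared context handed to every proof worker, wrapping the database provider factory.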
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}

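/// A worker's read-only database provider paired with its worker id.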
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    provider: Provider,

    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}

impl<Provider> ProofTaskTx<Provider>
where
    Provider: TrieCursorFactory + HashedCursorFactory,
{
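    /// Computes a legacy storage multiproof for the given input.
    ///
    /// Panics if `input` is not [`StorageProofInput::Legacy`].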
    #[inline]
    fn compute_legacy_storage_proof(
        &self,
        input: StorageProofInput,
        trie_cursor_metrics: &mut TrieCursorMetricsCache,
        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
    ) -> Result<StorageProofResult, StateProofError> {
        let StorageProofInput::Legacy {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        } = input
        else {
            panic!("compute_legacy_storage_proof only accepts StorageProofInput::Legacy")
        };

        let multi_added_removed_keys =
            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);

        let span = debug_span!(
            target: "trie::proof_task",
            "Storage proof calculation",
            ?hashed_address,
            target_slots = ?target_slots.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        let raw_proof_result =
            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
                .with_branch_node_masks(with_branch_node_masks)
                .with_added_removed_keys(added_removed_keys)
                .with_trie_cursor_metrics(trie_cursor_metrics)
                .with_hashed_cursor_metrics(hashed_cursor_metrics)
                .storage_multiproof(target_slots);
        trie_cursor_metrics.record_span("trie_cursor");
        hashed_cursor_metrics.record_span("hashed_cursor");

        let decoded_result =
            raw_proof_result.and_then(|raw_proof| raw_proof.try_into().map_err(Into::into))?;

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            worker_id = self.id,
            "Completed storage proof calculation"
        );

        Ok(StorageProofResult::Legacy { proof: decoded_result })
    }

    fn compute_v2_storage_proof(
        &self,
        input: StorageProofInput,
        calculator: &mut proof_v2::StorageProofCalculator<
            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
            <Provider as HashedCursorFactory>::StorageCursor<'_>,
        >,
    ) -> Result<StorageProofResult, StateProofError> {
        let StorageProofInput::V2 { hashed_address, mut targets } = input else {
            panic!("compute_v2_storage_proof only accepts StorageProofInput::V2")
        };

        if targets.is_empty() {
            targets.push(proof_v2::Target::new(B256::ZERO));
        }

        let span = debug_span!(
            target: "trie::proof_task",
            "V2 Storage proof calculation",
            ?hashed_address,
            targets = ?targets.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();
        let proof = calculator.storage_proof(hashed_address, &mut targets)?;
        let root = calculator.compute_root_hash(&proof)?;

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            ?root,
            worker_id = self.id,
            "Completed V2 storage proof calculation"
        );

        Ok(StorageProofResult::V2 { proof, root })
    }

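    /// Fetches a blinded storage trie node for the given account and path.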
    fn process_blinded_storage_node(
        &self,
        account: B256,
        path: &Nibbles,
    ) -> TrieNodeProviderResult {
        let storage_node_provider =
            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
        storage_node_provider.trie_node(path)
    }

    fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
        let account_node_provider =
            ProofBlindedAccountProvider::new(&self.provider, &self.provider);
        account_node_provider.trie_node(path)
    }
}

impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}

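/// Trie node provider that resolves blinded nodes by dispatching requests to the proof worker
/// pools.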
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    AccountNode {
        handle: ProofWorkerHandle,
    },
    StorageNode {
        account: B256,
        handle: ProofWorkerHandle,
    },
}

impl TrieNodeProvider for ProofTaskTrieNodeProvider {
    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
        match self {
            Self::AccountNode { handle } => {
                let rx = handle
                    .dispatch_blinded_account_node(*path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
            Self::StorageNode { handle, account } => {
                let rx = handle
                    .dispatch_blinded_storage_node(*account, *path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
        }
    }
}

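/// Result of an account multiproof computation: either a legacy multiproof with parallel trie
/// stats, or a V2 multiproof.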
#[derive(Debug)]
pub enum ProofResult {
    Legacy(DecodedMultiProof, ParallelTrieStats),
    V2(DecodedMultiProofV2),
}

impl ProofResult {
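    /// Creates an empty proof result in the format matching `v2_enabled`.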
    pub fn empty(v2_enabled: bool) -> Self {
        if v2_enabled {
            Self::V2(DecodedMultiProofV2::default())
        } else {
            let stats = ParallelTrieTracker::default().finish();
            Self::Legacy(DecodedMultiProof::default(), stats)
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            Self::Legacy(proof, _) => proof.is_empty(),
            Self::V2(proof) => proof.is_empty(),
        }
    }

    pub fn extend(&mut self, other: Self) {
        match (self, other) {
            (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other),
            (Self::V2(proof), Self::V2(other)) => proof.extend(other),
            _ => panic!("mismatched ProofResults, cannot extend one with the other"),
        }
    }

    pub fn account_proofs_len(&self) -> usize {
        match self {
            Self::Legacy(proof, _) => proof.account_subtree.len(),
            Self::V2(proof) => proof.account_proofs.len(),
        }
    }

    pub fn storage_proofs_len(&self) -> usize {
        match self {
            Self::Legacy(proof, _) => {
                proof.storages.values().map(|p| p.subtree.len()).sum::<usize>()
            }
            Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::<usize>(),
        }
    }
}

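/// Channel sender used to deliver [`ProofResultMessage`]s back to the requester.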
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;

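/// Message carrying the outcome of an account multiproof computation.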
#[derive(Debug)]
pub struct ProofResultMessage {
    pub sequence_number: u64,
    pub result: Result<ProofResult, ParallelStateRootError>,
    pub elapsed: Duration,
    pub state: HashedPostState,
}

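/// Everything needed to deliver a proof result: the result channel, sequence number, originating
/// hashed state and dispatch start time.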
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    pub sender: ProofResultSender,
    pub sequence_number: u64,
    pub state: HashedPostState,
    pub start_time: Instant,
}

impl ProofResultContext {
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}

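/// Result of a single storage proof computation, in either legacy or V2 form.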
#[derive(Debug)]
pub(crate) enum StorageProofResult {
    Legacy {
        proof: DecodedStorageMultiProof,
    },
    V2 {
        proof: Vec<ProofTrieNode>,
        root: Option<B256>,
    },
}

impl StorageProofResult {
    const fn root(&self) -> Option<B256> {
        match self {
            Self::Legacy { proof } => Some(proof.root),
            Self::V2 { root, .. } => *root,
        }
    }
}

impl From<StorageProofResult> for Option<DecodedStorageMultiProof> {
    fn from(proof_result: StorageProofResult) -> Self {
        match proof_result {
            StorageProofResult::Legacy { proof } => Some(proof),
            StorageProofResult::V2 { proof, root } => root.map(|root| {
                let branch_node_masks = proof
                    .iter()
                    .filter_map(|node| node.masks.map(|masks| (node.path, masks)))
                    .collect();
                let subtree = proof.into_iter().map(|node| (node.path, node.node)).collect();
                DecodedStorageMultiProof { root, subtree, branch_node_masks }
            }),
        }
    }
}

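/// Message sent by a storage worker with the proof result for `hashed_address`.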
#[derive(Debug)]
pub struct StorageProofResultMessage {
    pub(crate) hashed_address: B256,
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}

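/// Jobs processed by the storage worker pool.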
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    StorageProof {
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    BlindedStorageNode {
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

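/// Long-lived worker that computes storage proofs and blinded storage nodes on a dedicated
/// read-only database provider.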
struct StorageProofWorker<Factory> {
    task_ctx: ProofTaskCtx<Factory>,
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    worker_id: usize,
    available_workers: Arc<AtomicUsize>,
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    v2_enabled: bool,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            v2_enabled: false,
        }
    }

    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }

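    /// Worker loop: opens a read-only provider, then processes jobs from the queue until all
    /// senders are dropped, tracking availability via the shared counter.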
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
        let mut v2_calculator = if self.v2_enabled {
            let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
            let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
            Some(proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor))
        } else {
            None
        };

        self.available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = self.work_rx.recv() {
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    self.process_storage_proof(
                        &proof_tx,
                        v2_calculator.as_mut(),
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            self.available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_storage_nodes(storage_nodes_processed as usize);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    fn process_storage_proof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: Option<
            &mut proof_v2::StorageProofCalculator<
                <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                <Provider as HashedCursorFactory>::StorageCursor<'_>,
            >,
        >,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
        let hashed_address = input.hashed_address();
        let proof_start = Instant::now();

        let result = match &input {
            StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, .. } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    prefix_set_len = prefix_set.len(),
                    target_slots_len = target_slots.len(),
                    "Processing storage proof"
                );

                proof_tx.compute_legacy_storage_proof(
                    input,
                    &mut trie_cursor_metrics,
                    &mut hashed_cursor_metrics,
                )
            }
            StorageProofInput::V2 { hashed_address, targets } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    targets_len = targets.len(),
                    "Processing V2 storage proof"
                );
                proof_tx
                    .compute_v2_storage_proof(input, v2_calculator.expect("v2 calculator provided"))
            }
        };

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        let root = result.as_ref().ok().and_then(|result| result.root());

        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            ?root,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}

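/// Long-lived worker that computes account multiproofs and blinded account nodes, delegating
/// storage proofs to the storage worker pool.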
struct AccountProofWorker<Factory> {
    task_ctx: ProofTaskCtx<Factory>,
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    worker_id: usize,
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    available_workers: Arc<AtomicUsize>,
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    v2_enabled: bool,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    #[allow(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            v2_enabled: false,
        }
    }

    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }

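    /// Worker loop: opens a read-only provider, then processes jobs from the queue until all
    /// senders are dropped, tracking availability via the shared counter.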
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        let mut v2_calculator = if self.v2_enabled {
            let trie_cursor = proof_tx.provider.account_trie_cursor()?;
            let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
            Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
                trie_cursor,
                hashed_cursor,
            ))
        } else {
            None
        };

        self.available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = self.work_rx.recv() {
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    self.process_account_multiproof(
                        &proof_tx,
                        v2_calculator.as_mut(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            self.available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    fn compute_legacy_account_multiproof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        targets: MultiProofTargets,
        mut prefix_sets: TriePrefixSets,
        collect_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
        proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
    ) -> Result<ProofResult, ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            worker_id = self.worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        let storage_proof_receivers = dispatch_storage_proofs(
            &self.storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        )?;

        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            cached_storage_roots: &self.cached_storage_roots,
        };

        let result = build_account_multiproof_with_storage_roots(
            &proof_tx.provider,
            ctx,
            &mut tracker,
            proof_cursor_metrics,
        );

        let stats = tracker.finish();
        result.map(|proof| ProofResult::Legacy(proof, stats))
    }

    fn compute_v2_account_multiproof<Provider>(
        &self,
        v2_calculator: &mut proof_v2::ProofCalculator<
            <Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
            <Provider as HashedCursorFactory>::AccountCursor<'_>,
            AsyncAccountValueEncoder,
        >,
        targets: MultiProofTargetsV2,
    ) -> Result<ProofResult, ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account V2 multiproof calculation",
            account_targets = account_targets.len(),
            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
            worker_id = self.worker_id,
        );
        let _span_guard = span.enter();

        trace!(target: "trie::proof_task", "Processing V2 account multiproof");

        let storage_proof_receivers =
            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;

        let mut value_encoder = AsyncAccountValueEncoder::new(
            self.storage_work_tx.clone(),
            storage_proof_receivers,
            self.cached_storage_roots.clone(),
        );

        let proof = DecodedMultiProofV2 {
            account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
            storage_proofs: value_encoder.into_storage_proofs()?,
        };

        Ok(ProofResult::V2(proof))
    }

    fn process_account_multiproof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: Option<
            &mut proof_v2::ProofCalculator<
                <Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
                <Provider as HashedCursorFactory>::AccountCursor<'_>,
                AsyncAccountValueEncoder,
            >,
        >,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
        let proof_start = Instant::now();

        let (proof_result_sender, result) = match input {
            AccountMultiproofInput::Legacy {
                targets,
                prefix_sets,
                collect_branch_node_masks,
                multi_added_removed_keys,
                proof_result_sender,
            } => (
                proof_result_sender,
                self.compute_legacy_account_multiproof(
                    proof_tx,
                    targets,
                    prefix_sets,
                    collect_branch_node_masks,
                    multi_added_removed_keys,
                    &mut proof_cursor_metrics,
                ),
            ),
            AccountMultiproofInput::V2 { targets, proof_result_sender } => (
                proof_result_sender,
                self.compute_v2_account_multiproof::<Provider>(
                    v2_calculator.expect("v2 calculator provided"),
                    targets,
                ),
            ),
        };

        let ProofResultContext {
            sender: result_tx,
            sequence_number: seq,
            state,
            start_time: start,
        } = proof_result_sender;

        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        *account_proofs_processed += 1;

        if result_tx
            .send(ProofResultMessage {
                sequence_number: seq,
                result,
                elapsed: total_elapsed,
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        proof_cursor_metrics.record_spans();

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        cursor_metrics_cache.extend(&proof_cursor_metrics);
    }

    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_account_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}

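/// Walks the account trie and builds an account multiproof, filling in storage roots from the
/// dispatched storage proof receivers and computing (and caching) roots for missed leaves.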
fn build_account_multiproof_with_storage_roots<P>(
    provider: &P,
    ctx: AccountMultiproofParams<'_>,
    tracker: &mut ParallelTrieTracker,
    proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
    P: TrieCursorFactory + HashedCursorFactory,
{
    let accounts_added_removed_keys =
        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());

    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
    let account_trie_cursor = InstrumentedTrieCursor::new(
        account_trie_cursor,
        &mut proof_cursor_metrics.account_trie_cursor,
    );

    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
        .with_added_removed_keys(accounts_added_removed_keys)
        .with_deletions_retained(true);

    let retainer = ctx
        .targets
        .keys()
        .map(Nibbles::unpack)
        .collect::<ProofRetainer>()
        .with_added_removed_keys(accounts_added_removed_keys);
    let mut hash_builder = HashBuilder::default()
        .with_proof_retainer(retainer)
        .with_updates(ctx.collect_branch_node_masks);

    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);

    let account_hashed_cursor =
        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
    let account_hashed_cursor = InstrumentedHashedCursor::new(
        account_hashed_cursor,
        &mut proof_cursor_metrics.account_hashed_cursor,
    );

    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);

    let mut storage_proof_receivers = ctx.storage_proof_receivers;

    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
        match account_node {
            TrieElement::Branch(node) => {
                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
            }
            TrieElement::Leaf(hashed_address, account) => {
                let root = match storage_proof_receivers.remove(&hashed_address) {
                    Some(receiver) => {
                        let _guard = debug_span!(
                            target: "trie::proof_task",
                            "Waiting for storage proof",
                            ?hashed_address,
                        );
                        let proof_msg = receiver.recv().map_err(|_| {
                            ParallelStateRootError::StorageRoot(
                                reth_execution_errors::StorageRootError::Database(
                                    DatabaseError::Other(format!(
                                        "Storage proof channel closed for {hashed_address}"
                                    )),
                                ),
                            )
                        })?;

                        drop(_guard);

                        debug_assert_eq!(
                            proof_msg.hashed_address, hashed_address,
                            "storage worker must return same address"
                        );
                        let proof_result = proof_msg.result?;
                        let Some(root) = proof_result.root() else {
                            trace!(
                                target: "trie::proof_task",
                                ?proof_result,
                                "Received proof_result without root",
                            );
                            panic!("Partial proofs are not yet supported");
                        };
                        let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
                            .expect("Partial proofs are not yet supported (into)");
                        collected_decoded_storages.insert(hashed_address, proof);
                        root
                    }
                    None => {
                        tracker.inc_missed_leaves();

                        match ctx.cached_storage_roots.entry(hashed_address) {
                            dashmap::Entry::Occupied(occ) => *occ.get(),
                            dashmap::Entry::Vacant(vac) => {
                                let root =
                                    StorageProof::new_hashed(provider, provider, hashed_address)
                                        .with_prefix_set_mut(Default::default())
                                        .with_trie_cursor_metrics(
                                            &mut proof_cursor_metrics.storage_trie_cursor,
                                        )
                                        .with_hashed_cursor_metrics(
                                            &mut proof_cursor_metrics.storage_hashed_cursor,
                                        )
                                        .storage_multiproof(
                                            ctx.targets
                                                .get(&hashed_address)
                                                .cloned()
                                                .unwrap_or_default(),
                                        )
                                        .map_err(|e| {
                                            ParallelStateRootError::StorageRoot(
                                                reth_execution_errors::StorageRootError::Database(
                                                    DatabaseError::Other(e.to_string()),
                                                ),
                                            )
                                        })?
                                        .root;

                                vac.insert(root);
                                root
                            }
                        }
                    }
                };

                account_rlp.clear();
                let account = account.into_trie_account(root);
                account.encode(&mut account_rlp as &mut dyn BufMut);

                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
            }
        }
    }

    let _ = hash_builder.root();

    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;

    let branch_node_masks = if ctx.collect_branch_node_masks {
        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
        updated_branch_nodes
            .into_iter()
            .map(|(path, node)| {
                (path, BranchNodeMasks { hash_mask: node.hash_mask, tree_mask: node.tree_mask })
            })
            .collect()
    } else {
        BranchNodeMasksMap::default()
    };

    for (hashed_address, receiver) in storage_proof_receivers {
        if let Ok(proof_msg) = receiver.recv() {
            let proof_result = proof_msg.result?;
            let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
                .expect("Partial proofs are not yet supported");
            collected_decoded_storages.insert(hashed_address, proof);
        }
    }

    Ok(DecodedMultiProof {
        account_subtree: decoded_account_subtree,
        branch_node_masks,
        storages: collected_decoded_storages,
    })
}
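
/// Dispatches legacy storage proof jobs for all targets to the storage worker pool and returns a
/// result receiver per hashed address.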
fn dispatch_storage_proofs(
    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
    targets: &MultiProofTargets,
    storage_prefix_sets: &mut B256Map<PrefixSet>,
    with_branch_node_masks: bool,
    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
    let mut storage_proof_receivers =
        B256Map::with_capacity_and_hasher(targets.len(), Default::default());

    let mut sorted_targets: Vec<_> = targets.iter().collect();
    sorted_targets.sort_unstable_by_key(|(addr, _)| *addr);

    for (hashed_address, target_slots) in sorted_targets {
        let (result_tx, result_rx) = crossbeam_channel::unbounded();

        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
        let input = StorageProofInput::legacy(
            *hashed_address,
            prefix_set,
            target_slots.clone(),
            with_branch_node_masks,
            multi_added_removed_keys.cloned(),
        );

        storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {}: storage worker pool unavailable",
                    hashed_address
                ))
            })?;

        storage_proof_receivers.insert(*hashed_address, result_rx);
    }

    Ok(storage_proof_receivers)
}

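/// Dispatches V2 storage proof jobs for all storage targets, plus empty-target jobs for account
/// targets without storage targets, returning a result receiver per hashed address.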
fn dispatch_v2_storage_proofs(
    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
    account_targets: &Vec<proof_v2::Target>,
    storage_targets: B256Map<Vec<proof_v2::Target>>,
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
    let mut storage_proof_receivers =
        B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());

    for (hashed_address, targets) in storage_targets {
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let input = StorageProofInput::new(hashed_address, targets);

        storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
                ))
            })?;

        storage_proof_receivers.insert(hashed_address, result_rx);
    }

    for target in account_targets {
        let hashed_address = target.key();
        if storage_proof_receivers.contains_key(&hashed_address) {
            continue
        }

        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);

        storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
                ))
            })?;

        storage_proof_receivers.insert(hashed_address, result_rx);
    }

    Ok(storage_proof_receivers)
}

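/// Input for a storage proof job, in either legacy or V2 form.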
#[derive(Debug)]
pub enum StorageProofInput {
    Legacy {
        hashed_address: B256,
        prefix_set: PrefixSet,
        target_slots: B256Set,
        with_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    },
    V2 {
        hashed_address: B256,
        targets: Vec<proof_v2::Target>,
    },
}

impl StorageProofInput {
    pub const fn legacy(
        hashed_address: B256,
        prefix_set: PrefixSet,
        target_slots: B256Set,
        with_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    ) -> Self {
        Self::Legacy {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        }
    }

    pub const fn new(hashed_address: B256, targets: Vec<proof_v2::Target>) -> Self {
        Self::V2 { hashed_address, targets }
    }

    pub const fn hashed_address(&self) -> B256 {
        match self {
            Self::Legacy { hashed_address, .. } | Self::V2 { hashed_address, .. } => {
                *hashed_address
            }
        }
    }
}

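/// Input for an account multiproof job, in either legacy or V2 form.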
#[derive(Debug)]
pub enum AccountMultiproofInput {
    Legacy {
        targets: MultiProofTargets,
        prefix_sets: TriePrefixSets,
        collect_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
        proof_result_sender: ProofResultContext,
    },
    V2 {
        targets: MultiProofTargetsV2,
        proof_result_sender: ProofResultContext,
    },
}

impl AccountMultiproofInput {
    fn into_proof_result_sender(self) -> ProofResultContext {
        match self {
            Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => {
                proof_result_sender
            }
        }
    }
}

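/// Parameters for building a legacy account multiproof with pre-dispatched storage proofs.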
struct AccountMultiproofParams<'a> {
    targets: &'a MultiProofTargets,
    prefix_set: PrefixSet,
    collect_branch_node_masks: bool,
    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
    storage_proof_receivers: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
    cached_storage_roots: &'a DashMap<B256, B256>,
}

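/// Jobs processed by the account worker pool.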
#[derive(Debug)]
enum AccountWorkerJob {
    AccountMultiproof {
        input: Box<AccountMultiproofInput>,
    },
    BlindedAccountNode {
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;
    use tokio::{runtime::Builder, task};

    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    #[test]
    fn spawn_proof_workers_creates_handle() {
        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
        runtime.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let provider_factory = create_test_provider_factory();
            let changeset_cache = reth_trie_db::ChangesetCache::new();
            let factory = reth_provider::providers::OverlayStateProviderFactory::new(
                provider_factory,
                changeset_cache,
            );
            let ctx = test_ctx(factory);

            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3, false);

            let _cloned_handle = proof_handle.clone();

            drop(proof_handle);
            task::yield_now().await;
        });
    }
}