1use crate::{
33 root::ParallelStateRootError,
34 stats::{ParallelTrieStats, ParallelTrieTracker},
35 targets_v2::MultiProofTargetsV2,
36 value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
37 StorageRootTargets,
38};
39use alloy_primitives::{
40 map::{B256Map, B256Set},
41 B256,
42};
43use alloy_rlp::{BufMut, Encodable};
44use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
45use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
46use reth_primitives_traits::dashmap::{self, DashMap};
47use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
48use reth_storage_errors::db::DatabaseError;
49use reth_tasks::Runtime;
50use reth_trie::{
51 hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
52 node_iter::{TrieElement, TrieNodeIter},
53 prefix_set::TriePrefixSets,
54 proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
55 proof_v2,
56 trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
57 walker::TrieWalker,
58 DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState,
59 MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
60};
61use reth_trie_common::{
62 added_removed_keys::MultiAddedRemovedKeys,
63 prefix_set::{PrefixSet, PrefixSetMut},
64 proof::{DecodedProofNodes, ProofRetainer},
65 BranchNodeMasks, BranchNodeMasksMap,
66};
67use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
68use std::{
69 cell::RefCell,
70 rc::Rc,
71 sync::{
72 atomic::{AtomicUsize, Ordering},
73 mpsc::{channel, Receiver, Sender},
74 Arc,
75 },
76 time::{Duration, Instant},
77};
78use tracing::{debug, debug_span, error, trace};
79
80#[cfg(feature = "metrics")]
81use crate::proof_task_metrics::{
82 ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
83};
84
/// Outcome of a blinded trie node lookup: an optional [`RevealedNode`] on success, or a
/// [`SparseTrieError`] on failure.
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// Shorthand for [`proof_v2::ProofCalculator`] instantiated with `Provider`'s account trie cursor
/// and hashed account cursor, and with an [`AsyncAccountValueEncoder`] over `Provider`'s storage
/// cursors as its third type parameter.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    <Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
    <Provider as HashedCursorFactory>::AccountCursor<'a>,
    AsyncAccountValueEncoder<
        <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
        <Provider as HashedCursorFactory>::StorageCursor<'a>,
    >,
>;

/// Shorthand for [`proof_v2::StorageProofCalculator`] instantiated with `Provider`'s storage trie
/// cursor and hashed storage cursor.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
    <Provider as HashedCursorFactory>::StorageCursor<'a>,
>;
102
/// Cloneable handle used to submit jobs to the storage and account proof workers spawned by
/// [`ProofWorkerHandle::new`] and to inspect their availability counters.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Sending side of the unbounded channel that storage workers receive jobs from.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Sending side of the unbounded channel that account workers receive jobs from.
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Counter shared with the storage workers; each worker increments it while waiting for a
    /// job and decrements it while processing one.
    storage_available_workers: Arc<AtomicUsize>,
    /// Counter shared with the account workers; each worker increments it while waiting for a
    /// job and decrements it while processing one.
    account_available_workers: Arc<AtomicUsize>,
    /// Number of storage workers spawned when the handle was created.
    storage_worker_count: usize,
    /// Number of account workers spawned when the handle was created.
    account_worker_count: usize,
    /// Flag forwarded to every worker via `with_v2_proofs` at spawn time.
    v2_proofs_enabled: bool,
}
127
128impl ProofWorkerHandle {
129 pub fn new<Factory>(
139 runtime: &Runtime,
140 task_ctx: ProofTaskCtx<Factory>,
141 v2_proofs_enabled: bool,
142 ) -> Self
143 where
144 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
145 + Clone
146 + Send
147 + 'static,
148 {
149 let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
150 let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
151
152 let storage_available_workers = Arc::<AtomicUsize>::default();
153 let account_available_workers = Arc::<AtomicUsize>::default();
154
155 let cached_storage_roots = Arc::<DashMap<_, _>>::default();
156
157 let storage_worker_count = runtime.proof_storage_worker_pool().current_num_threads();
158 let account_worker_count = runtime.proof_account_worker_pool().current_num_threads();
159
160 debug!(
161 target: "trie::proof_task",
162 storage_worker_count,
163 account_worker_count,
164 ?v2_proofs_enabled,
165 "Spawning proof worker pools"
166 );
167
168 let storage_pool = runtime.proof_storage_worker_pool();
169 let task_ctx_for_storage = task_ctx.clone();
170 let cached_storage_roots_for_storage = cached_storage_roots.clone();
171
172 for worker_id in 0..storage_worker_count {
173 let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
174 let task_ctx_clone = task_ctx_for_storage.clone();
175 let work_rx_clone = storage_work_rx.clone();
176 let storage_available_workers_clone = storage_available_workers.clone();
177 let cached_storage_roots = cached_storage_roots_for_storage.clone();
178
179 storage_pool.spawn(move || {
180 #[cfg(feature = "metrics")]
181 let metrics = ProofTaskTrieMetrics::default();
182 #[cfg(feature = "metrics")]
183 let cursor_metrics = ProofTaskCursorMetrics::new();
184
185 let _guard = span.enter();
186 let worker = StorageProofWorker::new(
187 task_ctx_clone,
188 work_rx_clone,
189 worker_id,
190 storage_available_workers_clone,
191 cached_storage_roots,
192 #[cfg(feature = "metrics")]
193 metrics,
194 #[cfg(feature = "metrics")]
195 cursor_metrics,
196 )
197 .with_v2_proofs(v2_proofs_enabled);
198 if let Err(error) = worker.run() {
199 error!(
200 target: "trie::proof_task",
201 worker_id,
202 ?error,
203 "Storage worker failed"
204 );
205 }
206 });
207 }
208
209 let account_pool = runtime.proof_account_worker_pool();
210
211 for worker_id in 0..account_worker_count {
212 let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
213 let task_ctx_clone = task_ctx.clone();
214 let work_rx_clone = account_work_rx.clone();
215 let storage_work_tx_clone = storage_work_tx.clone();
216 let account_available_workers_clone = account_available_workers.clone();
217 let cached_storage_roots = cached_storage_roots.clone();
218
219 account_pool.spawn(move || {
220 #[cfg(feature = "metrics")]
221 let metrics = ProofTaskTrieMetrics::default();
222 #[cfg(feature = "metrics")]
223 let cursor_metrics = ProofTaskCursorMetrics::new();
224
225 let _guard = span.enter();
226 let worker = AccountProofWorker::new(
227 task_ctx_clone,
228 work_rx_clone,
229 worker_id,
230 storage_work_tx_clone,
231 account_available_workers_clone,
232 cached_storage_roots,
233 #[cfg(feature = "metrics")]
234 metrics,
235 #[cfg(feature = "metrics")]
236 cursor_metrics,
237 )
238 .with_v2_proofs(v2_proofs_enabled);
239 if let Err(error) = worker.run() {
240 error!(
241 target: "trie::proof_task",
242 worker_id,
243 ?error,
244 "Account worker failed"
245 );
246 }
247 });
248 }
249
250 Self {
251 storage_work_tx,
252 account_work_tx,
253 storage_available_workers,
254 account_available_workers,
255 storage_worker_count,
256 account_worker_count,
257 v2_proofs_enabled,
258 }
259 }
260
    /// Returns the `v2_proofs_enabled` flag this handle was created with.
    pub const fn v2_proofs_enabled(&self) -> bool {
        self.v2_proofs_enabled
    }
265
    /// Returns the current value of the storage worker availability counter.
    ///
    /// The value is read with `Ordering::Relaxed`.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }
270
    /// Returns the current value of the account worker availability counter.
    ///
    /// The value is read with `Ordering::Relaxed`.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }
275
    /// Returns the number of jobs currently queued in the storage work channel.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }
280
    /// Returns the number of jobs currently queued in the account work channel.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }
285
    /// Returns the number of storage workers recorded when the handle was created.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }
290
    /// Returns the number of account workers recorded when the handle was created.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }
295
296 pub fn active_storage_workers(&self) -> usize {
300 self.storage_worker_count.saturating_sub(self.available_storage_workers())
301 }
302
303 pub fn active_account_workers(&self) -> usize {
307 self.account_worker_count.saturating_sub(self.available_account_workers())
308 }
309
310 pub fn dispatch_storage_proof(
314 &self,
315 input: StorageProofInput,
316 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
317 ) -> Result<(), ProviderError> {
318 let hashed_address = input.hashed_address();
319 self.storage_work_tx
320 .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
321 .map_err(|err| {
322 if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
323 let _ = proof_result_sender.send(StorageProofResultMessage {
324 hashed_address,
325 result: Err(DatabaseError::Other(
326 "storage workers unavailable".to_string(),
327 )
328 .into()),
329 });
330 }
331
332 ProviderError::other(std::io::Error::other("storage workers unavailable"))
333 })
334 }
335
336 pub fn dispatch_account_multiproof(
340 &self,
341 input: AccountMultiproofInput,
342 ) -> Result<(), ProviderError> {
343 self.account_work_tx
344 .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
345 .map_err(|err| {
346 let error =
347 ProviderError::other(std::io::Error::other("account workers unavailable"));
348
349 if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
350 let ProofResultContext {
351 sender: result_tx,
352 sequence_number: seq,
353 state,
354 start_time: start,
355 } = input.into_proof_result_sender();
356
357 let _ = result_tx.send(ProofResultMessage {
358 sequence_number: seq,
359 result: Err(ParallelStateRootError::Provider(error.clone())),
360 elapsed: start.elapsed(),
361 state,
362 });
363 }
364
365 error
366 })
367 }
368
369 pub(crate) fn dispatch_blinded_storage_node(
371 &self,
372 account: B256,
373 path: Nibbles,
374 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
375 let (tx, rx) = channel();
376 self.storage_work_tx
377 .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
378 .map_err(|_| {
379 ProviderError::other(std::io::Error::other("storage workers unavailable"))
380 })?;
381
382 Ok(rx)
383 }
384
385 pub(crate) fn dispatch_blinded_account_node(
387 &self,
388 path: Nibbles,
389 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
390 let (tx, rx) = channel();
391 self.account_work_tx
392 .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
393 .map_err(|_| {
394 ProviderError::other(std::io::Error::other("account workers unavailable"))
395 })?;
396
397 Ok(rx)
398 }
399}
400
/// Context cloned into every proof worker; wraps the factory each worker uses to open its own
/// read-only database provider.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// Factory the workers call `database_provider_ro()` on.
    factory: Factory,
}
407
impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new context wrapping `factory`.
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}
414
/// Pairs a provider with the id of the worker that owns it; proof computations in this file run
/// against `provider`.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// Provider used as both trie cursor factory and hashed cursor factory.
    provider: Provider,

    /// Identifier reported as `worker_id` in trace output.
    id: usize,
}
424
impl<Provider> ProofTaskTx<Provider> {
    /// Creates a new [`ProofTaskTx`] from `provider` and the worker `id`.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}
431
432impl<Provider> ProofTaskTx<Provider>
433where
434 Provider: TrieCursorFactory + HashedCursorFactory,
435{
436 #[inline]
440 fn compute_legacy_storage_proof(
441 &self,
442 input: StorageProofInput,
443 trie_cursor_metrics: &mut TrieCursorMetricsCache,
444 hashed_cursor_metrics: &mut HashedCursorMetricsCache,
445 ) -> Result<StorageProofResult, StateProofError> {
446 let StorageProofInput::Legacy {
448 hashed_address,
449 prefix_set,
450 target_slots,
451 with_branch_node_masks,
452 multi_added_removed_keys,
453 } = input
454 else {
455 panic!("compute_legacy_storage_proof only accepts StorageProofInput::Legacy")
456 };
457
458 let multi_added_removed_keys =
460 multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
461 let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);
462
463 let span = debug_span!(
464 target: "trie::proof_task",
465 "Storage proof calculation",
466 target_slots = ?target_slots.len(),
467 );
468 let _span_guard = span.enter();
469
470 let proof_start = Instant::now();
471
472 let raw_proof_result =
474 StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
475 .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
476 .with_branch_node_masks(with_branch_node_masks)
477 .with_added_removed_keys(added_removed_keys)
478 .with_trie_cursor_metrics(trie_cursor_metrics)
479 .with_hashed_cursor_metrics(hashed_cursor_metrics)
480 .storage_multiproof(target_slots);
481 trie_cursor_metrics.record_span("trie_cursor");
482 hashed_cursor_metrics.record_span("hashed_cursor");
483
484 let decoded_result =
486 raw_proof_result.and_then(|raw_proof| raw_proof.try_into().map_err(Into::into))?;
487
488 trace!(
489 target: "trie::proof_task",
490 hashed_address = ?hashed_address,
491 proof_time_us = proof_start.elapsed().as_micros(),
492 worker_id = self.id,
493 "Completed storage proof calculation"
494 );
495
496 Ok(StorageProofResult::Legacy { proof: decoded_result })
497 }
498
499 fn compute_v2_storage_proof(
500 &self,
501 input: StorageProofInput,
502 calculator: &mut proof_v2::StorageProofCalculator<
503 <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
504 <Provider as HashedCursorFactory>::StorageCursor<'_>,
505 >,
506 ) -> Result<StorageProofResult, StateProofError> {
507 let StorageProofInput::V2 { hashed_address, mut targets } = input else {
508 panic!("compute_v2_storage_proof only accepts StorageProofInput::V2")
509 };
510
511 let span = debug_span!(
512 target: "trie::proof_task",
513 "V2 Storage proof calculation",
514 targets = ?targets.len(),
515 );
516 let _span_guard = span.enter();
517
518 let proof_start = Instant::now();
519
520 let proof = if targets.is_empty() {
522 let root_node = calculator.storage_root_node(hashed_address)?;
523 vec![root_node]
524 } else {
525 calculator.storage_proof(hashed_address, &mut targets)?
526 };
527
528 let root = calculator.compute_root_hash(&proof)?;
529
530 trace!(
531 target: "trie::proof_task",
532 hashed_address = ?hashed_address,
533 proof_time_us = proof_start.elapsed().as_micros(),
534 ?root,
535 worker_id = self.id,
536 "Completed V2 storage proof calculation"
537 );
538
539 Ok(StorageProofResult::V2 { proof, root })
540 }
541
542 fn process_blinded_storage_node(
546 &self,
547 account: B256,
548 path: &Nibbles,
549 ) -> TrieNodeProviderResult {
550 let storage_node_provider =
551 ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
552 storage_node_provider.trie_node(path)
553 }
554}
555impl TrieNodeProviderFactory for ProofWorkerHandle {
556 type AccountNodeProvider = ProofTaskTrieNodeProvider;
557 type StorageNodeProvider = ProofTaskTrieNodeProvider;
558
559 fn account_node_provider(&self) -> Self::AccountNodeProvider {
560 ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
561 }
562
563 fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
564 ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
565 }
566}
567
/// Trie node provider that resolves blinded nodes by dispatching jobs through a
/// [`ProofWorkerHandle`].
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Resolves account trie nodes.
    AccountNode {
        /// Handle used to dispatch blinded account node jobs.
        handle: ProofWorkerHandle,
    },
    /// Resolves storage trie nodes of a single account.
    StorageNode {
        /// Account whose storage trie is queried.
        account: B256,
        /// Handle used to dispatch blinded storage node jobs.
        handle: ProofWorkerHandle,
    },
}
584
585impl TrieNodeProvider for ProofTaskTrieNodeProvider {
586 fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
587 match self {
588 Self::AccountNode { handle } => {
589 let rx = handle
590 .dispatch_blinded_account_node(*path)
591 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
592 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
593 }
594 Self::StorageNode { handle, account } => {
595 let rx = handle
596 .dispatch_blinded_storage_node(*account, *path)
597 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
598 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
599 }
600 }
601 }
602}
603
/// Result of a multiproof calculation, in either the legacy or the V2 representation.
#[derive(Debug)]
pub enum ProofResult {
    /// Legacy multiproof together with the stats collected while computing it.
    Legacy(DecodedMultiProof, ParallelTrieStats),
    /// V2 multiproof.
    V2(DecodedMultiProofV2),
}
612
613impl ProofResult {
614 pub fn empty(v2_enabled: bool) -> Self {
619 if v2_enabled {
620 Self::V2(DecodedMultiProofV2::default())
621 } else {
622 let stats = ParallelTrieTracker::default().finish();
623 Self::Legacy(DecodedMultiProof::default(), stats)
624 }
625 }
626
627 pub fn is_empty(&self) -> bool {
629 match self {
630 Self::Legacy(proof, _) => proof.is_empty(),
631 Self::V2(proof) => proof.is_empty(),
632 }
633 }
634
635 pub fn extend(&mut self, other: Self) {
641 match (self, other) {
642 (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other),
643 (Self::V2(proof), Self::V2(other)) => proof.extend(other),
644 _ => panic!("mismatched ProofResults, cannot extend one with the other"),
645 }
646 }
647
648 pub fn account_proofs_len(&self) -> usize {
650 match self {
651 Self::Legacy(proof, _) => proof.account_subtree.len(),
652 Self::V2(proof) => proof.account_proofs.len(),
653 }
654 }
655
656 pub fn storage_proofs_len(&self) -> usize {
658 match self {
659 Self::Legacy(proof, _) => {
660 proof.storages.values().map(|p| p.subtree.len()).sum::<usize>()
661 }
662 Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::<usize>(),
663 }
664 }
665}
666
/// Crossbeam sender over which [`ProofResultMessage`]s are delivered.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
672
/// Message sent over a [`ProofResultSender`] to report the outcome of a proof job.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number copied from the originating [`ProofResultContext`].
    pub sequence_number: u64,
    /// The proof, or the error that prevented it from being computed.
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Elapsed time reported for the job; the dispatch error path in this file fills it from
    /// `start_time.elapsed()` of the originating context.
    pub elapsed: Duration,
    /// Hashed post state carried over from the originating [`ProofResultContext`].
    pub state: HashedPostState,
}
688
/// Everything needed to report the outcome of a proof job back to whoever requested it.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel on which the [`ProofResultMessage`] is sent.
    pub sender: ProofResultSender,
    /// Sequence number echoed back in the result message.
    pub sequence_number: u64,
    /// Hashed post state echoed back in the result message.
    pub state: HashedPostState,
    /// Instant from which the elapsed time of the job is measured.
    pub start_time: Instant,
}
704
impl ProofResultContext {
    /// Creates a new context from its four parts.
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}
716
/// Result of a storage proof calculation, in either the legacy or the V2 representation.
#[derive(Debug)]
pub(crate) enum StorageProofResult {
    /// Result of a legacy storage proof.
    Legacy {
        /// The decoded storage multiproof.
        proof: DecodedStorageMultiProof,
    },
    /// Result of a V2 storage proof.
    V2 {
        /// The proof nodes.
        proof: Vec<ProofTrieNode>,
        /// Root hash computed from `proof`; `None` if the calculator did not produce one.
        root: Option<B256>,
    },
}
731
732impl StorageProofResult {
733 const fn root(&self) -> Option<B256> {
735 match self {
736 Self::Legacy { proof } => Some(proof.root),
737 Self::V2 { root, .. } => *root,
738 }
739 }
740}
741
742impl From<StorageProofResult> for Option<DecodedStorageMultiProof> {
743 fn from(proof_result: StorageProofResult) -> Self {
745 match proof_result {
746 StorageProofResult::Legacy { proof } => Some(proof),
747 StorageProofResult::V2 { proof, root } => root.map(|root| {
748 let branch_node_masks = proof
749 .iter()
750 .filter_map(|node| node.masks.map(|masks| (node.path, masks)))
751 .collect();
752 let subtree = proof.into_iter().map(|node| (node.path, node.node)).collect();
753 DecodedStorageMultiProof { root, subtree, branch_node_masks }
754 }),
755 }
756 }
757}
758
/// Message used to report the outcome of a storage proof job.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// Hashed address of the account the proof was requested for.
    pub(crate) hashed_address: B256,
    /// The storage proof, or the error that prevented it from being computed.
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
767
/// Jobs accepted by the storage proof workers.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Compute a storage proof.
    StorageProof {
        /// Input describing the proof to compute.
        input: StorageProofInput,
        /// Channel on which the proof result is sent.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Look up a blinded storage trie node.
    BlindedStorageNode {
        /// Account whose storage trie is queried.
        account: B256,
        /// Path of the node within the storage trie.
        path: Nibbles,
        /// Channel on which the lookup result is sent.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
788
/// Worker that pulls [`StorageWorkerJob`]s off a shared channel and processes them until the
/// channel is closed.
struct StorageProofWorker<Factory> {
    /// Context providing the factory used to open this worker's database provider.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiving side of the storage job channel.
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Identifier of this worker, used in trace output.
    worker_id: usize,
    /// Shared counter this worker increments while waiting for a job and decrements while
    /// processing one.
    available_workers: Arc<AtomicUsize>,
    /// Shared cache of storage roots keyed by hashed address; updated after each storage proof
    /// that yields a root.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    /// Whether a V2 storage proof calculator is created when the worker starts.
    v2_enabled: bool,
}
813
814impl<Factory> StorageProofWorker<Factory>
815where
816 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
817{
    /// Creates a new storage proof worker with V2 proofs disabled.
    ///
    /// Use [`Self::with_v2_proofs`] to change the V2 setting afterwards.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            // V2 is opt-in through `with_v2_proofs`.
            v2_enabled: false,
        }
    }
841
    /// Sets whether this worker uses V2 proofs and returns the updated worker.
    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }
847
848 fn run(mut self) -> ProviderResult<()> {
866 let provider = self.task_ctx.factory.database_provider_ro()?;
868 let proof_tx = ProofTaskTx::new(provider, self.worker_id);
869
870 trace!(
871 target: "trie::proof_task",
872 worker_id = self.worker_id,
873 "Storage worker started"
874 );
875
876 let mut storage_proofs_processed = 0u64;
877 let mut storage_nodes_processed = 0u64;
878 let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
879 let mut v2_calculator = if self.v2_enabled {
880 let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
881 let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
882 Some(proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor))
883 } else {
884 None
885 };
886
887 self.available_workers.fetch_add(1, Ordering::Relaxed);
889
890 let mut total_idle_time = Duration::ZERO;
891 let mut idle_start = Instant::now();
892
893 while let Ok(job) = self.work_rx.recv() {
894 total_idle_time += idle_start.elapsed();
895
896 self.available_workers.fetch_sub(1, Ordering::Relaxed);
898
899 match job {
900 StorageWorkerJob::StorageProof { input, proof_result_sender } => {
901 self.process_storage_proof(
902 &proof_tx,
903 v2_calculator.as_mut(),
904 input,
905 proof_result_sender,
906 &mut storage_proofs_processed,
907 &mut cursor_metrics_cache,
908 );
909 }
910
911 StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
912 Self::process_blinded_node(
913 self.worker_id,
914 &proof_tx,
915 account,
916 path,
917 result_sender,
918 &mut storage_nodes_processed,
919 );
920 }
921 }
922
923 self.available_workers.fetch_add(1, Ordering::Relaxed);
925
926 idle_start = Instant::now();
927 }
928
929 trace!(
930 target: "trie::proof_task",
931 worker_id = self.worker_id,
932 storage_proofs_processed,
933 storage_nodes_processed,
934 total_idle_time_us = total_idle_time.as_micros(),
935 "Storage worker shutting down"
936 );
937
938 #[cfg(feature = "metrics")]
939 {
940 self.metrics.record_storage_nodes(storage_nodes_processed as usize);
941 self.metrics.record_storage_worker_idle_time(total_idle_time);
942 self.cursor_metrics.record(&mut cursor_metrics_cache);
943 }
944
945 Ok(())
946 }
947
    /// Processes a single storage proof job and sends the outcome through
    /// `proof_result_sender`.
    ///
    /// Legacy inputs are computed via `compute_legacy_storage_proof`, V2 inputs via
    /// `compute_v2_storage_proof`. `storage_proofs_processed` is incremented whether or not the
    /// proof succeeded. If the result carries a root, it is stored in the shared storage root
    /// cache after the result has been sent. With the `metrics` feature enabled, the cursor
    /// metrics of this proof are folded into `cursor_metrics_cache`.
    ///
    /// # Panics
    /// Panics if `input` is a V2 input and `v2_calculator` is `None`.
    fn process_storage_proof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: Option<
            &mut proof_v2::StorageProofCalculator<
                <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                <Provider as HashedCursorFactory>::StorageCursor<'_>,
            >,
        >,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        // Per-proof metric caches; only the legacy path is handed these.
        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
        // Read the address up front: `input` is moved into the compute call below.
        let hashed_address = input.hashed_address();
        let proof_start = Instant::now();

        // The match borrows `input` only for tracing; each arm then moves it into the
        // corresponding compute function.
        let result = match &input {
            StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, .. } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    prefix_set_len = prefix_set.len(),
                    target_slots_len = target_slots.len(),
                    "Processing storage proof"
                );

                proof_tx.compute_legacy_storage_proof(
                    input,
                    &mut trie_cursor_metrics,
                    &mut hashed_cursor_metrics,
                )
            }
            StorageProofInput::V2 { hashed_address, targets } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    targets_len = targets.len(),
                    "Processing V2 storage proof"
                );
                proof_tx
                    .compute_v2_storage_proof(input, v2_calculator.expect("v2 calculator provided"))
            }
        };

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        // Extract the root before `result` is moved into the message.
        let root = result.as_ref().ok().and_then(|result| result.root());

        // A dropped receiver is not an error for the worker; the result is simply discarded.
        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        // The cache is updated even if the receiver was dropped.
        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            ?root,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            // Storage workers touch no account cursors, so those entries stay at their defaults.
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }
1045
1046 fn process_blinded_node<Provider>(
1048 worker_id: usize,
1049 proof_tx: &ProofTaskTx<Provider>,
1050 account: B256,
1051 path: Nibbles,
1052 result_sender: Sender<TrieNodeProviderResult>,
1053 storage_nodes_processed: &mut u64,
1054 ) where
1055 Provider: TrieCursorFactory + HashedCursorFactory,
1056 {
1057 trace!(
1058 target: "trie::proof_task",
1059 worker_id,
1060 ?account,
1061 ?path,
1062 "Processing blinded storage node"
1063 );
1064
1065 let start = Instant::now();
1066 let result = proof_tx.process_blinded_storage_node(account, &path);
1067 let elapsed = start.elapsed();
1068
1069 *storage_nodes_processed += 1;
1070
1071 if result_sender.send(result).is_err() {
1072 trace!(
1073 target: "trie::proof_task",
1074 worker_id,
1075 ?account,
1076 ?path,
1077 storage_nodes_processed,
1078 "Blinded storage node receiver dropped, discarding result"
1079 );
1080 }
1081
1082 trace!(
1083 target: "trie::proof_task",
1084 worker_id,
1085 ?account,
1086 ?path,
1087 elapsed_us = elapsed.as_micros(),
1088 total_processed = storage_nodes_processed,
1089 "Blinded storage node completed"
1090 );
1091 }
1092}
1093
/// Worker that pulls [`AccountWorkerJob`]s off a shared channel and processes them until the
/// channel is closed.
struct AccountProofWorker<Factory> {
    /// Context providing the factory used to open this worker's database provider.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiving side of the account job channel.
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Identifier of this worker, used in trace output.
    worker_id: usize,
    /// Sender for the storage job channel, used to dispatch storage proofs to storage workers.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Shared counter this worker increments while waiting for a job and decrements while
    /// processing one.
    available_workers: Arc<AtomicUsize>,
    /// Shared cache of storage roots keyed by hashed address, passed to the legacy account
    /// multiproof computation.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    /// Whether V2 proof calculators are created when the worker starts.
    v2_enabled: bool,
}
1120
1121impl<Factory> AccountProofWorker<Factory>
1122where
1123 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
1124{
    /// Creates a new account proof worker with V2 proofs disabled.
    ///
    /// Use [`Self::with_v2_proofs`] to change the V2 setting afterwards.
    #[allow(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            // V2 is opt-in through `with_v2_proofs`.
            v2_enabled: false,
        }
    }
1151
    /// Sets whether this worker uses V2 proofs and returns the updated worker.
    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }
1157
    /// Runs the worker until the job channel is closed.
    ///
    /// A read-only provider is opened once and reused for every job. When V2 proofs are enabled,
    /// an account proof calculator and a storage proof calculator are created up front and
    /// reused across jobs. The availability counter is incremented once the worker is ready,
    /// decremented while a job is being processed, and incremented again afterwards.
    ///
    /// # Errors
    /// Returns an error if the provider or the cursors for the V2 calculators cannot be created.
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Both calculators are created once and reused by every V2 job.
        let (mut v2_account_calculator, v2_storage_calculator) = if self.v2_enabled {
            let account_trie_cursor = provider.account_trie_cursor()?;
            let account_hashed_cursor = provider.hashed_account_cursor()?;

            let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
            let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

            (
                Some(proof_v2::ProofCalculator::<
                    _,
                    _,
                    AsyncAccountValueEncoder<
                        <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                        <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
                    >,
                >::new(account_trie_cursor, account_hashed_cursor)),
                // Wrapped in `Rc<RefCell<..>>` so a handle can be cloned into each job.
                Some(Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
                    storage_trie_cursor,
                    storage_hashed_cursor,
                )))),
            )
        } else {
            (None, None)
        };

        // Initialization succeeded: the worker is now ready to take jobs.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();
        let mut value_encoder_stats_cache = ValueEncoderStats::default();

        // `recv` fails once every sender is gone, which ends the loop.
        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    let value_encoder_stats = self.process_account_multiproof(
                        &provider,
                        v2_account_calculator.as_mut(),
                        v2_storage_calculator.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                    // The reported storage wait time is counted as idle time as well.
                    total_idle_time += value_encoder_stats.storage_wait_time;
                    value_encoder_stats_cache.extend(&value_encoder_stats);
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &provider,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            self.available_workers.fetch_add(1, Ordering::Relaxed);

            idle_start = Instant::now();
        }

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.metrics.record_account_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
        }

        Ok(())
    }
1278
1279 fn compute_legacy_account_multiproof<Provider>(
1280 &self,
1281 provider: &Provider,
1282 targets: MultiProofTargets,
1283 mut prefix_sets: TriePrefixSets,
1284 collect_branch_node_masks: bool,
1285 multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1286 proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
1287 ) -> Result<(ProofResult, Duration), ParallelStateRootError>
1288 where
1289 Provider: TrieCursorFactory + HashedCursorFactory,
1290 {
1291 let span = debug_span!(
1292 target: "trie::proof_task",
1293 "Account multiproof calculation",
1294 targets = targets.len(),
1295 num_slots = targets.values().map(|slots| slots.len()).sum::<usize>(),
1296 );
1297 let _span_guard = span.enter();
1298
1299 trace!(
1300 target: "trie::proof_task",
1301 "Processing account multiproof"
1302 );
1303
1304 let mut tracker = ParallelTrieTracker::default();
1305
1306 let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);
1307
1308 let storage_root_targets_len =
1309 StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);
1310
1311 tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
1312
1313 let storage_proof_receivers = dispatch_storage_proofs(
1314 &self.storage_work_tx,
1315 &targets,
1316 &mut storage_prefix_sets,
1317 collect_branch_node_masks,
1318 multi_added_removed_keys.as_ref(),
1319 )?;
1320
1321 let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);
1322
1323 let ctx = AccountMultiproofParams {
1324 targets: &targets,
1325 prefix_set: account_prefix_set,
1326 collect_branch_node_masks,
1327 multi_added_removed_keys: multi_added_removed_keys.as_ref(),
1328 storage_proof_receivers,
1329 cached_storage_roots: &self.cached_storage_roots,
1330 };
1331
1332 let mut storage_wait_time = Duration::ZERO;
1333 let result = build_account_multiproof_with_storage_roots(
1334 provider,
1335 ctx,
1336 &mut tracker,
1337 proof_cursor_metrics,
1338 &mut storage_wait_time,
1339 )?;
1340
1341 let stats = tracker.finish();
1342 Ok((ProofResult::Legacy(result, stats), storage_wait_time))
1343 }
1344
1345 fn compute_v2_account_multiproof<'a, Provider>(
1346 &self,
1347 v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1348 v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1349 targets: MultiProofTargetsV2,
1350 ) -> Result<(ProofResult, ValueEncoderStats), ParallelStateRootError>
1351 where
1352 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1353 {
1354 let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
1355
1356 let span = debug_span!(
1357 target: "trie::proof_task",
1358 "Account V2 multiproof calculation",
1359 account_targets = account_targets.len(),
1360 storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
1361 );
1362 let _span_guard = span.enter();
1363
1364 trace!(target: "trie::proof_task", "Processing V2 account multiproof");
1365
1366 let storage_proof_receivers =
1367 dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
1368
1369 let mut value_encoder = AsyncAccountValueEncoder::new(
1370 storage_proof_receivers,
1371 self.cached_storage_roots.clone(),
1372 v2_storage_calculator,
1373 );
1374
1375 let account_proofs =
1376 v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
1377
1378 let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
1379
1380 let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
1381
1382 Ok((ProofResult::V2(proof), value_encoder_stats))
1383 }
1384
1385 fn process_account_multiproof<'a, Provider>(
1389 &self,
1390 provider: &Provider,
1391 v2_account_calculator: Option<&mut V2AccountProofCalculator<'a, Provider>>,
1392 v2_storage_calculator: Option<Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>>,
1393 input: AccountMultiproofInput,
1394 account_proofs_processed: &mut u64,
1395 cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
1396 ) -> ValueEncoderStats
1397 where
1398 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1399 {
1400 let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
1401 let proof_start = Instant::now();
1402
1403 let (proof_result_sender, result, value_encoder_stats) = match input {
1404 AccountMultiproofInput::Legacy {
1405 targets,
1406 prefix_sets,
1407 collect_branch_node_masks,
1408 multi_added_removed_keys,
1409 proof_result_sender,
1410 } => {
1411 let (result, value_encoder_stats) = match self.compute_legacy_account_multiproof(
1412 provider,
1413 targets,
1414 prefix_sets,
1415 collect_branch_node_masks,
1416 multi_added_removed_keys,
1417 &mut proof_cursor_metrics,
1418 ) {
1419 Ok((proof, wait_time)) => (
1420 Ok(proof),
1421 ValueEncoderStats { storage_wait_time: wait_time, ..Default::default() },
1422 ),
1423 Err(e) => (Err(e), ValueEncoderStats::default()),
1424 };
1425 (proof_result_sender, result, value_encoder_stats)
1426 }
1427 AccountMultiproofInput::V2 { targets, proof_result_sender } => {
1428 let (result, value_encoder_stats) = match self
1429 .compute_v2_account_multiproof::<Provider>(
1430 v2_account_calculator.expect("v2 account calculator provided"),
1431 v2_storage_calculator.expect("v2 storage calculator provided"),
1432 targets,
1433 ) {
1434 Ok((proof, stats)) => (Ok(proof), stats),
1435 Err(e) => (Err(e), ValueEncoderStats::default()),
1436 };
1437 (proof_result_sender, result, value_encoder_stats)
1438 }
1439 };
1440
1441 let ProofResultContext {
1442 sender: result_tx,
1443 sequence_number: seq,
1444 state,
1445 start_time: start,
1446 } = proof_result_sender;
1447
1448 let proof_elapsed = proof_start.elapsed();
1449 let total_elapsed = start.elapsed();
1450 *account_proofs_processed += 1;
1451
1452 if result_tx
1454 .send(ProofResultMessage {
1455 sequence_number: seq,
1456 result,
1457 elapsed: total_elapsed,
1458 state,
1459 })
1460 .is_err()
1461 {
1462 trace!(
1463 target: "trie::proof_task",
1464 worker_id=self.worker_id,
1465 account_proofs_processed,
1466 "Account multiproof receiver dropped, discarding result"
1467 );
1468 }
1469
1470 proof_cursor_metrics.record_spans();
1471
1472 trace!(
1473 target: "trie::proof_task",
1474 proof_time_us = proof_elapsed.as_micros(),
1475 total_elapsed_us = total_elapsed.as_micros(),
1476 total_processed = account_proofs_processed,
1477 account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
1478 account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
1479 storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
1480 storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
1481 account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
1482 account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
1483 storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
1484 storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
1485 "Account multiproof completed"
1486 );
1487
1488 #[cfg(feature = "metrics")]
1489 cursor_metrics_cache.extend(&proof_cursor_metrics);
1491
1492 value_encoder_stats
1493 }
1494
1495 fn process_blinded_node<Provider>(
1497 worker_id: usize,
1498 provider: &Provider,
1499 path: Nibbles,
1500 result_sender: Sender<TrieNodeProviderResult>,
1501 account_nodes_processed: &mut u64,
1502 ) where
1503 Provider: TrieCursorFactory + HashedCursorFactory,
1504 {
1505 let span = debug_span!(
1506 target: "trie::proof_task",
1507 "Blinded account node calculation",
1508 ?path,
1509 );
1510 let _span_guard = span.enter();
1511
1512 trace!(
1513 target: "trie::proof_task",
1514 "Processing blinded account node"
1515 );
1516
1517 let start = Instant::now();
1518 let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
1519 let result = account_node_provider.trie_node(&path);
1520 let elapsed = start.elapsed();
1521
1522 *account_nodes_processed += 1;
1523
1524 if result_sender.send(result).is_err() {
1525 trace!(
1526 target: "trie::proof_task",
1527 worker_id,
1528 ?path,
1529 account_nodes_processed,
1530 "Blinded account node receiver dropped, discarding result"
1531 );
1532 }
1533
1534 trace!(
1535 target: "trie::proof_task",
1536 node_time_us = elapsed.as_micros(),
1537 total_processed = account_nodes_processed,
1538 "Blinded account node completed"
1539 );
1540 }
1541}
1542
1543fn build_account_multiproof_with_storage_roots<P>(
1552 provider: &P,
1553 ctx: AccountMultiproofParams<'_>,
1554 tracker: &mut ParallelTrieTracker,
1555 proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
1556 storage_wait_time: &mut Duration,
1557) -> Result<DecodedMultiProof, ParallelStateRootError>
1558where
1559 P: TrieCursorFactory + HashedCursorFactory,
1560{
1561 let accounts_added_removed_keys =
1562 ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());
1563
1564 let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
1566 let account_trie_cursor = InstrumentedTrieCursor::new(
1567 account_trie_cursor,
1568 &mut proof_cursor_metrics.account_trie_cursor,
1569 );
1570
1571 let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
1573 .with_added_removed_keys(accounts_added_removed_keys)
1574 .with_deletions_retained(true);
1575
1576 let retainer = ctx
1578 .targets
1579 .keys()
1580 .map(Nibbles::unpack)
1581 .collect::<ProofRetainer>()
1582 .with_added_removed_keys(accounts_added_removed_keys);
1583 let mut hash_builder = HashBuilder::default()
1584 .with_proof_retainer(retainer)
1585 .with_updates(ctx.collect_branch_node_masks);
1586
1587 let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
1590 B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
1591 let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);
1592
1593 let account_hashed_cursor =
1595 provider.hashed_account_cursor().map_err(ProviderError::Database)?;
1596 let account_hashed_cursor = InstrumentedHashedCursor::new(
1597 account_hashed_cursor,
1598 &mut proof_cursor_metrics.account_hashed_cursor,
1599 );
1600
1601 let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);
1602
1603 let mut storage_proof_receivers = ctx.storage_proof_receivers;
1604
1605 while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
1606 match account_node {
1607 TrieElement::Branch(node) => {
1608 hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
1609 }
1610 TrieElement::Leaf(hashed_address, account) => {
1611 let root = match storage_proof_receivers.remove(&hashed_address) {
1612 Some(receiver) => {
1613 let _guard = debug_span!(
1614 target: "trie::proof_task",
1615 "Waiting for storage proof",
1616 );
1617 let wait_start = Instant::now();
1620 let proof_msg = receiver.recv().map_err(|_| {
1621 ParallelStateRootError::StorageRoot(
1622 reth_execution_errors::StorageRootError::Database(
1623 DatabaseError::Other(format!(
1624 "Storage proof channel closed for {hashed_address}"
1625 )),
1626 ),
1627 )
1628 })?;
1629 *storage_wait_time += wait_start.elapsed();
1630
1631 drop(_guard);
1632
1633 debug_assert_eq!(
1635 proof_msg.hashed_address, hashed_address,
1636 "storage worker must return same address"
1637 );
1638 let proof_result = proof_msg.result?;
1639 let Some(root) = proof_result.root() else {
1640 trace!(
1641 target: "trie::proof_task",
1642 ?proof_result,
1643 "Received proof_result without root",
1644 );
1645 panic!("Partial proofs are not yet supported");
1646 };
1647 let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
1648 .expect("Partial proofs are not yet supported (into)");
1649 collected_decoded_storages.insert(hashed_address, proof);
1650 root
1651 }
1652 None => {
1655 tracker.inc_missed_leaves();
1656
1657 match ctx.cached_storage_roots.entry(hashed_address) {
1658 dashmap::Entry::Occupied(occ) => *occ.get(),
1659 dashmap::Entry::Vacant(vac) => {
1660 let root =
1661 StorageProof::new_hashed(provider, provider, hashed_address)
1662 .with_prefix_set_mut(Default::default())
1663 .with_trie_cursor_metrics(
1664 &mut proof_cursor_metrics.storage_trie_cursor,
1665 )
1666 .with_hashed_cursor_metrics(
1667 &mut proof_cursor_metrics.storage_hashed_cursor,
1668 )
1669 .storage_multiproof(
1670 ctx.targets
1671 .get(&hashed_address)
1672 .cloned()
1673 .unwrap_or_default(),
1674 )
1675 .map_err(|e| {
1676 ParallelStateRootError::StorageRoot(
1677 reth_execution_errors::StorageRootError::Database(
1678 DatabaseError::Other(e.to_string()),
1679 ),
1680 )
1681 })?
1682 .root;
1683
1684 vac.insert(root);
1685 root
1686 }
1687 }
1688 }
1689 };
1690
1691 account_rlp.clear();
1693 let account = account.into_trie_account(root);
1694 account.encode(&mut account_rlp as &mut dyn BufMut);
1695
1696 hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
1697 }
1698 }
1699 }
1700
1701 let _ = hash_builder.root();
1702
1703 let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
1704 let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;
1705
1706 let branch_node_masks = if ctx.collect_branch_node_masks {
1707 let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
1708 updated_branch_nodes
1709 .into_iter()
1710 .map(|(path, node)| {
1711 (path, BranchNodeMasks { hash_mask: node.hash_mask, tree_mask: node.tree_mask })
1712 })
1713 .collect()
1714 } else {
1715 BranchNodeMasksMap::default()
1716 };
1717
1718 for (hashed_address, receiver) in storage_proof_receivers {
1721 let wait_start = Instant::now();
1722 if let Ok(proof_msg) = receiver.recv() {
1723 *storage_wait_time += wait_start.elapsed();
1724 let proof_result = proof_msg.result?;
1725 let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
1726 .expect("Partial proofs are not yet supported");
1727 collected_decoded_storages.insert(hashed_address, proof);
1728 }
1729 }
1730
1731 Ok(DecodedMultiProof {
1732 account_subtree: decoded_account_subtree,
1733 branch_node_masks,
1734 storages: collected_decoded_storages,
1735 })
1736}
1737fn dispatch_storage_proofs(
1745 storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1746 targets: &MultiProofTargets,
1747 storage_prefix_sets: &mut B256Map<PrefixSet>,
1748 with_branch_node_masks: bool,
1749 multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
1750) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1751 let mut storage_proof_receivers =
1752 B256Map::with_capacity_and_hasher(targets.len(), Default::default());
1753
1754 let mut sorted_targets: Vec<_> = targets.iter().collect();
1755 sorted_targets.sort_unstable_by_key(|(addr, _)| *addr);
1756
1757 for (hashed_address, target_slots) in sorted_targets {
1759 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1761
1762 let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
1764 let input = StorageProofInput::legacy(
1765 *hashed_address,
1766 prefix_set,
1767 target_slots.clone(),
1768 with_branch_node_masks,
1769 multi_added_removed_keys.cloned(),
1770 );
1771
1772 storage_work_tx
1775 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1776 .map_err(|_| {
1777 ParallelStateRootError::Other(format!(
1778 "Failed to queue storage proof for {}: storage worker pool unavailable",
1779 hashed_address
1780 ))
1781 })?;
1782
1783 storage_proof_receivers.insert(*hashed_address, result_rx);
1784 }
1785
1786 Ok(storage_proof_receivers)
1787}
1788
1789fn dispatch_v2_storage_proofs(
1797 storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1798 account_targets: &Vec<proof_v2::Target>,
1799 mut storage_targets: B256Map<Vec<proof_v2::Target>>,
1800) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1801 let mut storage_proof_receivers =
1802 B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1803
1804 let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1806
1807 for (hashed_address, targets) in &mut storage_targets {
1810 if account_target_addresses.contains(hashed_address) &&
1811 let Some(first) = targets.first_mut()
1812 {
1813 *first = first.with_min_len(0);
1814 }
1815 }
1816
1817 let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1821 sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1822
1823 for (hashed_address, targets) in sorted_storage_targets {
1825 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1827 let input = StorageProofInput::new(hashed_address, targets);
1828
1829 storage_work_tx
1830 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1831 .map_err(|_| {
1832 ParallelStateRootError::Other(format!(
1833 "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1834 ))
1835 })?;
1836
1837 storage_proof_receivers.insert(hashed_address, result_rx);
1838 }
1839
1840 for target in account_targets {
1843 let hashed_address = target.key();
1844 if storage_proof_receivers.contains_key(&hashed_address) {
1845 continue
1846 }
1847
1848 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1849 let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
1850
1851 storage_work_tx
1852 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1853 .map_err(|_| {
1854 ParallelStateRootError::Other(format!(
1855 "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1856 ))
1857 })?;
1858
1859 storage_proof_receivers.insert(hashed_address, result_rx);
1860 }
1861
1862 Ok(storage_proof_receivers)
1863}
1864
1865#[derive(Debug)]
1867pub enum StorageProofInput {
1868 Legacy {
1870 hashed_address: B256,
1872 prefix_set: PrefixSet,
1874 target_slots: B256Set,
1876 with_branch_node_masks: bool,
1878 multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1880 },
1881 V2 {
1883 hashed_address: B256,
1885 targets: Vec<proof_v2::Target>,
1887 },
1888}
1889
1890impl StorageProofInput {
1891 pub const fn legacy(
1894 hashed_address: B256,
1895 prefix_set: PrefixSet,
1896 target_slots: B256Set,
1897 with_branch_node_masks: bool,
1898 multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1899 ) -> Self {
1900 Self::Legacy {
1901 hashed_address,
1902 prefix_set,
1903 target_slots,
1904 with_branch_node_masks,
1905 multi_added_removed_keys,
1906 }
1907 }
1908
1909 pub const fn new(hashed_address: B256, targets: Vec<proof_v2::Target>) -> Self {
1911 Self::V2 { hashed_address, targets }
1912 }
1913
1914 pub const fn hashed_address(&self) -> B256 {
1916 match self {
1917 Self::Legacy { hashed_address, .. } | Self::V2 { hashed_address, .. } => {
1918 *hashed_address
1919 }
1920 }
1921 }
1922}
1923
1924#[derive(Debug)]
1926pub enum AccountMultiproofInput {
1927 Legacy {
1929 targets: MultiProofTargets,
1931 prefix_sets: TriePrefixSets,
1933 collect_branch_node_masks: bool,
1935 multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1937 proof_result_sender: ProofResultContext,
1939 },
1940 V2 {
1942 targets: MultiProofTargetsV2,
1944 proof_result_sender: ProofResultContext,
1946 },
1947}
1948
1949impl AccountMultiproofInput {
1950 fn into_proof_result_sender(self) -> ProofResultContext {
1952 match self {
1953 Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => {
1954 proof_result_sender
1955 }
1956 }
1957 }
1958}
1959
1960struct AccountMultiproofParams<'a> {
1962 targets: &'a MultiProofTargets,
1964 prefix_set: PrefixSet,
1966 collect_branch_node_masks: bool,
1968 multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
1970 storage_proof_receivers: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
1972 cached_storage_roots: &'a DashMap<B256, B256>,
1975}
1976
1977#[derive(Debug)]
1979enum AccountWorkerJob {
1980 AccountMultiproof {
1982 input: Box<AccountMultiproofInput>,
1984 },
1985 BlindedAccountNode {
1987 path: Nibbles,
1989 result_sender: Sender<TrieNodeProviderResult>,
1991 },
1992}
1993
#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Wraps `factory` in a [`ProofTaskCtx`] for use in tests.
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Smoke test: a [`ProofWorkerHandle`] can be created, cloned and dropped.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
            create_test_provider_factory(),
            reth_trie_db::ChangesetCache::new(),
        );
        let ctx = test_ctx(overlay_factory);

        let runtime = reth_tasks::Runtime::test();
        let handle = ProofWorkerHandle::new(&runtime, ctx, false);

        // The handle must be cloneable.
        let _handle_clone = handle.clone();

        // Dropping the original while a clone is still alive must not panic.
        drop(handle);
    }
}