use crate::{
    root::ParallelStateRootError,
    stats::{ParallelTrieStats, ParallelTrieTracker},
    StorageRootTargets,
};
use alloy_primitives::{
    map::{B256Map, B256Set},
    B256,
};
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
    node_iter::{TrieElement, TrieNodeIter},
    prefix_set::TriePrefixSets,
    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
    walker::TrieWalker,
    DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
    Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
    added_removed_keys::MultiAddedRemovedKeys,
    prefix_set::{PrefixSet, PrefixSetMut},
    proof::{DecodedProofNodes, ProofRetainer},
    BranchNodeMasks, BranchNodeMasksMap,
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};

#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};

type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

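/// Handle for dispatching proof work to the storage and account worker pools.
///
/// The handle is cheaply cloneable; all clones share the same underlying job
/// channels and availability counters. Work is submitted via the `dispatch_*`
/// methods, and results are delivered over the channels supplied by the caller.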
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    storage_available_workers: Arc<AtomicUsize>,
    account_available_workers: Arc<AtomicUsize>,
    storage_worker_count: usize,
    account_worker_count: usize,
}

impl ProofWorkerHandle {
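    /// Spawns `storage_worker_count` storage proof workers and
    /// `account_worker_count` account proof workers on the runtime's blocking
    /// pool and returns a handle for dispatching work to them.
    ///
    /// A minimal usage sketch, assuming `factory` is a provider factory whose
    /// read-only provider implements the required trie and hashed cursor
    /// factories:
    ///
    /// ```ignore
    /// let ctx = ProofTaskCtx::new(factory);
    /// let handle = ProofWorkerHandle::new(tokio::runtime::Handle::current(), ctx, 4, 2);
    /// ```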
    pub fn new<Factory>(
        executor: Handle,
        task_ctx: ProofTaskCtx<Factory>,
        storage_worker_count: usize,
        account_worker_count: usize,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        let storage_available_workers = Arc::new(AtomicUsize::new(0));
        let account_available_workers = Arc::new(AtomicUsize::new(0));

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            "Spawning proof worker pools"
        );

        let parent_span =
            debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
                .entered();
        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        let parent_span =
            debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
                .entered();
        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
        }
    }

    /// Returns the number of storage workers currently idle and waiting for work.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of account workers currently idle and waiting for work.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of storage jobs queued but not yet picked up by a worker.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of account jobs queued but not yet picked up by a worker.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of spawned storage workers.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of spawned account workers.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing a job.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing a job.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

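    /// Queues a storage proof computation on the storage worker pool.
    ///
    /// The result is delivered as a [`ProofResultMessage`] over the channel
    /// captured in `proof_result_sender`. If the worker pool is unavailable, an
    /// error message is sent back on that channel and an error is returned.
    ///
    /// A minimal dispatch sketch (assumes a `handle` and a prepared `input` are
    /// in scope; mirrors how storage proofs are queued internally for account
    /// multiproofs):
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::unbounded();
    /// let ctx = ProofResultContext::new(tx, 0, HashedPostState::default(), Instant::now());
    /// handle.dispatch_storage_proof(input, ctx)?;
    /// let message = rx.recv()?;
    /// ```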
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
    ) -> Result<(), ProviderError> {
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("storage workers unavailable"));

                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = proof_result_sender;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

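    /// Queues an account multiproof computation on the account worker pool.
    ///
    /// The result is delivered as a [`ProofResultMessage`] over the channel
    /// captured in the input's `proof_result_sender`. If the worker pool is
    /// unavailable, an error message is sent back and an error is returned.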
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let AccountMultiproofInput {
                        proof_result_sender:
                            ProofResultContext {
                                sender: result_tx,
                                sequence_number: seq,
                                state,
                                start_time: start,
                            },
                        ..
                    } = *input;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

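    /// Queues retrieval of a blinded storage trie node and returns a receiver
    /// for the result.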
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

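    /// Queues retrieval of a blinded account trie node and returns a receiver
    /// for the result.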
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}

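/// Shared context handed to every proof worker at spawn time.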
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}

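/// A worker-local wrapper around a read-only provider used to compute proofs
/// and fetch blinded trie nodes.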
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    provider: Provider,

    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}

impl<Provider> ProofTaskTx<Provider>
where
    Provider: TrieCursorFactory + HashedCursorFactory,
{
    #[inline]
    fn compute_storage_proof(
        &self,
        input: StorageProofInput,
        trie_cursor_metrics: &mut TrieCursorMetricsCache,
        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
    ) -> StorageProofResult {
        let StorageProofInput {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        } = input;

        let multi_added_removed_keys =
            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);

        let span = debug_span!(
            target: "trie::proof_task",
            "Storage proof calculation",
            ?hashed_address,
            target_slots = ?target_slots.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        let raw_proof_result =
            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
                .with_branch_node_masks(with_branch_node_masks)
                .with_added_removed_keys(added_removed_keys)
                .with_trie_cursor_metrics(trie_cursor_metrics)
                .with_hashed_cursor_metrics(hashed_cursor_metrics)
                .storage_multiproof(target_slots)
                .map_err(|e| ParallelStateRootError::Other(e.to_string()));
        trie_cursor_metrics.record_span("trie_cursor");
        hashed_cursor_metrics.record_span("hashed_cursor");

        let decoded_result = raw_proof_result.and_then(|raw_proof| {
            raw_proof.try_into().map_err(|e: alloy_rlp::Error| {
                ParallelStateRootError::Other(format!(
                    "Failed to decode storage proof for {}: {}",
                    hashed_address, e
                ))
            })
        });

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            worker_id = self.id,
            "Completed storage proof calculation"
        );

        decoded_result
    }

    fn process_blinded_storage_node(
        &self,
        account: B256,
        path: &Nibbles,
    ) -> TrieNodeProviderResult {
        let storage_node_provider =
            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
        storage_node_provider.trie_node(path)
    }

    fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
        let account_node_provider =
            ProofBlindedAccountProvider::new(&self.provider, &self.provider);
        account_node_provider.trie_node(path)
    }
}

impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}

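/// Trie node provider that fetches blinded nodes by dispatching requests to the
/// proof worker pools and blocking on the reply channel.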
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    AccountNode {
        handle: ProofWorkerHandle,
    },
    StorageNode {
        account: B256,
        handle: ProofWorkerHandle,
    },
}

impl TrieNodeProvider for ProofTaskTrieNodeProvider {
    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
        match self {
            Self::AccountNode { handle } => {
                let rx = handle
                    .dispatch_blinded_account_node(*path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
            Self::StorageNode { handle, account } => {
                let rx = handle
                    .dispatch_blinded_storage_node(*account, *path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
        }
    }
}
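
/// A decoded proof computed by a proof worker.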
#[derive(Debug)]
pub enum ProofResult {
    AccountMultiproof {
        proof: DecodedMultiProof,
        stats: ParallelTrieStats,
    },
    StorageProof {
        hashed_address: B256,
        proof: DecodedStorageMultiProof,
    },
}

impl ProofResult {
    pub fn into_multiproof(self) -> DecodedMultiProof {
        match self {
            Self::AccountMultiproof { proof, stats: _ } => proof,
            Self::StorageProof { hashed_address, proof } => {
                DecodedMultiProof::from_storage_proof(hashed_address, proof)
            }
        }
    }
}
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;

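/// Message delivered to the result channel once a dispatched proof computation
/// finishes, successfully or not.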
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number identifying the dispatched request.
    pub sequence_number: u64,
    /// The computed proof, or the error that aborted the computation.
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Time elapsed since the request was dispatched.
    pub elapsed: Duration,
    /// The hashed post-state associated with this proof request.
    pub state: HashedPostState,
}

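/// Everything a worker needs to send a [`ProofResultMessage`] back to the
/// dispatcher: the result channel plus the request's sequencing metadata.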
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel on which the result message is sent.
    pub sender: ProofResultSender,
    /// Sequence number identifying the dispatched request.
    pub sequence_number: u64,
    /// The hashed post-state associated with this proof request.
    pub state: HashedPostState,
    /// Time at which the request was dispatched.
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a new [`ProofResultContext`].
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}
#[derive(Debug)]
enum StorageWorkerJob {
    StorageProof {
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
    },
    BlindedStorageNode {
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

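/// Long-lived worker that computes storage proofs and fetches blinded storage
/// trie nodes using a dedicated read-only provider.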
struct StorageProofWorker<Factory> {
    task_ctx: ProofTaskCtx<Factory>,
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    worker_id: usize,
    available_workers: Arc<AtomicUsize>,
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

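    /// Worker loop: acquires a read-only provider, then processes jobs until the
    /// work channel is closed, updating the shared availability counter around
    /// each job.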
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    Self::process_storage_proof(
                        worker_id,
                        &proof_tx,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_storage_nodes(storage_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    fn process_storage_proof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let hashed_address = input.hashed_address;
        let ProofResultContext { sender, sequence_number: seq, state, start_time } =
            proof_result_sender;

        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            prefix_set_len = input.prefix_set.len(),
            target_slots_len = input.target_slots.len(),
            "Processing storage proof"
        );

        let proof_start = Instant::now();
        let result = proof_tx.compute_storage_proof(
            input,
            &mut trie_cursor_metrics,
            &mut hashed_cursor_metrics,
        );

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
            hashed_address,
            proof: storage_proof,
        });

        if sender
            .send(ProofResultMessage {
                sequence_number: seq,
                result: result_msg,
                elapsed: start_time.elapsed(),
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}

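/// Long-lived worker that computes account multiproofs and fetches blinded
/// account trie nodes, delegating per-account storage proofs to the storage
/// worker pool.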
struct AccountProofWorker<Factory> {
    task_ctx: ProofTaskCtx<Factory>,
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    worker_id: usize,
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    available_workers: Arc<AtomicUsize>,
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

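    /// Worker loop: acquires a read-only provider, then processes jobs until the
    /// work channel is closed, updating the shared availability counter around
    /// each job.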
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    Self::process_account_multiproof(
                        worker_id,
                        &proof_tx,
                        storage_work_tx.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            account_proofs_processed,
            account_nodes_processed,
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_account_nodes(account_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    fn process_account_multiproof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let AccountMultiproofInput {
            targets,
            mut prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys,
            missed_leaves_storage_roots,
            proof_result_sender:
                ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
        } = input;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let proof_start = Instant::now();

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        let storage_proof_receivers = match dispatch_storage_proofs(
            &storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        ) {
            Ok(receivers) => receivers,
            Err(error) => {
                error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
                let _ = result_tx.send(ProofResultMessage {
                    sequence_number: seq,
                    result: Err(error),
                    elapsed: start.elapsed(),
                    state,
                });
                return;
            }
        };

        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            missed_leaves_storage_roots: missed_leaves_storage_roots.as_ref(),
        };

        let result =
            build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);

        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        let proof_cursor_metrics = tracker.cursor_metrics;
        proof_cursor_metrics.record_spans();

        let stats = tracker.finish();
        let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
        *account_proofs_processed += 1;

        if result_tx
            .send(ProofResultMessage {
                sequence_number: seq,
                result,
                elapsed: total_elapsed,
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        cursor_metrics_cache.extend(&proof_cursor_metrics);
    }

    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_account_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}

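/// Builds the account multiproof by walking the account trie, collecting storage
/// proofs from the receivers in `ctx` as leaves are reached, and computing storage
/// roots inline for leaves that were not covered by a dispatched storage proof.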
fn build_account_multiproof_with_storage_roots<P>(
    provider: &P,
    ctx: AccountMultiproofParams<'_>,
    tracker: &mut ParallelTrieTracker,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
    P: TrieCursorFactory + HashedCursorFactory,
{
    let accounts_added_removed_keys =
        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());

    let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default();
    let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default();

    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
    let account_trie_cursor =
        InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics);

    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
        .with_added_removed_keys(accounts_added_removed_keys)
        .with_deletions_retained(true);

    let retainer = ctx
        .targets
        .keys()
        .map(Nibbles::unpack)
        .collect::<ProofRetainer>()
        .with_added_removed_keys(accounts_added_removed_keys);
    let mut hash_builder = HashBuilder::default()
        .with_proof_retainer(retainer)
        .with_updates(ctx.collect_branch_node_masks);

    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);

    let account_hashed_cursor =
        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
    let account_hashed_cursor =
        InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics);

    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);

    let mut storage_proof_receivers = ctx.storage_proof_receivers;

    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
        match account_node {
            TrieElement::Branch(node) => {
                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
            }
            TrieElement::Leaf(hashed_address, account) => {
                let root = match storage_proof_receivers.remove(&hashed_address) {
                    Some(receiver) => {
                        let _guard = debug_span!(
                            target: "trie::proof_task",
                            "Waiting for storage proof",
                            ?hashed_address,
                        )
                        .entered();
                        let proof_msg = receiver.recv().map_err(|_| {
                            ParallelStateRootError::StorageRoot(
                                reth_execution_errors::StorageRootError::Database(
                                    DatabaseError::Other(format!(
                                        "Storage proof channel closed for {hashed_address}"
                                    )),
                                ),
                            )
                        })?;

                        drop(_guard);

                        let proof = match proof_msg.result? {
                            ProofResult::StorageProof { hashed_address: addr, proof } => {
                                debug_assert_eq!(
                                    addr,
                                    hashed_address,
                                    "storage worker must return same address: expected {hashed_address}, got {addr}"
                                );
                                proof
                            }
                            ProofResult::AccountMultiproof { .. } => {
                                unreachable!("storage worker only sends StorageProof variant")
                            }
                        };

                        let root = proof.root;
                        collected_decoded_storages.insert(hashed_address, proof);
                        root
                    }
                    None => {
                        tracker.inc_missed_leaves();

                        match ctx.missed_leaves_storage_roots.entry(hashed_address) {
                            dashmap::Entry::Occupied(occ) => *occ.get(),
                            dashmap::Entry::Vacant(vac) => {
                                let root =
                                    StorageProof::new_hashed(provider, provider, hashed_address)
                                        .with_prefix_set_mut(Default::default())
                                        .with_trie_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_trie_cursor,
                                        )
                                        .with_hashed_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_hashed_cursor,
                                        )
                                        .storage_multiproof(
                                            ctx.targets
                                                .get(&hashed_address)
                                                .cloned()
                                                .unwrap_or_default(),
                                        )
                                        .map_err(|e| {
                                            ParallelStateRootError::StorageRoot(
                                                reth_execution_errors::StorageRootError::Database(
                                                    DatabaseError::Other(e.to_string()),
                                                ),
                                            )
                                        })?
                                        .root;

                                vac.insert(root);
                                root
                            }
                        }
                    }
                };

                account_rlp.clear();
                let account = account.into_trie_account(root);
                account.encode(&mut account_rlp as &mut dyn BufMut);

                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
            }
        }
    }

    for (hashed_address, receiver) in storage_proof_receivers {
        if let Ok(proof_msg) = receiver.recv() {
            if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
                collected_decoded_storages.insert(hashed_address, proof);
            }
        }
    }

    let _ = hash_builder.root();

    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;

    let branch_node_masks = if ctx.collect_branch_node_masks {
        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
        updated_branch_nodes
            .into_iter()
            .map(|(path, node)| {
                (path, BranchNodeMasks { hash_mask: node.hash_mask, tree_mask: node.tree_mask })
            })
            .collect()
    } else {
        BranchNodeMasksMap::default()
    };

    tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics);
    tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics);

    Ok(DecodedMultiProof {
        account_subtree: decoded_account_subtree,
        branch_node_masks,
        storages: collected_decoded_storages,
    })
}
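
/// Dispatches a storage proof job for every target account to the storage worker
/// pool and returns a receiver per hashed address for collecting the results.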
fn dispatch_storage_proofs(
    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
    targets: &MultiProofTargets,
    storage_prefix_sets: &mut B256Map<PrefixSet>,
    with_branch_node_masks: bool,
    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
) -> Result<B256Map<CrossbeamReceiver<ProofResultMessage>>, ParallelStateRootError> {
    let mut storage_proof_receivers =
        B256Map::with_capacity_and_hasher(targets.len(), Default::default());

    for (hashed_address, target_slots) in targets.iter() {
        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();

        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let start = Instant::now();

        let input = StorageProofInput::new(
            *hashed_address,
            prefix_set,
            target_slots.clone(),
            with_branch_node_masks,
            multi_added_removed_keys.cloned(),
        );

        // The sequence number and post-state are placeholders here: the account
        // worker consumes these results through per-address receivers and only
        // reads the proof itself.
        storage_work_tx
            .send(StorageWorkerJob::StorageProof {
                input,
                proof_result_sender: ProofResultContext::new(
                    result_tx,
                    0,
                    HashedPostState::default(),
                    start,
                ),
            })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {}: storage worker pool unavailable",
                    hashed_address
                ))
            })?;

        storage_proof_receivers.insert(*hashed_address, result_rx);
    }

    Ok(storage_proof_receivers)
}
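
/// Input parameters for a single storage proof computation.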
#[derive(Debug)]
pub struct StorageProofInput {
    hashed_address: B256,
    prefix_set: PrefixSet,
    target_slots: B256Set,
    with_branch_node_masks: bool,
    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}

impl StorageProofInput {
    pub const fn new(
        hashed_address: B256,
        prefix_set: PrefixSet,
        target_slots: B256Set,
        with_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    ) -> Self {
        Self {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        }
    }
}
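
/// Input parameters for an account multiproof computation.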
#[derive(Debug, Clone)]
pub struct AccountMultiproofInput {
    pub targets: MultiProofTargets,
    pub prefix_sets: TriePrefixSets,
    pub collect_branch_node_masks: bool,
    pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    pub missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
    pub proof_result_sender: ProofResultContext,
}

struct AccountMultiproofParams<'a> {
    targets: &'a MultiProofTargets,
    prefix_set: PrefixSet,
    collect_branch_node_masks: bool,
    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
    storage_proof_receivers: B256Map<CrossbeamReceiver<ProofResultMessage>>,
    missed_leaves_storage_roots: &'a DashMap<B256, B256>,
}

#[derive(Debug)]
enum AccountWorkerJob {
    AccountMultiproof {
        input: Box<AccountMultiproofInput>,
    },
    BlindedAccountNode {
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;
    use tokio::{runtime::Builder, task};

    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    #[test]
    fn spawn_proof_workers_creates_handle() {
        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
        runtime.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let provider_factory = create_test_provider_factory();
            let factory =
                reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
            let ctx = test_ctx(factory);

            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);

            let _cloned_handle = proof_handle.clone();

            drop(proof_handle);
            task::yield_now().await;
        });
    }
}