use crate::{
    root::ParallelStateRootError,
    stats::{ParallelTrieStats, ParallelTrieTracker},
    StorageRootTargets,
};
use alloy_primitives::{
    map::{B256Map, B256Set},
    B256,
};
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
    node_iter::{TrieElement, TrieNodeIter},
    prefix_set::TriePrefixSets,
    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
    walker::TrieWalker,
    DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
    Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
    added_removed_keys::MultiAddedRemovedKeys,
    prefix_set::{PrefixSet, PrefixSetMut},
    proof::{DecodedProofNodes, ProofRetainer},
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};

#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};

type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

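/// Handle for dispatching proof work to the pre-spawned storage and account proof worker pools.
///
/// The handle is cheap to clone: all clones share the same job queues and availability counters,
/// and the worker pools shut down once every sender clone has been dropped.
///
/// A minimal usage sketch (hedged: `tokio_handle` and `factory` stand in for a Tokio runtime
/// handle and a provider factory implementing the required cursor traits):
///
/// ```ignore
/// let handle = ProofWorkerHandle::new(tokio_handle, ProofTaskCtx::new(factory), 4, 2);
/// // Submit work via `dispatch_storage_proof` / `dispatch_account_multiproof`; results are
/// // delivered on the crossbeam channel captured in the `ProofResultContext`.
/// ```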
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Sender for jobs destined for the storage proof worker pool.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Sender for jobs destined for the account proof worker pool.
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Number of storage workers currently idle and waiting for work.
    storage_available_workers: Arc<AtomicUsize>,
    /// Number of account workers currently idle and waiting for work.
    account_available_workers: Arc<AtomicUsize>,
    /// Total number of spawned storage workers.
    storage_worker_count: usize,
    /// Total number of spawned account workers.
    account_worker_count: usize,
}

impl ProofWorkerHandle {
    /// Spawns the storage and account worker pools on the provided Tokio runtime handle and
    /// returns a handle for dispatching work to them.
    ///
    /// Each worker runs on a blocking thread, opens its own read-only database provider, and
    /// loops over its job queue until all senders have been dropped.
    pub fn new<Factory>(
        executor: Handle,
        task_ctx: ProofTaskCtx<Factory>,
        storage_worker_count: usize,
        account_worker_count: usize,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Counters start at zero; each worker registers itself as available once it has started.
        let storage_available_workers = Arc::new(AtomicUsize::new(0));
        let account_available_workers = Arc::new(AtomicUsize::new(0));

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            "Spawning proof worker pools"
        );

        let parent_span =
            debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
                .entered();
        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        let parent_span =
            debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
                .entered();
        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
        }
    }

    /// Returns the number of storage workers currently idle.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of account workers currently idle.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of storage jobs queued but not yet picked up by a worker.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of account jobs queued but not yet picked up by a worker.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of spawned storage workers.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of spawned account workers.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing a job.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing a job.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

    /// Queues a storage proof computation on the storage worker pool.
    ///
    /// If the pool is unavailable, an error result is sent back through the provided
    /// [`ProofResultContext`] before the error is returned to the caller.
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
    ) -> Result<(), ProviderError> {
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("storage workers unavailable"));

                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = proof_result_sender;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Queues an account multiproof computation on the account worker pool.
    ///
    /// If the pool is unavailable, an error result is sent back through the
    /// [`ProofResultContext`] embedded in the input before the error is returned to the caller.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let AccountMultiproofInput {
                        proof_result_sender:
                            ProofResultContext {
                                sender: result_tx,
                                sequence_number: seq,
                                state,
                                start_time: start,
                            },
                        ..
                    } = *input;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Queues a blinded storage trie node lookup and returns a receiver for the result.
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Queues a blinded account trie node lookup and returns a receiver for the result.
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}

/// Shared context cloned into every proof worker.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// Factory used by each worker to open its read-only database provider.
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new task context from the given provider factory.
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}

/// Worker-local state pairing a database provider with the worker's id.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// Read-only provider implementing the trie and hashed cursor factories.
    provider: Provider,

    /// Identifier of the owning worker, used in logs and spans.
    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}

impl<Provider> ProofTaskTx<Provider>
where
    Provider: TrieCursorFactory + HashedCursorFactory,
{
    /// Computes a decoded storage multiproof for the given input, recording cursor metrics into
    /// the provided caches.
    #[inline]
    fn compute_storage_proof(
        &self,
        input: StorageProofInput,
        trie_cursor_metrics: &mut TrieCursorMetricsCache,
        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
    ) -> StorageProofResult {
        let StorageProofInput {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        } = input;

        let multi_added_removed_keys =
            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);

        let span = debug_span!(
            target: "trie::proof_task",
            "Storage proof calculation",
            ?hashed_address,
            target_slots = ?target_slots.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        let raw_proof_result =
            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
                .with_branch_node_masks(with_branch_node_masks)
                .with_added_removed_keys(added_removed_keys)
                .with_trie_cursor_metrics(trie_cursor_metrics)
                .with_hashed_cursor_metrics(hashed_cursor_metrics)
                .storage_multiproof(target_slots)
                .map_err(|e| ParallelStateRootError::Other(e.to_string()));
        trie_cursor_metrics.record_span("trie_cursor");
        hashed_cursor_metrics.record_span("hashed_cursor");

        let decoded_result = raw_proof_result.and_then(|raw_proof| {
            raw_proof.try_into().map_err(|e: alloy_rlp::Error| {
                ParallelStateRootError::Other(format!(
                    "Failed to decode storage proof for {}: {}",
                    hashed_address, e
                ))
            })
        });

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            worker_id = self.id,
            "Completed storage proof calculation"
        );

        decoded_result
    }

    /// Retrieves a blinded storage trie node for the given account and path.
    fn process_blinded_storage_node(
        &self,
        account: B256,
        path: &Nibbles,
    ) -> TrieNodeProviderResult {
        let storage_node_provider =
            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
        storage_node_provider.trie_node(path)
    }

    /// Retrieves a blinded account trie node for the given path.
    fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
        let account_node_provider =
            ProofBlindedAccountProvider::new(&self.provider, &self.provider);
        account_node_provider.trie_node(path)
    }
}

impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}

/// Trie node provider that fetches blinded nodes by dispatching lookups to the worker pools.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node lookups.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node lookups for a single account.
    StorageNode {
        /// Hashed address of the account whose storage trie is queried.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}

impl TrieNodeProvider for ProofTaskTrieNodeProvider {
    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
        match self {
            Self::AccountNode { handle } => {
                let rx = handle
                    .dispatch_blinded_account_node(*path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
            Self::StorageNode { handle, account } => {
                let rx = handle
                    .dispatch_blinded_storage_node(*account, *path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
        }
    }
}

/// A computed proof result produced by a worker.
#[derive(Debug)]
pub enum ProofResult {
    /// An account multiproof together with collection statistics.
    AccountMultiproof {
        /// The decoded account multiproof.
        proof: DecodedMultiProof,
        /// Statistics gathered while computing the multiproof.
        stats: ParallelTrieStats,
    },
    /// A storage multiproof for a single account.
    StorageProof {
        /// Hashed address of the account the proof belongs to.
        hashed_address: B256,
        /// The decoded storage multiproof.
        proof: DecodedStorageMultiProof,
    },
}

impl ProofResult {
    /// Converts this result into a [`DecodedMultiProof`].
    ///
    /// A storage proof is wrapped into a multiproof containing only that account's storage.
    pub fn into_multiproof(self) -> DecodedMultiProof {
        match self {
            Self::AccountMultiproof { proof, stats: _ } => proof,
            Self::StorageProof { hashed_address, proof } => {
                DecodedMultiProof::from_storage_proof(hashed_address, proof)
            }
        }
    }
}

/// Sender half of the channel used to deliver proof results back to the requester.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;

/// Message delivered to the requester once a proof computation has finished (or failed).
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number identifying the request this result belongs to.
    pub sequence_number: u64,
    /// The proof computation result.
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Time elapsed since the request was dispatched.
    pub elapsed: Duration,
    /// Hashed post state associated with this proof request.
    pub state: HashedPostState,
}

/// Context accompanying a proof request, identifying the request and carrying the channel on
/// which the result should be sent.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel on which the [`ProofResultMessage`] is sent.
    pub sender: ProofResultSender,
    /// Sequence number identifying this request.
    pub sequence_number: u64,
    /// Hashed post state associated with this request.
    pub state: HashedPostState,
    /// Time at which the request was dispatched.
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a new proof result context.
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}

/// Jobs processed by the storage proof worker pool.
#[derive(Debug)]
enum StorageWorkerJob {
    /// Compute a storage multiproof.
    StorageProof {
        /// Input for the storage proof computation.
        input: StorageProofInput,
        /// Context used to send the result back to the requester.
        proof_result_sender: ProofResultContext,
    },
    /// Fetch a blinded storage trie node.
    BlindedStorageNode {
        /// Hashed address of the account whose storage trie is queried.
        account: B256,
        /// Path of the node to reveal.
        path: Nibbles,
        /// Channel on which the result is sent.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

/// Long-lived worker that computes storage proofs and blinded storage node lookups.
struct StorageProofWorker<Factory> {
    /// Shared context providing the database provider factory.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiver for storage jobs.
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Identifier of this worker, used in logs and spans.
    worker_id: usize,
    /// Counter of idle storage workers, shared with the [`ProofWorkerHandle`].
    available_workers: Arc<AtomicUsize>,
    /// Trie metrics for this worker.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new storage proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop until the job channel is closed.
    ///
    /// The worker opens a read-only database provider once, registers itself as available, and
    /// then processes storage proof and blinded storage node jobs. The availability counter is
    /// decremented while a job is being processed and incremented again afterwards, so the
    /// handle can report how many workers are idle.
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        // Each worker owns a dedicated read-only provider for its entire lifetime.
        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Register as available before blocking on the job queue.
        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    Self::process_storage_proof(
                        worker_id,
                        &proof_tx,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_storage_nodes(storage_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes a single storage proof job and sends the result to the requester.
    fn process_storage_proof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let hashed_address = input.hashed_address;
        let ProofResultContext { sender, sequence_number: seq, state, start_time } =
            proof_result_sender;

        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            prefix_set_len = input.prefix_set.len(),
            target_slots_len = input.target_slots.len(),
            "Processing storage proof"
        );

        let proof_start = Instant::now();
        let result = proof_tx.compute_storage_proof(
            input,
            &mut trie_cursor_metrics,
            &mut hashed_cursor_metrics,
        );

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
            hashed_address,
            proof: storage_proof,
        });

        if sender
            .send(ProofResultMessage {
                sequence_number: seq,
                result: result_msg,
                elapsed: start_time.elapsed(),
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            // Fold this proof's cursor metrics into the worker-level cache.
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    /// Processes a blinded storage node lookup and sends the result to the requester.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}

/// Long-lived worker that computes account multiproofs and blinded account node lookups.
struct AccountProofWorker<Factory> {
    /// Shared context providing the database provider factory.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiver for account jobs.
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Identifier of this worker, used in logs and spans.
    worker_id: usize,
    /// Sender used to fan storage proof work out to the storage worker pool.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Counter of idle account workers, shared with the [`ProofWorkerHandle`].
    available_workers: Arc<AtomicUsize>,
    /// Trie metrics for this worker.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new account proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop until the job channel is closed.
    ///
    /// Mirrors [`StorageProofWorker::run`]: the worker opens a read-only provider, registers
    /// itself as available, and processes account multiproof and blinded account node jobs,
    /// updating the shared availability counter around each job.
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        // Each worker owns a dedicated read-only provider for its entire lifetime.
        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Register as available before blocking on the job queue.
        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    Self::process_account_multiproof(
                        worker_id,
                        &proof_tx,
                        storage_work_tx.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            account_proofs_processed,
            account_nodes_processed,
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_account_nodes(account_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes an account multiproof job: dispatches per-account storage proofs to the storage
    /// worker pool, walks the account trie, and sends the assembled multiproof to the requester.
    fn process_account_multiproof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let AccountMultiproofInput {
            targets,
            mut prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys,
            missed_leaves_storage_roots,
            proof_result_sender:
                ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
        } = input;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let proof_start = Instant::now();

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        // Queue storage proofs for all targets before walking the account trie, so storage and
        // account work proceed in parallel.
        let storage_proof_receivers = match dispatch_storage_proofs(
            &storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        ) {
            Ok(receivers) => receivers,
            Err(error) => {
                error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
                let _ = result_tx.send(ProofResultMessage {
                    sequence_number: seq,
                    result: Err(error),
                    elapsed: start.elapsed(),
                    state,
                });
                return;
            }
        };

        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            missed_leaves_storage_roots: missed_leaves_storage_roots.as_ref(),
        };

        let result =
            build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);

        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        let proof_cursor_metrics = tracker.cursor_metrics;
        proof_cursor_metrics.record_spans();

        let stats = tracker.finish();
        let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
        *account_proofs_processed += 1;

        if result_tx
            .send(ProofResultMessage {
                sequence_number: seq,
                result,
                elapsed: total_elapsed,
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        cursor_metrics_cache.extend(&proof_cursor_metrics);
    }

    /// Processes a blinded account node lookup and sends the result to the requester.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_account_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}

/// Builds an account multiproof while storage proofs for the same targets are computed by the
/// storage worker pool.
///
/// Walks the account trie; for each target leaf it waits on the corresponding storage proof
/// receiver, and for leaves without a pending storage proof it computes the storage root inline,
/// caching it in `missed_leaves_storage_roots`. Any remaining storage proofs are collected at the
/// end so the returned [`DecodedMultiProof`] contains all requested storages.
fn build_account_multiproof_with_storage_roots<P>(
    provider: &P,
    ctx: AccountMultiproofParams<'_>,
    tracker: &mut ParallelTrieTracker,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
    P: TrieCursorFactory + HashedCursorFactory,
{
    let accounts_added_removed_keys =
        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());

    let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default();
    let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default();

    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
    let account_trie_cursor =
        InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics);

    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
        .with_added_removed_keys(accounts_added_removed_keys)
        .with_deletions_retained(true);

    let retainer = ctx
        .targets
        .keys()
        .map(Nibbles::unpack)
        .collect::<ProofRetainer>()
        .with_added_removed_keys(accounts_added_removed_keys);
    let mut hash_builder = HashBuilder::default()
        .with_proof_retainer(retainer)
        .with_updates(ctx.collect_branch_node_masks);

    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);

    let account_hashed_cursor =
        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
    let account_hashed_cursor =
        InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics);

    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);

    let mut storage_proof_receivers = ctx.storage_proof_receivers;

    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
        match account_node {
            TrieElement::Branch(node) => {
                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
            }
            TrieElement::Leaf(hashed_address, account) => {
                let root = match storage_proof_receivers.remove(&hashed_address) {
                    Some(receiver) => {
                        // Enter the span while blocking on the storage proof receiver so the
                        // wait shows up in traces.
                        let _guard = debug_span!(
                            target: "trie::proof_task",
                            "Waiting for storage proof",
                            ?hashed_address,
                        )
                        .entered();
                        let proof_msg = receiver.recv().map_err(|_| {
                            ParallelStateRootError::StorageRoot(
                                reth_execution_errors::StorageRootError::Database(
                                    DatabaseError::Other(format!(
                                        "Storage proof channel closed for {hashed_address}"
                                    )),
                                ),
                            )
                        })?;

                        drop(_guard);

                        let proof = match proof_msg.result? {
                            ProofResult::StorageProof { hashed_address: addr, proof } => {
                                debug_assert_eq!(
                                    addr,
                                    hashed_address,
                                    "storage worker must return same address: expected {hashed_address}, got {addr}"
                                );
                                proof
                            }
                            ProofResult::AccountMultiproof { .. } => {
                                unreachable!("storage worker only sends StorageProof variant")
                            }
                        };

                        let root = proof.root;
                        collected_decoded_storages.insert(hashed_address, proof);
                        root
                    }
                    None => {
                        // No storage proof was dispatched for this leaf: compute the storage
                        // root inline and cache it in the shared map so it can be reused.
                        tracker.inc_missed_leaves();

                        match ctx.missed_leaves_storage_roots.entry(hashed_address) {
                            dashmap::Entry::Occupied(occ) => *occ.get(),
                            dashmap::Entry::Vacant(vac) => {
                                let root =
                                    StorageProof::new_hashed(provider, provider, hashed_address)
                                        .with_prefix_set_mut(Default::default())
                                        .with_trie_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_trie_cursor,
                                        )
                                        .with_hashed_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_hashed_cursor,
                                        )
                                        .storage_multiproof(
                                            ctx.targets
                                                .get(&hashed_address)
                                                .cloned()
                                                .unwrap_or_default(),
                                        )
                                        .map_err(|e| {
                                            ParallelStateRootError::StorageRoot(
                                                reth_execution_errors::StorageRootError::Database(
                                                    DatabaseError::Other(e.to_string()),
                                                ),
                                            )
                                        })?
                                        .root;

                                vac.insert(root);
                                root
                            }
                        }
                    }
                };

                account_rlp.clear();
                let account = account.into_trie_account(root);
                account.encode(&mut account_rlp as &mut dyn BufMut);

                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
            }
        }
    }

    // Drain any storage proofs that were dispatched but whose accounts were not visited by the
    // walker, so they still end up in the returned multiproof.
    for (hashed_address, receiver) in storage_proof_receivers {
        if let Ok(proof_msg) = receiver.recv() {
            if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
                collected_decoded_storages.insert(hashed_address, proof);
            }
        }
    }

    let _ = hash_builder.root();

    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;

    let (branch_node_hash_masks, branch_node_tree_masks) = if ctx.collect_branch_node_masks {
        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
        (
            updated_branch_nodes.iter().map(|(path, node)| (*path, node.hash_mask)).collect(),
            updated_branch_nodes.into_iter().map(|(path, node)| (path, node.tree_mask)).collect(),
        )
    } else {
        (Default::default(), Default::default())
    };

    tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics);
    tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics);

    Ok(DecodedMultiProof {
        account_subtree: decoded_account_subtree,
        branch_node_hash_masks,
        branch_node_tree_masks,
        storages: collected_decoded_storages,
    })
}

/// Dispatches a storage proof job for every target account and returns a map from hashed address
/// to the receiver on which that account's proof will arrive.
///
/// The per-account prefix sets are consumed from `storage_prefix_sets`; accounts without an entry
/// fall back to an empty prefix set.
fn dispatch_storage_proofs(
    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
    targets: &MultiProofTargets,
    storage_prefix_sets: &mut B256Map<PrefixSet>,
    with_branch_node_masks: bool,
    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
) -> Result<B256Map<CrossbeamReceiver<ProofResultMessage>>, ParallelStateRootError> {
    let mut storage_proof_receivers =
        B256Map::with_capacity_and_hasher(targets.len(), Default::default());

    for (hashed_address, target_slots) in targets.iter() {
        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();

        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let start = Instant::now();

        let input = StorageProofInput::new(
            *hashed_address,
            prefix_set,
            target_slots.clone(),
            with_branch_node_masks,
            multi_added_removed_keys.cloned(),
        );

        // The sequence number and state are not consumed for internally dispatched storage
        // proofs; only the result and elapsed time are read by the account worker.
        storage_work_tx
            .send(StorageWorkerJob::StorageProof {
                input,
                proof_result_sender: ProofResultContext::new(
                    result_tx,
                    0,
                    HashedPostState::default(),
                    start,
                ),
            })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {}: storage worker pool unavailable",
                    hashed_address
                ))
            })?;

        storage_proof_receivers.insert(*hashed_address, result_rx);
    }

    Ok(storage_proof_receivers)
}

/// Input parameters for a storage proof computation.
#[derive(Debug)]
pub struct StorageProofInput {
    /// Hashed address of the account whose storage is proven.
    hashed_address: B256,
    /// Prefix set of changed storage slots.
    prefix_set: PrefixSet,
    /// Storage slots to generate proofs for.
    target_slots: B256Set,
    /// Whether branch node hash and tree masks should be collected.
    with_branch_node_masks: bool,
    /// Optional added/removed keys context applied to the proof computation.
    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}

impl StorageProofInput {
    /// Creates a new storage proof input.
    pub const fn new(
        hashed_address: B256,
        prefix_set: PrefixSet,
        target_slots: B256Set,
        with_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    ) -> Self {
        Self {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        }
    }
}

/// Input for an account multiproof computation dispatched to the account worker pool.
#[derive(Debug, Clone)]
pub struct AccountMultiproofInput {
    /// Proof targets: hashed addresses mapped to their target storage slots.
    pub targets: MultiProofTargets,
    /// Prefix sets describing the changed accounts and storage slots.
    pub prefix_sets: TriePrefixSets,
    /// Whether branch node hash and tree masks should be collected.
    pub collect_branch_node_masks: bool,
    /// Optional added/removed keys context applied to the proof computation.
    pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    /// Cache of storage roots for leaves that were not covered by a dispatched storage proof.
    pub missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
    /// Context used to send the result back to the requester.
    pub proof_result_sender: ProofResultContext,
}

/// Borrowed parameters used while building an account multiproof.
struct AccountMultiproofParams<'a> {
    /// Proof targets.
    targets: &'a MultiProofTargets,
    /// Prefix set of changed accounts.
    prefix_set: PrefixSet,
    /// Whether branch node hash and tree masks should be collected.
    collect_branch_node_masks: bool,
    /// Optional added/removed keys context applied to the proof computation.
    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
    /// Receivers for storage proofs dispatched to the storage worker pool.
    storage_proof_receivers: B256Map<CrossbeamReceiver<ProofResultMessage>>,
    /// Cache of storage roots for leaves that were not covered by a dispatched storage proof.
    missed_leaves_storage_roots: &'a DashMap<B256, B256>,
}

/// Jobs processed by the account proof worker pool.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Compute an account multiproof.
    AccountMultiproof {
        /// Boxed input for the multiproof computation.
        input: Box<AccountMultiproofInput>,
    },
    /// Fetch a blinded account trie node.
    BlindedAccountNode {
        /// Path of the node to reveal.
        path: Nibbles,
        /// Channel on which the result is sent.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;
    use tokio::{runtime::Builder, task};

    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    #[test]
    fn spawn_proof_workers_creates_handle() {
        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
        runtime.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let provider_factory = create_test_provider_factory();
            let factory =
                reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
            let ctx = test_ctx(factory);

            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);

            let _cloned_handle = proof_handle.clone();

            drop(proof_handle);
            task::yield_now().await;
        });
    }
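
    // A small additional check, added as a hedged sketch: it only exercises the handle's
    // bookkeeping accessors, which are populated directly from the constructor arguments and
    // the (initially empty) job queues, so no proof work needs to run.
    #[test]
    fn worker_counts_match_spawn_arguments() {
        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
        runtime.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let provider_factory = create_test_provider_factory();
            let factory =
                reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
            let proof_handle = ProofWorkerHandle::new(handle.clone(), test_ctx(factory), 2, 1);

            assert_eq!(proof_handle.total_storage_workers(), 2);
            assert_eq!(proof_handle.total_account_workers(), 1);
            // Nothing has been dispatched yet, so both queues should be empty.
            assert_eq!(proof_handle.pending_storage_tasks(), 0);
            assert_eq!(proof_handle.pending_account_tasks(), 0);

            task::yield_now().await;
        });
    }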
}