1use crate::{
32 root::ParallelStateRootError,
33 value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
34};
35use alloy_primitives::{
36 map::{B256Map, B256Set},
37 B256,
38};
39use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
40use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
41use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
42use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
43use reth_storage_errors::db::DatabaseError;
44use reth_tasks::Runtime;
45use reth_trie::{
46 hashed_cursor::HashedCursorFactory,
47 proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider},
48 proof_v2,
49 trie_cursor::TrieCursorFactory,
50 DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, Nibbles, ProofTrieNodeV2,
51 ProofV2Target,
52};
53use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
54use std::{
55 cell::RefCell,
56 rc::Rc,
57 sync::{
58 atomic::{AtomicUsize, Ordering},
59 mpsc::{channel, Receiver, Sender},
60 Arc,
61 },
62 time::Duration,
63};
64use tracing::{debug, debug_span, error, instrument, trace};
65
66#[cfg(feature = "metrics")]
67use crate::proof_task_metrics::{
68 ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
69};
70
/// Result of resolving a blinded trie node: the revealed node (if any) or a sparse trie error.
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// V2 account proof calculator over the provider's account trie/hashed cursors, using an
/// [`AsyncAccountValueEncoder`] built on the provider's storage cursors as its value encoder.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    <Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
    <Provider as HashedCursorFactory>::AccountCursor<'a>,
    AsyncAccountValueEncoder<
        <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
        <Provider as HashedCursorFactory>::StorageCursor<'a>,
    >,
>;

/// V2 storage proof calculator over the provider's storage trie/hashed cursors.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
    <Provider as HashedCursorFactory>::StorageCursor<'a>,
>;
88
/// Cloneable handle used to submit jobs to the storage and account proof workers.
///
/// It owns the sending halves of both job queues plus the shared availability counters that
/// the workers update while running.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Sender side of the queue consumed by storage workers.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Sender side of the queue consumed by account workers.
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Number of storage workers currently waiting for a job (maintained by the workers).
    storage_available_workers: Arc<AtomicUsize>,
    /// Number of account workers currently waiting for a job (maintained by the workers).
    account_available_workers: Arc<AtomicUsize>,
    /// Number of storage workers requested when the handle was created.
    storage_worker_count: usize,
    /// Number of account workers requested when the handle was created.
    account_worker_count: usize,
}
111
112impl ProofWorkerHandle {
113 #[instrument(
123 name = "ProofWorkerHandle::new",
124 level = "debug",
125 target = "trie::proof_task",
126 skip_all
127 )]
128 pub fn new<Factory>(
129 runtime: &Runtime,
130 task_ctx: ProofTaskCtx<Factory>,
131 halve_workers: bool,
132 ) -> Self
133 where
134 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
135 + Clone
136 + Send
137 + Sync
138 + 'static,
139 {
140 let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
141 let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
142
143 let storage_available_workers = Arc::<AtomicUsize>::default();
144 let account_available_workers = Arc::<AtomicUsize>::default();
145
146 let cached_storage_roots = Arc::<DashMap<_, _>>::default();
147
148 let divisor = if halve_workers { 2 } else { 1 };
149 let storage_worker_count =
150 runtime.proof_storage_worker_pool().current_num_threads() / divisor;
151 let account_worker_count =
152 runtime.proof_account_worker_pool().current_num_threads() / divisor;
153
154 debug!(
155 target: "trie::proof_task",
156 storage_worker_count,
157 account_worker_count,
158 halve_workers,
159 "Spawning proof worker pools"
160 );
161
162 let storage_rt = runtime.clone();
165 let storage_task_ctx = task_ctx.clone();
166 let storage_avail = storage_available_workers.clone();
167 let storage_roots = cached_storage_roots.clone();
168 let storage_parent_span = tracing::Span::current();
169 runtime.spawn_blocking_named("storage-workers", move || {
170 let worker_id = AtomicUsize::new(0);
171 storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
172 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
173 let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
174 let _guard = span.enter();
175
176 #[cfg(feature = "metrics")]
177 let metrics = ProofTaskTrieMetrics::default();
178 #[cfg(feature = "metrics")]
179 let cursor_metrics = ProofTaskCursorMetrics::new();
180
181 let worker = StorageProofWorker::new(
182 storage_task_ctx.clone(),
183 storage_work_rx.clone(),
184 worker_id,
185 storage_avail.clone(),
186 storage_roots.clone(),
187 #[cfg(feature = "metrics")]
188 metrics,
189 #[cfg(feature = "metrics")]
190 cursor_metrics,
191 );
192 if let Err(error) = worker.run() {
193 error!(
194 target: "trie::proof_task",
195 worker_id,
196 ?error,
197 "Storage worker failed"
198 );
199 }
200 });
201 });
202
203 let account_rt = runtime.clone();
204 let account_tx = storage_work_tx.clone();
205 let account_avail = account_available_workers.clone();
206 let account_parent_span = tracing::Span::current();
207 runtime.spawn_blocking_named("account-workers", move || {
208 let worker_id = AtomicUsize::new(0);
209 account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
210 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
211 let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
212 let _guard = span.enter();
213
214 #[cfg(feature = "metrics")]
215 let metrics = ProofTaskTrieMetrics::default();
216 #[cfg(feature = "metrics")]
217 let cursor_metrics = ProofTaskCursorMetrics::new();
218
219 let worker = AccountProofWorker::new(
220 task_ctx.clone(),
221 account_work_rx.clone(),
222 worker_id,
223 account_tx.clone(),
224 account_avail.clone(),
225 cached_storage_roots.clone(),
226 #[cfg(feature = "metrics")]
227 metrics,
228 #[cfg(feature = "metrics")]
229 cursor_metrics,
230 );
231 if let Err(error) = worker.run() {
232 error!(
233 target: "trie::proof_task",
234 worker_id,
235 ?error,
236 "Account worker failed"
237 );
238 }
239 });
240 });
241
242 Self {
243 storage_work_tx,
244 account_work_tx,
245 storage_available_workers,
246 account_available_workers,
247 storage_worker_count,
248 account_worker_count,
249 }
250 }
251
    /// Returns the current value of the storage worker availability counter.
    ///
    /// The value is read with relaxed ordering and may be stale by the time it is used.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }
256
    /// Returns the current value of the account worker availability counter.
    ///
    /// The value is read with relaxed ordering and may be stale by the time it is used.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }
261
    /// Returns the number of jobs currently sitting in the storage worker queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }
266
    /// Returns the number of jobs currently sitting in the account worker queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }
271
    /// Returns the number of storage workers requested when this handle was created.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }
276
    /// Returns the number of account workers requested when this handle was created.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }
281
282 pub fn active_storage_workers(&self) -> usize {
286 self.storage_worker_count.saturating_sub(self.available_storage_workers())
287 }
288
289 pub fn active_account_workers(&self) -> usize {
293 self.account_worker_count.saturating_sub(self.available_account_workers())
294 }
295
296 pub fn dispatch_storage_proof(
300 &self,
301 input: StorageProofInput,
302 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
303 ) -> Result<(), ProviderError> {
304 let hashed_address = input.hashed_address;
305 self.storage_work_tx
306 .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
307 .map_err(|err| {
308 if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
309 let _ = proof_result_sender.send(StorageProofResultMessage {
310 hashed_address,
311 result: Err(DatabaseError::Other(
312 "storage workers unavailable".to_string(),
313 )
314 .into()),
315 });
316 }
317
318 ProviderError::other(std::io::Error::other("storage workers unavailable"))
319 })
320 }
321
322 pub fn dispatch_account_multiproof(
326 &self,
327 input: AccountMultiproofInput,
328 ) -> Result<(), ProviderError> {
329 self.account_work_tx
330 .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
331 .map_err(|err| {
332 let error =
333 ProviderError::other(std::io::Error::other("account workers unavailable"));
334
335 if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
336 let ProofResultContext { sender: result_tx, state, start_time: start } =
337 input.into_proof_result_sender();
338
339 let _ = result_tx.send(ProofResultMessage {
340 result: Err(ParallelStateRootError::Provider(error.clone())),
341 elapsed: start.elapsed(),
342 state,
343 });
344 }
345
346 error
347 })
348 }
349
350 pub(crate) fn dispatch_blinded_storage_node(
352 &self,
353 account: B256,
354 path: Nibbles,
355 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
356 let (tx, rx) = channel();
357 self.storage_work_tx
358 .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
359 .map_err(|_| {
360 ProviderError::other(std::io::Error::other("storage workers unavailable"))
361 })?;
362
363 Ok(rx)
364 }
365
366 pub(crate) fn dispatch_blinded_account_node(
368 &self,
369 path: Nibbles,
370 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
371 let (tx, rx) = channel();
372 self.account_work_tx
373 .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
374 .map_err(|_| {
375 ProviderError::other(std::io::Error::other("account workers unavailable"))
376 })?;
377
378 Ok(rx)
379 }
380}
381
/// Shared context handed to every proof worker; wraps the factory the workers use to open
/// their own read-only database provider.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// Factory used by each worker to create its database provider.
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new context around `factory`.
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}
395
/// A provider paired with the id of the worker that owns it.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// Provider used to create trie and hashed-state cursors.
    provider: Provider,

    /// Identifier of the owning worker; only used in log output here.
    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    /// Creates a new [`ProofTaskTx`] from a provider and worker id.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}
412
413impl<Provider> ProofTaskTx<Provider>
414where
415 Provider: TrieCursorFactory + HashedCursorFactory,
416{
    /// Computes a V2 storage proof for `input.hashed_address` using the supplied calculator.
    ///
    /// With no targets, only the storage root node is produced; otherwise a proof for the
    /// targets is computed. The root hash is then derived from the resulting proof nodes.
    ///
    /// # Errors
    ///
    /// Propagates any [`StateProofError`] raised by the calculator.
    fn compute_v2_storage_proof(
        &self,
        input: StorageProofInput,
        calculator: &mut proof_v2::StorageProofCalculator<
            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
            <Provider as HashedCursorFactory>::StorageCursor<'_>,
        >,
    ) -> Result<StorageProofResult, StateProofError> {
        let StorageProofInput { hashed_address, mut targets } = input;

        let span = debug_span!(
            target: "trie::proof_task",
            "V2 Storage proof calculation",
            n = %targets.len(),
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        let proof = if targets.is_empty() {
            // No targets: return just the root node of the storage trie.
            let root_node = calculator.storage_root_node(hashed_address)?;
            vec![root_node]
        } else {
            calculator.storage_proof(hashed_address, &mut targets)?
        };

        // `root` is optional: the calculator may not be able to derive it from `proof`.
        let root = calculator.compute_root_hash(&proof)?;

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            ?root,
            worker_id = self.id,
            "Completed V2 storage proof calculation"
        );

        Ok(StorageProofResult { proof, root })
    }
457
458 fn process_blinded_storage_node(
462 &self,
463 account: B256,
464 path: &Nibbles,
465 ) -> TrieNodeProviderResult {
466 let storage_node_provider =
467 ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
468 storage_node_provider.trie_node(path)
469 }
470}
/// Lets a [`ProofWorkerHandle`] act as a factory for trie node providers that resolve blinded
/// nodes by dispatching jobs to the worker pools. Each provider holds its own clone of the
/// handle.
impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    /// Returns a provider that resolves account trie nodes.
    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    /// Returns a provider that resolves storage trie nodes of `account`.
    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}
483
/// Trie node provider that resolves blinded nodes through the proof worker pools.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Resolves nodes of the account trie.
    AccountNode {
        /// Handle used to dispatch the lookup to an account worker.
        handle: ProofWorkerHandle,
    },
    /// Resolves nodes of a single account's storage trie.
    StorageNode {
        /// Account whose storage trie is queried.
        account: B256,
        /// Handle used to dispatch the lookup to a storage worker.
        handle: ProofWorkerHandle,
    },
}
500
501impl TrieNodeProvider for ProofTaskTrieNodeProvider {
502 fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
503 match self {
504 Self::AccountNode { handle } => {
505 let rx = handle
506 .dispatch_blinded_account_node(*path)
507 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
508 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
509 }
510 Self::StorageNode { handle, account } => {
511 let rx = handle
512 .dispatch_blinded_storage_node(*account, *path)
513 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
514 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
515 }
516 }
517 }
518}
519
/// Channel sender used to deliver [`ProofResultMessage`]s back to the requester.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
525
/// Message sent back to the requester once an account multiproof job has finished or failed.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// The computed multiproof, or the error that prevented its computation.
    pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
    /// Time elapsed since the `start_time` recorded in the originating [`ProofResultContext`].
    pub elapsed: Duration,
    /// The state carried by the originating [`ProofResultContext`], returned unchanged.
    pub state: HashedPostState,
}
539
/// Everything a worker needs to report the outcome of an account multiproof job.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel on which the [`ProofResultMessage`] is sent.
    pub sender: ProofResultSender,
    /// State that is echoed back in the result message.
    pub state: HashedPostState,
    /// Reference instant from which the reported `elapsed` duration is measured.
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a new result context from its parts.
    pub const fn new(
        sender: ProofResultSender,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, state, start_time }
    }
}
564
/// Outcome of a storage proof computation.
#[derive(Debug)]
pub(crate) struct StorageProofResult {
    /// Proof nodes produced by the calculator.
    pub proof: Vec<ProofTrieNodeV2>,
    /// Root hash derived from `proof`, when the calculator could produce one.
    pub root: Option<B256>,
}

impl StorageProofResult {
    /// Returns the storage root, if one was computed.
    const fn root(&self) -> Option<B256> {
        self.root
    }
}
580
/// Message a storage worker sends back for a storage proof job.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// Hashed address the proof was requested for.
    // NOTE(review): presumably not read anywhere, hence the lint allowance - confirm.
    #[allow(dead_code)]
    pub(crate) hashed_address: B256,
    /// The storage proof, or the error that prevented its computation.
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
590
/// Jobs that can be queued for storage workers.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Compute a storage proof.
    StorageProof {
        /// Address and targets of the proof.
        input: StorageProofInput,
        /// Channel on which the worker sends the proof result.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Resolve a blinded node of an account's storage trie.
    BlindedStorageNode {
        /// Account whose storage trie is queried.
        account: B256,
        /// Path of the node within the storage trie.
        path: Nibbles,
        /// Channel on which the worker sends the lookup result.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
611
/// A single storage worker: pulls [`StorageWorkerJob`]s from the shared queue and processes
/// them with its own database provider.
struct StorageProofWorker<Factory> {
    /// Context providing the factory for the worker's database provider.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiving side of the storage job queue, shared with the other storage workers.
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Identifier used in log output.
    worker_id: usize,
    /// Shared counter of idle storage workers; updated around each job.
    available_workers: Arc<AtomicUsize>,
    /// Cache of storage roots keyed by hashed address, filled after each successful proof.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Trie metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
634
635impl<Factory> StorageProofWorker<Factory>
636where
637 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
638{
    /// Creates a storage worker from its parts; no work is performed until [`Self::run`].
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }
661
662 fn run(mut self) -> ProviderResult<()> {
680 let provider = self.task_ctx.factory.database_provider_ro()?;
682 let proof_tx = ProofTaskTx::new(provider, self.worker_id);
683
684 trace!(
685 target: "trie::proof_task",
686 worker_id = self.worker_id,
687 "Storage worker started"
688 );
689
690 let mut storage_proofs_processed = 0u64;
691 let mut storage_nodes_processed = 0u64;
692 let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
693 let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
694 let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
695 let mut v2_calculator =
696 proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor);
697
698 self.available_workers.fetch_add(1, Ordering::Relaxed);
700
701 let mut total_idle_time = Duration::ZERO;
702 let mut idle_start = Instant::now();
703
704 while let Ok(job) = self.work_rx.recv() {
705 total_idle_time += idle_start.elapsed();
706
707 self.available_workers.fetch_sub(1, Ordering::Relaxed);
709
710 match job {
711 StorageWorkerJob::StorageProof { input, proof_result_sender } => {
712 self.process_storage_proof(
713 &proof_tx,
714 &mut v2_calculator,
715 input,
716 proof_result_sender,
717 &mut storage_proofs_processed,
718 );
719 }
720
721 StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
722 Self::process_blinded_node(
723 self.worker_id,
724 &proof_tx,
725 account,
726 path,
727 result_sender,
728 &mut storage_nodes_processed,
729 );
730 }
731 }
732
733 self.available_workers.fetch_add(1, Ordering::Relaxed);
735
736 idle_start = Instant::now();
737 }
738
739 trace!(
740 target: "trie::proof_task",
741 worker_id = self.worker_id,
742 storage_proofs_processed,
743 storage_nodes_processed,
744 total_idle_time_us = total_idle_time.as_micros(),
745 "Storage worker shutting down"
746 );
747
748 #[cfg(feature = "metrics")]
749 {
750 self.metrics.record_storage_nodes(storage_nodes_processed as usize);
751 self.metrics.record_storage_worker_idle_time(total_idle_time);
752 self.cursor_metrics.record(&mut cursor_metrics_cache);
753 }
754
755 Ok(())
756 }
757
    /// Computes one storage proof, sends the result to the requester and caches the root.
    ///
    /// A dropped receiver is logged and otherwise ignored. The computed root (if any) is
    /// inserted into the shared storage-root cache after the result has been sent.
    /// `storage_proofs_processed` is incremented for every job, successful or not.
    fn process_storage_proof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: &mut proof_v2::StorageProofCalculator<
            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
            <Provider as HashedCursorFactory>::StorageCursor<'_>,
        >,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let hashed_address = input.hashed_address;
        let proof_start = Instant::now();

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            targets_len = input.targets.len(),
            "Processing V2 storage proof"
        );

        let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        // Copy the root out before `result` is moved into the message below.
        let root = result.as_ref().ok().and_then(|result| result.root());

        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        // The root is cached even if the requester is gone.
        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            ?root,
            "Storage proof completed"
        );
    }
814
    /// Resolves one blinded storage node and sends the result on `result_sender`.
    ///
    /// A dropped receiver is logged and otherwise ignored. `storage_nodes_processed` is
    /// incremented for every job, successful or not.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
861}
862
/// A single account worker: pulls [`AccountWorkerJob`]s from the shared queue and processes
/// them with its own database provider, queueing storage proofs to the storage workers.
struct AccountProofWorker<Factory> {
    /// Context providing the factory for the worker's database provider.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiving side of the account job queue, shared with the other account workers.
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Identifier used in log output.
    worker_id: usize,
    /// Sender used to queue storage proof jobs for the storage workers.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Shared counter of idle account workers; updated around each job.
    available_workers: Arc<AtomicUsize>,
    /// Cache of storage roots keyed by hashed address, handed to the value encoder.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Trie metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics recorded when the worker shuts down.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
887
888impl<Factory> AccountProofWorker<Factory>
889where
890 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
891{
    /// Creates an account worker from its parts; no work is performed until [`Self::run`].
    // The worker needs every one of these collaborators, so the argument count lint is
    // silenced rather than introducing a one-off parameter struct.
    #[allow(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }
917
    /// Runs the worker loop until the job channel is closed.
    ///
    /// The worker opens one database provider, builds a reusable account proof calculator and
    /// a shared storage proof calculator, marks itself available, and then handles jobs one at
    /// a time. The availability counter is decremented while a job is processed and
    /// incremented again afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error if the provider or the initial cursors cannot be created. Failures of
    /// individual jobs are reported through each job's result channel instead.
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        // NOTE(review): `ProofTaskCursorMetricsCache` is imported only under the `metrics`
        // feature in this file but is used unconditionally here - confirm non-metrics builds.
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        let account_trie_cursor = provider.account_trie_cursor()?;
        let account_hashed_cursor = provider.hashed_account_cursor()?;

        // Storage cursors are created once for a placeholder address and reused.
        let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
        let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

        let mut v2_account_calculator = proof_v2::ProofCalculator::<
            _,
            _,
            AsyncAccountValueEncoder<
                <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
            >,
        >::new(account_trie_cursor, account_hashed_cursor);
        // Shared via `Rc<RefCell<..>>` because a clone is handed to the value encoder of every
        // multiproof job.
        let v2_storage_calculator =
            Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
                storage_trie_cursor,
                storage_hashed_cursor,
            )));

        // Setup succeeded: advertise this worker as available.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();
        let mut value_encoder_stats_cache = ValueEncoderStats::default();

        // `recv` fails once the channel is closed, which ends the loop.
        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            // Busy while the job is being processed.
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
                        &mut v2_account_calculator,
                        v2_storage_calculator.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                    // Time spent waiting on storage proofs is counted as idle time.
                    total_idle_time += value_encoder_stats.storage_wait_time;
                    value_encoder_stats_cache.extend(&value_encoder_stats);
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &provider,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Available again.
            self.available_workers.fetch_add(1, Ordering::Relaxed);

            idle_start = Instant::now();
        }

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.metrics.record_account_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
        }

        Ok(())
    }
1032
1033 fn compute_v2_account_multiproof<'a, Provider>(
1034 &self,
1035 v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1036 v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1037 targets: MultiProofTargetsV2,
1038 ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
1039 where
1040 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1041 {
1042 let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
1043
1044 let span = debug_span!(
1045 target: "trie::proof_task",
1046 "Account V2 multiproof calculation",
1047 account_targets = account_targets.len(),
1048 storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
1049 );
1050 let _span_guard = span.enter();
1051
1052 trace!(target: "trie::proof_task", "Processing V2 account multiproof");
1053
1054 let storage_proof_receivers =
1055 dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
1056
1057 let mut value_encoder = AsyncAccountValueEncoder::new(
1058 storage_proof_receivers,
1059 self.cached_storage_roots.clone(),
1060 v2_storage_calculator,
1061 );
1062
1063 let account_proofs =
1064 v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
1065
1066 let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
1067
1068 let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
1069
1070 Ok((proof, value_encoder_stats))
1071 }
1072
    /// Processes one account multiproof job and sends the result to the requester.
    ///
    /// Returns the value encoder statistics of the computation, or default statistics when
    /// the computation failed. A dropped receiver is logged and otherwise ignored.
    /// `account_proofs_processed` is incremented for every job, successful or not.
    fn process_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) -> ValueEncoderStats
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        // NOTE(review): nothing in this function writes to this cache before it is logged and
        // merged below - confirm whether the calculators are expected to populate it.
        let proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
        let proof_start = Instant::now();

        let AccountMultiproofInput { targets, proof_result_sender } = input;
        let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
            v2_account_calculator,
            v2_storage_calculator,
            targets,
        ) {
            Ok((proof, stats)) => (Ok(proof), stats),
            Err(e) => (Err(e), ValueEncoderStats::default()),
        };

        let ProofResultContext { sender: result_tx, state, start_time: start } =
            proof_result_sender;

        // `proof_elapsed` covers this computation only; `total_elapsed` is measured from the
        // start time supplied by the requester.
        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        *account_proofs_processed += 1;

        if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id=self.worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        proof_cursor_metrics.record_spans();

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        // Fold this job's cursor metrics into the worker-wide cache.
        #[cfg(feature = "metrics")]
        cursor_metrics_cache.extend(&proof_cursor_metrics);

        value_encoder_stats
    }
1141
    /// Resolves one blinded account node and sends the result on `result_sender`.
    ///
    /// `provider` is used as both cursor factories. A dropped receiver is logged and
    /// otherwise ignored. `account_nodes_processed` is incremented for every job, successful
    /// or not.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        provider: &Provider,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
        let result = account_node_provider.trie_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
1188}
1189
1190fn dispatch_v2_storage_proofs(
1198 storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1199 account_targets: &[ProofV2Target],
1200 mut storage_targets: B256Map<Vec<ProofV2Target>>,
1201) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1202 let mut storage_proof_receivers =
1203 B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1204
1205 let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1207
1208 for (hashed_address, targets) in &mut storage_targets {
1211 if account_target_addresses.contains(hashed_address) &&
1212 let Some(first) = targets.first_mut()
1213 {
1214 *first = first.with_min_len(0);
1215 }
1216 }
1217
1218 let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1222 sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1223
1224 for (hashed_address, targets) in sorted_storage_targets {
1226 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1228 let input = StorageProofInput::new(hashed_address, targets);
1229
1230 storage_work_tx
1231 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1232 .map_err(|_| {
1233 ParallelStateRootError::Other(format!(
1234 "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1235 ))
1236 })?;
1237
1238 storage_proof_receivers.insert(hashed_address, result_rx);
1239 }
1240
1241 Ok(storage_proof_receivers)
1242}
1243
/// Input of a storage proof job.
#[derive(Debug)]
pub struct StorageProofInput {
    /// Hashed address of the account whose storage is being proven.
    pub hashed_address: B256,
    /// Storage proof targets; when empty, only the storage root node is computed.
    pub targets: Vec<ProofV2Target>,
}

impl StorageProofInput {
    /// Creates a new storage proof input.
    pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
        Self { hashed_address, targets }
    }
}
1259
/// Input of an account multiproof job.
#[derive(Debug)]
pub struct AccountMultiproofInput {
    /// Account and storage targets of the multiproof.
    pub targets: MultiProofTargetsV2,
    /// Context used to report the result back to the requester.
    pub proof_result_sender: ProofResultContext,
}

impl AccountMultiproofInput {
    /// Consumes the input and returns its result context, discarding the targets.
    fn into_proof_result_sender(self) -> ProofResultContext {
        self.proof_result_sender
    }
}
1275
/// Jobs that can be queued for account workers.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Compute an account multiproof.
    AccountMultiproof {
        /// Targets and result context of the multiproof; boxed, and unboxed by the worker.
        input: Box<AccountMultiproofInput>,
    },
    /// Resolve a blinded node of the account trie.
    BlindedAccountNode {
        /// Path of the node within the account trie.
        path: Nibbles,
        /// Channel on which the worker sends the lookup result.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
1292
#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Wraps `factory` in a [`ProofTaskCtx`].
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Smoke test: a handle can be created, cloned and dropped without panicking.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let provider_factory = create_test_provider_factory();
        let changeset_cache = reth_trie_db::ChangesetCache::new();
        let factory = reth_provider::providers::OverlayStateProviderFactory::new(
            provider_factory,
            changeset_cache,
        );
        let ctx = test_ctx(factory);

        let runtime = reth_tasks::Runtime::test();
        let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);

        // The handle must be cloneable.
        let _cloned_handle = proof_handle.clone();

        // Dropping the original must not panic while a clone is still alive.
        drop(proof_handle);
    }
}