1use crate::{
32 root::ParallelStateRootError,
33 value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
34};
35use alloy_primitives::{
36 map::{B256Map, B256Set},
37 B256,
38};
39use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
40use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
41use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
42use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
43use reth_storage_errors::db::DatabaseError;
44use reth_tasks::Runtime;
45use reth_trie::{
46 hashed_cursor::HashedCursorFactory,
47 proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider},
48 proof_v2,
49 trie_cursor::TrieCursorFactory,
50 DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, Nibbles, ProofTrieNodeV2,
51 ProofV2Target,
52};
53use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
54use std::{
55 cell::RefCell,
56 rc::Rc,
57 sync::{
58 atomic::{AtomicUsize, Ordering},
59 mpsc::{channel, Receiver, Sender},
60 Arc,
61 },
62 time::Duration,
63};
64use tracing::{debug, debug_span, error, instrument, trace};
65
66#[cfg(feature = "metrics")]
67use crate::proof_task_metrics::{
68 ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
69};
70
71type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
72
73type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
75 <Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
76 <Provider as HashedCursorFactory>::AccountCursor<'a>,
77 AsyncAccountValueEncoder<
78 <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
79 <Provider as HashedCursorFactory>::StorageCursor<'a>,
80 >,
81>;
82
83type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
85 <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
86 <Provider as HashedCursorFactory>::StorageCursor<'a>,
87>;
88
89#[derive(Debug, Clone)]
95pub struct ProofWorkerHandle {
96 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
98 account_work_tx: CrossbeamSender<AccountWorkerJob>,
100 storage_available_workers: Arc<AtomicUsize>,
103 account_available_workers: Arc<AtomicUsize>,
106 storage_worker_count: usize,
108 account_worker_count: usize,
110}
111
112impl ProofWorkerHandle {
113 #[instrument(
123 name = "ProofWorkerHandle::new",
124 level = "debug",
125 target = "trie::proof_task",
126 skip_all
127 )]
128 pub fn new<Factory>(
129 runtime: &Runtime,
130 task_ctx: ProofTaskCtx<Factory>,
131 halve_workers: bool,
132 ) -> Self
133 where
134 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
135 + Clone
136 + Send
137 + Sync
138 + 'static,
139 {
140 let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
141 let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
142
143 let storage_available_workers = Arc::<AtomicUsize>::default();
144 let account_available_workers = Arc::<AtomicUsize>::default();
145
146 let cached_storage_roots = Arc::<DashMap<_, _>>::default();
147
148 let divisor = if halve_workers { 2 } else { 1 };
149 let storage_worker_count =
150 runtime.proof_storage_worker_pool().current_num_threads() / divisor;
151 let account_worker_count =
152 runtime.proof_account_worker_pool().current_num_threads() / divisor;
153
154 debug!(
155 target: "trie::proof_task",
156 storage_worker_count,
157 account_worker_count,
158 halve_workers,
159 "Spawning proof worker pools"
160 );
161
162 let storage_rt = runtime.clone();
165 let storage_task_ctx = task_ctx.clone();
166 let storage_avail = storage_available_workers.clone();
167 let storage_roots = cached_storage_roots.clone();
168 let storage_parent_span = tracing::Span::current();
169 runtime.spawn_blocking_named("storage-workers", move || {
170 let worker_id = AtomicUsize::new(0);
171 storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
172 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
173 let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
174 let _guard = span.enter();
175
176 #[cfg(feature = "metrics")]
177 let metrics = ProofTaskTrieMetrics::default();
178 #[cfg(feature = "metrics")]
179 let cursor_metrics = ProofTaskCursorMetrics::new();
180
181 let worker = StorageProofWorker::new(
182 storage_task_ctx.clone(),
183 storage_work_rx.clone(),
184 worker_id,
185 storage_avail.clone(),
186 storage_roots.clone(),
187 #[cfg(feature = "metrics")]
188 metrics,
189 #[cfg(feature = "metrics")]
190 cursor_metrics,
191 );
192 if let Err(error) = worker.run() {
193 error!(
194 target: "trie::proof_task",
195 worker_id,
196 ?error,
197 "Storage worker failed"
198 );
199 }
200 });
201 });
202
203 let account_rt = runtime.clone();
204 let account_tx = storage_work_tx.clone();
205 let account_avail = account_available_workers.clone();
206 let account_parent_span = tracing::Span::current();
207 runtime.spawn_blocking_named("account-workers", move || {
208 let worker_id = AtomicUsize::new(0);
209 account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
210 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
211 let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
212 let _guard = span.enter();
213
214 #[cfg(feature = "metrics")]
215 let metrics = ProofTaskTrieMetrics::default();
216 #[cfg(feature = "metrics")]
217 let cursor_metrics = ProofTaskCursorMetrics::new();
218
219 let worker = AccountProofWorker::new(
220 task_ctx.clone(),
221 account_work_rx.clone(),
222 worker_id,
223 account_tx.clone(),
224 account_avail.clone(),
225 cached_storage_roots.clone(),
226 #[cfg(feature = "metrics")]
227 metrics,
228 #[cfg(feature = "metrics")]
229 cursor_metrics,
230 );
231 if let Err(error) = worker.run() {
232 error!(
233 target: "trie::proof_task",
234 worker_id,
235 ?error,
236 "Account worker failed"
237 );
238 }
239 });
240 });
241
242 Self {
243 storage_work_tx,
244 account_work_tx,
245 storage_available_workers,
246 account_available_workers,
247 storage_worker_count,
248 account_worker_count,
249 }
250 }
251
252 pub fn available_storage_workers(&self) -> usize {
254 self.storage_available_workers.load(Ordering::Relaxed)
255 }
256
257 pub fn available_account_workers(&self) -> usize {
259 self.account_available_workers.load(Ordering::Relaxed)
260 }
261
262 pub fn pending_storage_tasks(&self) -> usize {
264 self.storage_work_tx.len()
265 }
266
267 pub fn pending_account_tasks(&self) -> usize {
269 self.account_work_tx.len()
270 }
271
272 pub const fn total_storage_workers(&self) -> usize {
274 self.storage_worker_count
275 }
276
277 pub const fn total_account_workers(&self) -> usize {
279 self.account_worker_count
280 }
281
282 pub fn active_storage_workers(&self) -> usize {
286 self.storage_worker_count.saturating_sub(self.available_storage_workers())
287 }
288
289 pub fn active_account_workers(&self) -> usize {
293 self.account_worker_count.saturating_sub(self.available_account_workers())
294 }
295
296 pub fn dispatch_storage_proof(
300 &self,
301 input: StorageProofInput,
302 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
303 ) -> Result<(), ProviderError> {
304 let hashed_address = input.hashed_address;
305 self.storage_work_tx
306 .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
307 .map_err(|err| {
308 if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
309 let _ = proof_result_sender.send(StorageProofResultMessage {
310 hashed_address,
311 result: Err(DatabaseError::Other(
312 "storage workers unavailable".to_string(),
313 )
314 .into()),
315 });
316 }
317
318 ProviderError::other(std::io::Error::other("storage workers unavailable"))
319 })
320 }
321
322 pub fn dispatch_account_multiproof(
326 &self,
327 input: AccountMultiproofInput,
328 ) -> Result<(), ProviderError> {
329 self.account_work_tx
330 .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
331 .map_err(|err| {
332 let error =
333 ProviderError::other(std::io::Error::other("account workers unavailable"));
334
335 if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
336 let ProofResultContext { sender: result_tx, state, start_time: start } =
337 input.into_proof_result_sender();
338
339 let _ = result_tx.send(ProofResultMessage {
340 result: Err(ParallelStateRootError::Provider(error.clone())),
341 elapsed: start.elapsed(),
342 state,
343 });
344 }
345
346 error
347 })
348 }
349
350 pub(crate) fn dispatch_blinded_storage_node(
352 &self,
353 account: B256,
354 path: Nibbles,
355 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
356 let (tx, rx) = channel();
357 self.storage_work_tx
358 .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
359 .map_err(|_| {
360 ProviderError::other(std::io::Error::other("storage workers unavailable"))
361 })?;
362
363 Ok(rx)
364 }
365
366 pub(crate) fn dispatch_blinded_account_node(
368 &self,
369 path: Nibbles,
370 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
371 let (tx, rx) = channel();
372 self.account_work_tx
373 .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
374 .map_err(|_| {
375 ProviderError::other(std::io::Error::other("account workers unavailable"))
376 })?;
377
378 Ok(rx)
379 }
380}
381
382#[derive(Clone, Debug)]
384pub struct ProofTaskCtx<Factory> {
385 factory: Factory,
387 #[cfg(feature = "trie-debug")]
389 proof_jitter: Option<Duration>,
390}
391
392impl<Factory> ProofTaskCtx<Factory> {
393 pub const fn new(factory: Factory) -> Self {
395 Self {
396 factory,
397 #[cfg(feature = "trie-debug")]
398 proof_jitter: None,
399 }
400 }
401
402 #[cfg(feature = "trie-debug")]
404 pub const fn with_proof_jitter(mut self, jitter: Option<Duration>) -> Self {
405 self.proof_jitter = jitter;
406 self
407 }
408}
409
410#[derive(Debug)]
412pub struct ProofTaskTx<Provider> {
413 provider: Provider,
415
416 id: usize,
418}
419
420impl<Provider> ProofTaskTx<Provider> {
421 const fn new(provider: Provider, id: usize) -> Self {
423 Self { provider, id }
424 }
425}
426
427impl<Provider> ProofTaskTx<Provider>
428where
429 Provider: TrieCursorFactory + HashedCursorFactory,
430{
431 fn compute_v2_storage_proof(
432 &self,
433 input: StorageProofInput,
434 calculator: &mut proof_v2::StorageProofCalculator<
435 <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
436 <Provider as HashedCursorFactory>::StorageCursor<'_>,
437 >,
438 ) -> Result<StorageProofResult, StateProofError> {
439 let StorageProofInput { hashed_address, mut targets } = input;
440
441 let span = debug_span!(
442 target: "trie::proof_task",
443 "V2 Storage proof calculation",
444 n = %targets.len(),
445 );
446 let _span_guard = span.enter();
447
448 let proof_start = Instant::now();
449
450 let proof = if targets.is_empty() {
452 let root_node = calculator.storage_root_node(hashed_address)?;
453 vec![root_node]
454 } else {
455 calculator.storage_proof(hashed_address, &mut targets)?
456 };
457
458 let root = calculator.compute_root_hash(&proof)?;
459
460 trace!(
461 target: "trie::proof_task",
462 hashed_address = ?hashed_address,
463 proof_time_us = proof_start.elapsed().as_micros(),
464 ?root,
465 worker_id = self.id,
466 "Completed V2 storage proof calculation"
467 );
468
469 Ok(StorageProofResult { proof, root })
470 }
471
472 fn process_blinded_storage_node(
476 &self,
477 account: B256,
478 path: &Nibbles,
479 ) -> TrieNodeProviderResult {
480 let storage_node_provider =
481 ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
482 storage_node_provider.trie_node(path)
483 }
484}
485impl TrieNodeProviderFactory for ProofWorkerHandle {
486 type AccountNodeProvider = ProofTaskTrieNodeProvider;
487 type StorageNodeProvider = ProofTaskTrieNodeProvider;
488
489 fn account_node_provider(&self) -> Self::AccountNodeProvider {
490 ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
491 }
492
493 fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
494 ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
495 }
496}
497
498#[derive(Debug)]
500pub enum ProofTaskTrieNodeProvider {
501 AccountNode {
503 handle: ProofWorkerHandle,
505 },
506 StorageNode {
508 account: B256,
510 handle: ProofWorkerHandle,
512 },
513}
514
515impl TrieNodeProvider for ProofTaskTrieNodeProvider {
516 fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
517 match self {
518 Self::AccountNode { handle } => {
519 let rx = handle
520 .dispatch_blinded_account_node(*path)
521 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
522 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
523 }
524 Self::StorageNode { handle, account } => {
525 let rx = handle
526 .dispatch_blinded_storage_node(*account, *path)
527 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
528 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
529 }
530 }
531 }
532}
533
534pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
539
540#[derive(Debug)]
545pub struct ProofResultMessage {
546 pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
548 pub elapsed: Duration,
550 pub state: HashedPostState,
552}
553
554#[derive(Debug, Clone)]
559pub struct ProofResultContext {
560 pub sender: ProofResultSender,
562 pub state: HashedPostState,
564 pub start_time: Instant,
566}
567
568impl ProofResultContext {
569 pub const fn new(
571 sender: ProofResultSender,
572 state: HashedPostState,
573 start_time: Instant,
574 ) -> Self {
575 Self { sender, state, start_time }
576 }
577}
578
579#[derive(Debug)]
581pub(crate) struct StorageProofResult {
582 pub proof: Vec<ProofTrieNodeV2>,
584 pub root: Option<B256>,
586}
587
588impl StorageProofResult {
589 const fn root(&self) -> Option<B256> {
591 self.root
592 }
593}
594
595#[derive(Debug)]
597pub struct StorageProofResultMessage {
598 #[allow(dead_code)]
600 pub(crate) hashed_address: B256,
601 pub(crate) result: Result<StorageProofResult, StateProofError>,
603}
604
605#[derive(Debug)]
607pub(crate) enum StorageWorkerJob {
608 StorageProof {
610 input: StorageProofInput,
612 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
614 },
615 BlindedStorageNode {
617 account: B256,
619 path: Nibbles,
621 result_sender: Sender<TrieNodeProviderResult>,
623 },
624}
625
626struct StorageProofWorker<Factory> {
631 task_ctx: ProofTaskCtx<Factory>,
633 work_rx: CrossbeamReceiver<StorageWorkerJob>,
635 worker_id: usize,
637 available_workers: Arc<AtomicUsize>,
639 cached_storage_roots: Arc<DashMap<B256, B256>>,
641 #[cfg(feature = "metrics")]
643 metrics: ProofTaskTrieMetrics,
644 #[cfg(feature = "metrics")]
646 cursor_metrics: ProofTaskCursorMetrics,
647}
648
649impl<Factory> StorageProofWorker<Factory>
650where
651 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
652{
653 const fn new(
655 task_ctx: ProofTaskCtx<Factory>,
656 work_rx: CrossbeamReceiver<StorageWorkerJob>,
657 worker_id: usize,
658 available_workers: Arc<AtomicUsize>,
659 cached_storage_roots: Arc<DashMap<B256, B256>>,
660 #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
661 #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
662 ) -> Self {
663 Self {
664 task_ctx,
665 work_rx,
666 worker_id,
667 available_workers,
668 cached_storage_roots,
669 #[cfg(feature = "metrics")]
670 metrics,
671 #[cfg(feature = "metrics")]
672 cursor_metrics,
673 }
674 }
675
676 fn run(mut self) -> ProviderResult<()> {
694 let provider = self.task_ctx.factory.database_provider_ro()?;
696 let proof_tx = ProofTaskTx::new(provider, self.worker_id);
697
698 trace!(
699 target: "trie::proof_task",
700 worker_id = self.worker_id,
701 "Storage worker started"
702 );
703
704 let mut storage_proofs_processed = 0u64;
705 let mut storage_nodes_processed = 0u64;
706 let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
707 let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
708 let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
709 let mut v2_calculator =
710 proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor);
711
712 self.available_workers.fetch_add(1, Ordering::Relaxed);
714
715 let mut total_idle_time = Duration::ZERO;
716 let mut idle_start = Instant::now();
717
718 while let Ok(job) = self.work_rx.recv() {
719 total_idle_time += idle_start.elapsed();
720
721 self.available_workers.fetch_sub(1, Ordering::Relaxed);
723
724 #[cfg(feature = "trie-debug")]
725 if let Some(max_jitter) = self.task_ctx.proof_jitter {
726 let jitter =
727 Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
728 trace!(
729 target: "trie::proof_task",
730 worker_id = self.worker_id,
731 jitter_us = jitter.as_micros(),
732 "Storage worker applying proof jitter"
733 );
734 std::thread::sleep(jitter);
735 }
736
737 match job {
738 StorageWorkerJob::StorageProof { input, proof_result_sender } => {
739 self.process_storage_proof(
740 &proof_tx,
741 &mut v2_calculator,
742 input,
743 proof_result_sender,
744 &mut storage_proofs_processed,
745 );
746 }
747
748 StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
749 Self::process_blinded_node(
750 self.worker_id,
751 &proof_tx,
752 account,
753 path,
754 result_sender,
755 &mut storage_nodes_processed,
756 );
757 }
758 }
759
760 self.available_workers.fetch_add(1, Ordering::Relaxed);
762
763 idle_start = Instant::now();
764 }
765
766 trace!(
767 target: "trie::proof_task",
768 worker_id = self.worker_id,
769 storage_proofs_processed,
770 storage_nodes_processed,
771 total_idle_time_us = total_idle_time.as_micros(),
772 "Storage worker shutting down"
773 );
774
775 #[cfg(feature = "metrics")]
776 {
777 self.metrics.record_storage_nodes(storage_nodes_processed as usize);
778 self.metrics.record_storage_worker_idle_time(total_idle_time);
779 self.cursor_metrics.record(&mut cursor_metrics_cache);
780 }
781
782 Ok(())
783 }
784
785 fn process_storage_proof<Provider>(
787 &self,
788 proof_tx: &ProofTaskTx<Provider>,
789 v2_calculator: &mut proof_v2::StorageProofCalculator<
790 <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
791 <Provider as HashedCursorFactory>::StorageCursor<'_>,
792 >,
793 input: StorageProofInput,
794 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
795 storage_proofs_processed: &mut u64,
796 ) where
797 Provider: TrieCursorFactory + HashedCursorFactory,
798 {
799 let hashed_address = input.hashed_address;
800 let proof_start = Instant::now();
801
802 trace!(
803 target: "trie::proof_task",
804 worker_id = self.worker_id,
805 hashed_address = ?hashed_address,
806 targets_len = input.targets.len(),
807 "Processing V2 storage proof"
808 );
809
810 let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);
811
812 let proof_elapsed = proof_start.elapsed();
813 *storage_proofs_processed += 1;
814
815 let root = result.as_ref().ok().and_then(|result| result.root());
816
817 if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
818 trace!(
819 target: "trie::proof_task",
820 worker_id = self.worker_id,
821 hashed_address = ?hashed_address,
822 storage_proofs_processed,
823 "Proof result receiver dropped, discarding result"
824 );
825 }
826
827 if let Some(root) = root {
828 self.cached_storage_roots.insert(hashed_address, root);
829 }
830
831 trace!(
832 target: "trie::proof_task",
833 worker_id = self.worker_id,
834 hashed_address = ?hashed_address,
835 proof_time_us = proof_elapsed.as_micros(),
836 total_processed = storage_proofs_processed,
837 ?root,
838 "Storage proof completed"
839 );
840 }
841
842 fn process_blinded_node<Provider>(
844 worker_id: usize,
845 proof_tx: &ProofTaskTx<Provider>,
846 account: B256,
847 path: Nibbles,
848 result_sender: Sender<TrieNodeProviderResult>,
849 storage_nodes_processed: &mut u64,
850 ) where
851 Provider: TrieCursorFactory + HashedCursorFactory,
852 {
853 trace!(
854 target: "trie::proof_task",
855 worker_id,
856 ?account,
857 ?path,
858 "Processing blinded storage node"
859 );
860
861 let start = Instant::now();
862 let result = proof_tx.process_blinded_storage_node(account, &path);
863 let elapsed = start.elapsed();
864
865 *storage_nodes_processed += 1;
866
867 if result_sender.send(result).is_err() {
868 trace!(
869 target: "trie::proof_task",
870 worker_id,
871 ?account,
872 ?path,
873 storage_nodes_processed,
874 "Blinded storage node receiver dropped, discarding result"
875 );
876 }
877
878 trace!(
879 target: "trie::proof_task",
880 worker_id,
881 ?account,
882 ?path,
883 elapsed_us = elapsed.as_micros(),
884 total_processed = storage_nodes_processed,
885 "Blinded storage node completed"
886 );
887 }
888}
889
890struct AccountProofWorker<Factory> {
895 task_ctx: ProofTaskCtx<Factory>,
897 work_rx: CrossbeamReceiver<AccountWorkerJob>,
899 worker_id: usize,
901 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
903 available_workers: Arc<AtomicUsize>,
905 cached_storage_roots: Arc<DashMap<B256, B256>>,
907 #[cfg(feature = "metrics")]
909 metrics: ProofTaskTrieMetrics,
910 #[cfg(feature = "metrics")]
912 cursor_metrics: ProofTaskCursorMetrics,
913}
914
915impl<Factory> AccountProofWorker<Factory>
916where
917 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
918{
919 #[allow(clippy::too_many_arguments)]
921 const fn new(
922 task_ctx: ProofTaskCtx<Factory>,
923 work_rx: CrossbeamReceiver<AccountWorkerJob>,
924 worker_id: usize,
925 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
926 available_workers: Arc<AtomicUsize>,
927 cached_storage_roots: Arc<DashMap<B256, B256>>,
928 #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
929 #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
930 ) -> Self {
931 Self {
932 task_ctx,
933 work_rx,
934 worker_id,
935 storage_work_tx,
936 available_workers,
937 cached_storage_roots,
938 #[cfg(feature = "metrics")]
939 metrics,
940 #[cfg(feature = "metrics")]
941 cursor_metrics,
942 }
943 }
944
945 fn run(mut self) -> ProviderResult<()> {
963 let provider = self.task_ctx.factory.database_provider_ro()?;
964
965 trace!(
966 target: "trie::proof_task",
967 worker_id=self.worker_id,
968 "Account worker started"
969 );
970
971 let mut account_proofs_processed = 0u64;
972 let mut account_nodes_processed = 0u64;
973 let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
974
975 let account_trie_cursor = provider.account_trie_cursor()?;
978 let account_hashed_cursor = provider.hashed_account_cursor()?;
979
980 let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
981 let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;
982
983 let mut v2_account_calculator = proof_v2::ProofCalculator::<
984 _,
985 _,
986 AsyncAccountValueEncoder<
987 <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
988 <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
989 >,
990 >::new(account_trie_cursor, account_hashed_cursor);
991 let v2_storage_calculator =
992 Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
993 storage_trie_cursor,
994 storage_hashed_cursor,
995 )));
996
997 self.available_workers.fetch_add(1, Ordering::Relaxed);
999
1000 let mut total_idle_time = Duration::ZERO;
1001 let mut idle_start = Instant::now();
1002 let mut value_encoder_stats_cache = ValueEncoderStats::default();
1003
1004 while let Ok(job) = self.work_rx.recv() {
1005 total_idle_time += idle_start.elapsed();
1006
1007 self.available_workers.fetch_sub(1, Ordering::Relaxed);
1009
1010 #[cfg(feature = "trie-debug")]
1011 if let Some(max_jitter) = self.task_ctx.proof_jitter {
1012 let jitter =
1013 Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
1014 trace!(
1015 target: "trie::proof_task",
1016 worker_id = self.worker_id,
1017 jitter_us = jitter.as_micros(),
1018 "Account worker applying proof jitter"
1019 );
1020 std::thread::sleep(jitter);
1021 }
1022
1023 match job {
1024 AccountWorkerJob::AccountMultiproof { input } => {
1025 let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
1026 &mut v2_account_calculator,
1027 v2_storage_calculator.clone(),
1028 *input,
1029 &mut account_proofs_processed,
1030 &mut cursor_metrics_cache,
1031 );
1032 total_idle_time += value_encoder_stats.storage_wait_time;
1033 value_encoder_stats_cache.extend(&value_encoder_stats);
1034 }
1035
1036 AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
1037 Self::process_blinded_node(
1038 self.worker_id,
1039 &provider,
1040 path,
1041 result_sender,
1042 &mut account_nodes_processed,
1043 );
1044 }
1045 }
1046
1047 self.available_workers.fetch_add(1, Ordering::Relaxed);
1049
1050 idle_start = Instant::now();
1051 }
1052
1053 trace!(
1054 target: "trie::proof_task",
1055 worker_id=self.worker_id,
1056 account_proofs_processed,
1057 account_nodes_processed,
1058 total_idle_time_us = total_idle_time.as_micros(),
1059 "Account worker shutting down"
1060 );
1061
1062 #[cfg(feature = "metrics")]
1063 {
1064 self.metrics.record_account_nodes(account_nodes_processed as usize);
1065 self.metrics.record_account_worker_idle_time(total_idle_time);
1066 self.cursor_metrics.record(&mut cursor_metrics_cache);
1067 self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
1068 }
1069
1070 Ok(())
1071 }
1072
1073 fn compute_v2_account_multiproof<'a, Provider>(
1074 &self,
1075 v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1076 v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1077 targets: MultiProofTargetsV2,
1078 ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
1079 where
1080 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1081 {
1082 let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
1083
1084 let span = debug_span!(
1085 target: "trie::proof_task",
1086 "Account V2 multiproof calculation",
1087 account_targets = account_targets.len(),
1088 storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
1089 );
1090 let _span_guard = span.enter();
1091
1092 trace!(target: "trie::proof_task", "Processing V2 account multiproof");
1093
1094 let storage_proof_receivers =
1095 dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
1096
1097 let mut value_encoder = AsyncAccountValueEncoder::new(
1098 storage_proof_receivers,
1099 self.cached_storage_roots.clone(),
1100 v2_storage_calculator,
1101 );
1102
1103 let account_proofs =
1104 v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
1105
1106 let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
1107
1108 let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
1109
1110 Ok((proof, value_encoder_stats))
1111 }
1112
1113 fn process_account_multiproof<'a, Provider>(
1117 &self,
1118 v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1119 v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1120 input: AccountMultiproofInput,
1121 account_proofs_processed: &mut u64,
1122 cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
1123 ) -> ValueEncoderStats
1124 where
1125 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1126 {
1127 let proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
1128 let proof_start = Instant::now();
1129
1130 let AccountMultiproofInput { targets, proof_result_sender } = input;
1131 let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
1132 v2_account_calculator,
1133 v2_storage_calculator,
1134 targets,
1135 ) {
1136 Ok((proof, stats)) => (Ok(proof), stats),
1137 Err(e) => (Err(e), ValueEncoderStats::default()),
1138 };
1139
1140 let ProofResultContext { sender: result_tx, state, start_time: start } =
1141 proof_result_sender;
1142
1143 let proof_elapsed = proof_start.elapsed();
1144 let total_elapsed = start.elapsed();
1145 *account_proofs_processed += 1;
1146
1147 if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
1149 trace!(
1150 target: "trie::proof_task",
1151 worker_id=self.worker_id,
1152 account_proofs_processed,
1153 "Account multiproof receiver dropped, discarding result"
1154 );
1155 }
1156
1157 proof_cursor_metrics.record_spans();
1158
1159 trace!(
1160 target: "trie::proof_task",
1161 proof_time_us = proof_elapsed.as_micros(),
1162 total_elapsed_us = total_elapsed.as_micros(),
1163 total_processed = account_proofs_processed,
1164 account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
1165 account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
1166 storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
1167 storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
1168 account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
1169 account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
1170 storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
1171 storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
1172 "Account multiproof completed"
1173 );
1174
1175 #[cfg(feature = "metrics")]
1176 cursor_metrics_cache.extend(&proof_cursor_metrics);
1178
1179 value_encoder_stats
1180 }
1181
1182 fn process_blinded_node<Provider>(
1184 worker_id: usize,
1185 provider: &Provider,
1186 path: Nibbles,
1187 result_sender: Sender<TrieNodeProviderResult>,
1188 account_nodes_processed: &mut u64,
1189 ) where
1190 Provider: TrieCursorFactory + HashedCursorFactory,
1191 {
1192 let span = debug_span!(
1193 target: "trie::proof_task",
1194 "Blinded account node calculation",
1195 ?path,
1196 );
1197 let _span_guard = span.enter();
1198
1199 trace!(
1200 target: "trie::proof_task",
1201 "Processing blinded account node"
1202 );
1203
1204 let start = Instant::now();
1205 let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
1206 let result = account_node_provider.trie_node(&path);
1207 let elapsed = start.elapsed();
1208
1209 *account_nodes_processed += 1;
1210
1211 if result_sender.send(result).is_err() {
1212 trace!(
1213 target: "trie::proof_task",
1214 worker_id,
1215 ?path,
1216 account_nodes_processed,
1217 "Blinded account node receiver dropped, discarding result"
1218 );
1219 }
1220
1221 trace!(
1222 target: "trie::proof_task",
1223 node_time_us = elapsed.as_micros(),
1224 total_processed = account_nodes_processed,
1225 "Blinded account node completed"
1226 );
1227 }
1228}
1229
1230fn dispatch_v2_storage_proofs(
1238 storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1239 account_targets: &[ProofV2Target],
1240 mut storage_targets: B256Map<Vec<ProofV2Target>>,
1241) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1242 let mut storage_proof_receivers =
1243 B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1244
1245 let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1247
1248 for (hashed_address, targets) in &mut storage_targets {
1251 if account_target_addresses.contains(hashed_address) &&
1252 let Some(first) = targets.first_mut()
1253 {
1254 *first = first.with_min_len(0);
1255 }
1256 }
1257
1258 let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1262 sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1263
1264 for (hashed_address, targets) in sorted_storage_targets {
1266 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1268 let input = StorageProofInput::new(hashed_address, targets);
1269
1270 storage_work_tx
1271 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1272 .map_err(|_| {
1273 ParallelStateRootError::Other(format!(
1274 "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1275 ))
1276 })?;
1277
1278 storage_proof_receivers.insert(hashed_address, result_rx);
1279 }
1280
1281 Ok(storage_proof_receivers)
1282}
1283
1284#[derive(Debug)]
1286pub struct StorageProofInput {
1287 pub hashed_address: B256,
1289 pub targets: Vec<ProofV2Target>,
1291}
1292
1293impl StorageProofInput {
1294 pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
1296 Self { hashed_address, targets }
1297 }
1298}
1299
1300#[derive(Debug)]
1302pub struct AccountMultiproofInput {
1303 pub targets: MultiProofTargetsV2,
1305 pub proof_result_sender: ProofResultContext,
1307}
1308
1309impl AccountMultiproofInput {
1310 fn into_proof_result_sender(self) -> ProofResultContext {
1312 self.proof_result_sender
1313 }
1314}
1315
1316#[derive(Debug)]
1318enum AccountWorkerJob {
1319 AccountMultiproof {
1321 input: Box<AccountMultiproofInput>,
1323 },
1324 BlindedAccountNode {
1326 path: Nibbles,
1328 result_sender: Sender<TrieNodeProviderResult>,
1330 },
1331}
1332
1333#[cfg(test)]
1334mod tests {
1335 use super::*;
1336 use reth_provider::test_utils::create_test_provider_factory;
1337
1338 fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
1339 ProofTaskCtx::new(factory)
1340 }
1341
1342 #[test]
1344 fn spawn_proof_workers_creates_handle() {
1345 let provider_factory = create_test_provider_factory();
1346 let changeset_cache = reth_trie_db::ChangesetCache::new();
1347 let factory = reth_provider::providers::OverlayStateProviderFactory::new(
1348 provider_factory,
1349 changeset_cache,
1350 );
1351 let ctx = test_ctx(factory);
1352
1353 let runtime = reth_tasks::Runtime::test();
1354 let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);
1355
1356 let _cloned_handle = proof_handle.clone();
1358
1359 drop(proof_handle);
1361 }
1362}