1use crate::{
33 root::ParallelStateRootError,
34 value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
35};
36use alloy_primitives::{
37 map::{B256Map, B256Set},
38 B256, U256,
39};
40use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
41use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
42use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
43use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
44use reth_storage_errors::db::DatabaseError;
45use reth_tasks::Runtime;
46use reth_trie::{
47 hashed_cursor::{HashedCursorFactory, HashedStorageCursor, InstrumentedHashedCursor},
48 proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider},
49 proof_v2,
50 trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieStorageCursor},
51 DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, Nibbles, ProofTrieNodeV2,
52 ProofV2Target,
53};
54use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
55use std::{
56 cell::RefCell,
57 rc::Rc,
58 sync::{
59 atomic::{AtomicBool, AtomicUsize, Ordering},
60 mpsc::{channel, Receiver, Sender},
61 Arc,
62 },
63 time::Duration,
64};
65use tracing::{debug, debug_span, error, instrument, trace};
66
67#[cfg(feature = "metrics")]
68use crate::proof_task_metrics::{
69 ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
70};
71
/// Result sent back for a blinded trie node request: an optional [`RevealedNode`] on success or
/// a [`SparseTrieError`] on failure.
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
73
/// Account proof calculator as used by the account workers.
///
/// A [`proof_v2::ProofCalculator`] over the instrumented account trie/hashed cursors of
/// `Provider`, whose value encoder is an [`AsyncAccountValueEncoder`] over the instrumented
/// storage trie/hashed cursors of the same provider.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::AccountTrieCursor<'a>>,
    InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::AccountCursor<'a>>,
    AsyncAccountValueEncoder<
        InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
        InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
    >,
>;
83
/// Storage proof calculator as used by the workers: a [`proof_v2::StorageProofCalculator`] over
/// the instrumented storage trie/hashed cursors of `Provider`.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
    InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>;
89
90#[derive(Debug)]
94struct AvailabilitySheet {
95 flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
98}
99
100impl AvailabilitySheet {
101 fn new(count: usize) -> Self {
103 let flags =
104 (0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect();
105 Self { flags }
106 }
107
108 fn has_multiple_idle(&self) -> bool {
113 let mut idle = 0u32;
114 for flag in &self.flags {
115 if flag.load(Ordering::Relaxed) {
116 idle += 1;
117 if idle > 1 {
118 return true;
119 }
120 }
121 }
122 false
123 }
124
125 fn mark_idle(&self, worker_id: usize) {
127 self.flags[worker_id].store(true, Ordering::Relaxed);
128 }
129
130 fn mark_busy(&self, worker_id: usize) {
132 self.flags[worker_id].store(false, Ordering::Relaxed);
133 }
134}
135
136#[derive(Debug, Clone)]
142pub struct ProofWorkerHandle {
143 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
145 account_work_tx: CrossbeamSender<AccountWorkerJob>,
147 storage_availability: Arc<AvailabilitySheet>,
150 account_availability: Arc<AvailabilitySheet>,
153 storage_worker_count: usize,
155 account_worker_count: usize,
157}
158
159impl ProofWorkerHandle {
160 #[instrument(
170 name = "ProofWorkerHandle::new",
171 level = "debug",
172 target = "trie::proof_task",
173 skip_all
174 )]
175 pub fn new<Factory>(
176 runtime: &Runtime,
177 task_ctx: ProofTaskCtx<Factory>,
178 halve_workers: bool,
179 ) -> Self
180 where
181 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
182 + Clone
183 + Send
184 + Sync
185 + 'static,
186 {
187 let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
188 let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
189
190 let cached_storage_roots = Arc::<DashMap<_, _>>::default();
191
192 let divisor = if halve_workers { 2 } else { 1 };
193 let storage_worker_count =
194 runtime.proof_storage_worker_pool().current_num_threads() / divisor;
195 let account_worker_count =
196 runtime.proof_account_worker_pool().current_num_threads() / divisor;
197
198 let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
199 let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));
200
201 debug!(
202 target: "trie::proof_task",
203 storage_worker_count,
204 account_worker_count,
205 halve_workers,
206 "Spawning proof worker pools"
207 );
208
209 let storage_rt = runtime.clone();
212 let storage_task_ctx = task_ctx.clone();
213 let storage_avail = storage_availability.clone();
214 let storage_roots = cached_storage_roots.clone();
215 let storage_parent_span = tracing::Span::current();
216 runtime.spawn_blocking_named("storage-workers", move || {
217 let worker_id = AtomicUsize::new(0);
218 storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
219 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
220 let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
221 let _guard = span.enter();
222
223 #[cfg(feature = "metrics")]
224 let metrics = ProofTaskTrieMetrics::default();
225 #[cfg(feature = "metrics")]
226 let cursor_metrics = ProofTaskCursorMetrics::new();
227
228 let worker = StorageProofWorker::new(
229 storage_task_ctx.clone(),
230 storage_work_rx.clone(),
231 worker_id,
232 storage_avail.clone(),
233 storage_roots.clone(),
234 #[cfg(feature = "metrics")]
235 metrics,
236 #[cfg(feature = "metrics")]
237 cursor_metrics,
238 );
239 if let Err(error) = worker.run() {
240 error!(
241 target: "trie::proof_task",
242 worker_id,
243 ?error,
244 "Storage worker failed"
245 );
246 }
247 });
248 });
249
250 let account_rt = runtime.clone();
251 let account_tx = storage_work_tx.clone();
252 let account_avail = account_availability.clone();
253 let account_parent_span = tracing::Span::current();
254 runtime.spawn_blocking_named("account-workers", move || {
255 let worker_id = AtomicUsize::new(0);
256 account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
257 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
258 let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
259 let _guard = span.enter();
260
261 #[cfg(feature = "metrics")]
262 let metrics = ProofTaskTrieMetrics::default();
263 #[cfg(feature = "metrics")]
264 let cursor_metrics = ProofTaskCursorMetrics::new();
265
266 let worker = AccountProofWorker::new(
267 task_ctx.clone(),
268 account_work_rx.clone(),
269 worker_id,
270 account_tx.clone(),
271 account_avail.clone(),
272 cached_storage_roots.clone(),
273 #[cfg(feature = "metrics")]
274 metrics,
275 #[cfg(feature = "metrics")]
276 cursor_metrics,
277 );
278 if let Err(error) = worker.run() {
279 error!(
280 target: "trie::proof_task",
281 worker_id,
282 ?error,
283 "Account worker failed"
284 );
285 }
286 });
287 });
288
289 Self {
290 storage_work_tx,
291 account_work_tx,
292 storage_availability,
293 account_availability,
294 storage_worker_count,
295 account_worker_count,
296 }
297 }
298
299 pub fn has_multiple_idle_storage_workers(&self) -> bool {
301 self.storage_availability.has_multiple_idle()
302 }
303
304 pub fn has_multiple_idle_account_workers(&self) -> bool {
306 self.account_availability.has_multiple_idle()
307 }
308
309 pub fn pending_storage_tasks(&self) -> usize {
311 self.storage_work_tx.len()
312 }
313
314 pub fn pending_account_tasks(&self) -> usize {
316 self.account_work_tx.len()
317 }
318
319 pub const fn total_storage_workers(&self) -> usize {
321 self.storage_worker_count
322 }
323
324 pub const fn total_account_workers(&self) -> usize {
326 self.account_worker_count
327 }
328
329 pub fn dispatch_storage_proof(
333 &self,
334 input: StorageProofInput,
335 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
336 ) -> Result<(), ProviderError> {
337 let hashed_address = input.hashed_address;
338 self.storage_work_tx
339 .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
340 .map_err(|err| {
341 if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
342 let _ = proof_result_sender.send(StorageProofResultMessage {
343 hashed_address,
344 result: Err(DatabaseError::Other(
345 "storage workers unavailable".to_string(),
346 )
347 .into()),
348 });
349 }
350
351 ProviderError::other(std::io::Error::other("storage workers unavailable"))
352 })
353 }
354
355 pub fn dispatch_account_multiproof(
359 &self,
360 input: AccountMultiproofInput,
361 ) -> Result<(), ProviderError> {
362 self.account_work_tx
363 .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
364 .map_err(|err| {
365 let error =
366 ProviderError::other(std::io::Error::other("account workers unavailable"));
367
368 if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
369 let ProofResultContext { sender: result_tx, state, start_time: start } =
370 input.into_proof_result_sender();
371
372 let _ = result_tx.send(ProofResultMessage {
373 result: Err(ParallelStateRootError::Provider(error.clone())),
374 elapsed: start.elapsed(),
375 state,
376 });
377 }
378
379 error
380 })
381 }
382
383 pub(crate) fn dispatch_blinded_storage_node(
385 &self,
386 account: B256,
387 path: Nibbles,
388 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
389 let (tx, rx) = channel();
390 self.storage_work_tx
391 .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
392 .map_err(|_| {
393 ProviderError::other(std::io::Error::other("storage workers unavailable"))
394 })?;
395
396 Ok(rx)
397 }
398
399 pub(crate) fn dispatch_blinded_account_node(
401 &self,
402 path: Nibbles,
403 ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
404 let (tx, rx) = channel();
405 self.account_work_tx
406 .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
407 .map_err(|_| {
408 ProviderError::other(std::io::Error::other("account workers unavailable"))
409 })?;
410
411 Ok(rx)
412 }
413}
414
/// Context shared by all proof workers; it is cloned into every worker at spawn time.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// Factory each worker uses to open its own read-only database provider.
    factory: Factory,
    #[cfg(feature = "trie-debug")]
    /// Upper bound of the random delay a worker sleeps before handling each job; `None`
    /// disables the delay.
    proof_jitter: Option<Duration>,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a context around `factory`, with proof jitter disabled.
    pub const fn new(factory: Factory) -> Self {
        Self {
            factory,
            #[cfg(feature = "trie-debug")]
            proof_jitter: None,
        }
    }

    /// Sets the maximum random delay workers apply before each job and returns the context.
    #[cfg(feature = "trie-debug")]
    pub const fn with_proof_jitter(mut self, jitter: Option<Duration>) -> Self {
        self.proof_jitter = jitter;
        self
    }
}
442
/// A provider paired with the id of the worker that owns it.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// Provider used to create cursors and blinded node providers.
    provider: Provider,

    /// Id of the owning worker; only used for logging in this file.
    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    /// Wraps `provider` together with the worker id `id`.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}
459
460impl<Provider> ProofTaskTx<Provider>
461where
462 Provider: TrieCursorFactory + HashedCursorFactory,
463{
464 fn compute_v2_storage_proof<TC, HC>(
465 &self,
466 input: StorageProofInput,
467 calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
468 ) -> Result<StorageProofResult, StateProofError>
469 where
470 TC: TrieStorageCursor,
471 HC: HashedStorageCursor<Value = U256>,
472 {
473 let StorageProofInput { hashed_address, mut targets } = input;
474
475 let span = debug_span!(
476 target: "trie::proof_task",
477 "V2 Storage proof calculation",
478 n = %targets.len(),
479 );
480 let _span_guard = span.enter();
481
482 let proof_start = Instant::now();
483
484 let proof = if targets.is_empty() {
486 let root_node = calculator.storage_root_node(hashed_address)?;
487 vec![root_node]
488 } else {
489 calculator.storage_proof(hashed_address, &mut targets)?
490 };
491
492 let root = calculator.compute_root_hash(&proof)?;
493
494 trace!(
495 target: "trie::proof_task",
496 hashed_address = ?hashed_address,
497 proof_time_us = proof_start.elapsed().as_micros(),
498 ?root,
499 worker_id = self.id,
500 "Completed V2 storage proof calculation"
501 );
502
503 Ok(StorageProofResult { proof, root })
504 }
505
506 fn process_blinded_storage_node(
510 &self,
511 account: B256,
512 path: &Nibbles,
513 ) -> TrieNodeProviderResult {
514 let storage_node_provider =
515 ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
516 storage_node_provider.trie_node(path)
517 }
518}
519impl TrieNodeProviderFactory for ProofWorkerHandle {
520 type AccountNodeProvider = ProofTaskTrieNodeProvider;
521 type StorageNodeProvider = ProofTaskTrieNodeProvider;
522
523 fn account_node_provider(&self) -> Self::AccountNodeProvider {
524 ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
525 }
526
527 fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
528 ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
529 }
530}
531
/// Trie node provider that resolves blinded nodes by dispatching requests to the proof workers.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Resolves blinded account trie nodes.
    AccountNode {
        /// Handle used to dispatch the request.
        handle: ProofWorkerHandle,
    },
    /// Resolves blinded storage trie nodes of a single account.
    StorageNode {
        /// Account whose storage trie is queried.
        account: B256,
        /// Handle used to dispatch the request.
        handle: ProofWorkerHandle,
    },
}
548
549impl TrieNodeProvider for ProofTaskTrieNodeProvider {
550 fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
551 match self {
552 Self::AccountNode { handle } => {
553 let rx = handle
554 .dispatch_blinded_account_node(*path)
555 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
556 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
557 }
558 Self::StorageNode { handle, account } => {
559 let rx = handle
560 .dispatch_blinded_storage_node(*account, *path)
561 .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
562 rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
563 }
564 }
565 }
566}
567
/// Channel sender over which account multiproof results ([`ProofResultMessage`]) are delivered.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
573
/// Message carrying the outcome of an account multiproof job.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// The computed multiproof, or the error that prevented its computation.
    pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
    /// Time elapsed since the `start_time` of the job's [`ProofResultContext`].
    pub elapsed: Duration,
    /// State taken unchanged from the job's [`ProofResultContext`].
    pub state: HashedPostState,
}
588
/// Everything a worker needs to report the result of an account multiproof job.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Sender the resulting [`ProofResultMessage`] is pushed through.
    pub sender: ProofResultSender,
    /// State that is handed back verbatim in the result message.
    pub state: HashedPostState,
    /// Reference instant from which the reported elapsed time is measured.
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a context from its parts.
    pub const fn new(
        sender: ProofResultSender,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, state, start_time }
    }
}
613
/// Successful outcome of a storage proof computation.
#[derive(Debug)]
pub(crate) struct StorageProofResult {
    /// Proof nodes produced by the storage proof calculator.
    pub proof: Vec<ProofTrieNodeV2>,
    /// Root hash the calculator derived from `proof`, if it produced one.
    pub root: Option<B256>,
}

impl StorageProofResult {
    /// Returns the root hash, if any.
    const fn root(&self) -> Option<B256> {
        self.root
    }
}
629
/// Message carrying the outcome of a storage proof job.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    // NOTE(review): this field is written but never read within this file, hence the allow.
    #[allow(dead_code)]
    /// Hashed address of the account the proof was requested for.
    pub(crate) hashed_address: B256,
    /// The computed storage proof, or the error that prevented its computation.
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
639
/// Jobs handled by the storage workers.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Compute a storage proof.
    StorageProof {
        /// Account and targets to prove.
        input: StorageProofInput,
        /// Sender the result message is pushed through.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Look up a blinded storage trie node.
    BlindedStorageNode {
        /// Account whose storage trie is queried.
        account: B256,
        /// Path of the requested node.
        path: Nibbles,
        /// Sender the lookup result is pushed through.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
660
/// A single storage worker: it pulls [`StorageWorkerJob`]s from the shared channel and processes
/// them one at a time.
struct StorageProofWorker<Factory> {
    /// Shared context; provides the factory used to open the provider.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiving side of the storage job channel.
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Id of this worker; also its slot index in `availability`.
    worker_id: usize,
    /// Idle flags shared with the handle.
    availability: Arc<AvailabilitySheet>,
    /// Storage roots keyed by hashed address, filled in after each successful proof.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    #[cfg(feature = "metrics")]
    /// Metrics recorded when the worker shuts down.
    metrics: ProofTaskTrieMetrics,
    #[cfg(feature = "metrics")]
    /// Cursor metrics recorded when the worker shuts down.
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Assembles a worker from its parts; no work is done until [`Self::run`] is called.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        availability: Arc<AvailabilitySheet>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            availability,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop until receiving from the job channel fails.
    ///
    /// A read-only provider, one pair of storage cursors and one storage proof calculator are
    /// created once and reused for every job. Around each job the worker flips its availability
    /// flag (busy while processing, idle while waiting) and accumulates the time spent waiting.
    ///
    /// # Errors
    /// Returns an error if the provider or either cursor cannot be created. Errors of
    /// individual jobs are reported through the job's result sender instead.
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        // NOTE(review): `ProofTaskCursorMetricsCache` is imported under `cfg(feature = "metrics")`
        // at the top of this file but used unconditionally here — confirm the crate builds
        // without that feature.
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
        // The cursors are created for the zero address; the calculator is then called with the
        // actual hashed address of each job.
        let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
        let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
        let instrumented_trie_cursor =
            InstrumentedTrieCursor::new(trie_cursor, &mut cursor_metrics_cache.storage_trie_cursor);
        let instrumented_hashed_cursor = InstrumentedHashedCursor::new(
            hashed_cursor,
            &mut cursor_metrics_cache.storage_hashed_cursor,
        );
        let mut v2_calculator = proof_v2::StorageProofCalculator::new_storage(
            instrumented_trie_cursor,
            instrumented_hashed_cursor,
        );

        // Setup is done: advertise this worker as idle.
        self.availability.mark_idle(self.worker_id);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();

        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            self.availability.mark_busy(self.worker_id);

            // Debug aid: sleep for a random duration of up to `proof_jitter` before each job.
            #[cfg(feature = "trie-debug")]
            if let Some(max_jitter) = self.task_ctx.proof_jitter {
                let jitter =
                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    jitter_us = jitter.as_micros(),
                    "Storage worker applying proof jitter"
                );
                std::thread::sleep(jitter);
            }

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    self.process_storage_proof(
                        &proof_tx,
                        &mut v2_calculator,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            self.availability.mark_idle(self.worker_id);

            // Idle time is measured from here until the next job arrives.
            idle_start = Instant::now();
        }

        // Release the calculator, and with it the cursors that mutably borrow
        // `cursor_metrics_cache`, before the cache is used again below.
        drop(v2_calculator);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_storage_nodes(storage_nodes_processed as usize);
            self.metrics.record_storage_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Computes one storage proof and sends the outcome through `proof_result_sender`.
    ///
    /// The processed counter is incremented whether or not the computation succeeded. If the
    /// computation succeeded and yielded a root, the root is stored in the shared cache after
    /// the result has been sent. A dropped receiver is only logged.
    fn process_storage_proof<Provider, TC, HC>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
        TC: TrieStorageCursor,
        HC: HashedStorageCursor<Value = U256>,
    {
        // Copied up front: `input` is moved into the computation below.
        let hashed_address = input.hashed_address;
        let proof_start = Instant::now();

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            targets_len = input.targets.len(),
            "Processing V2 storage proof"
        );

        let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        // Extract the root before `result` is moved into the message.
        let root = result.as_ref().ok().and_then(|result| result.root());

        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            ?root,
            "Storage proof completed"
        );
    }

    /// Looks up one blinded storage node and sends the outcome through `result_sender`.
    ///
    /// The processed counter is incremented regardless of the outcome; a dropped receiver is
    /// only logged.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}
934
/// A single account worker: it pulls [`AccountWorkerJob`]s from the shared channel and processes
/// them one at a time.
struct AccountProofWorker<Factory> {
    /// Shared context; provides the factory used to open the provider.
    task_ctx: ProofTaskCtx<Factory>,
    /// Receiving side of the account job channel.
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Id of this worker; also its slot index in `availability`.
    worker_id: usize,
    /// Sender used to queue storage proof jobs for the storage workers.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Idle flags shared with the handle.
    availability: Arc<AvailabilitySheet>,
    /// Storage roots keyed by hashed address; handed to the account value encoder.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    #[cfg(feature = "metrics")]
    /// Metrics recorded when the worker shuts down.
    metrics: ProofTaskTrieMetrics,
    #[cfg(feature = "metrics")]
    /// Cursor metrics recorded when the worker shuts down.
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Assembles a worker from its parts; no work is done until [`Self::run`] is called.
    #[expect(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        availability: Arc<AvailabilitySheet>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            availability,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop until receiving from the job channel fails.
    ///
    /// A read-only provider, account and storage cursors, and the account and storage proof
    /// calculators are created once and reused for every job. Around each job the worker flips
    /// its availability flag (busy while processing, idle while waiting) and accumulates idle
    /// time, which also includes the storage wait time reported by each multiproof.
    ///
    /// # Errors
    /// Returns an error if the provider or any cursor cannot be created. Errors of individual
    /// jobs are reported through the job's result sender instead.
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        // NOTE(review): `ProofTaskCursorMetricsCache` is imported under `cfg(feature = "metrics")`
        // at the top of this file but used unconditionally here — confirm the crate builds
        // without that feature.
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        let account_trie_cursor = provider.account_trie_cursor()?;
        let account_hashed_cursor = provider.hashed_account_cursor()?;

        // The storage cursors are created for the zero address and shared by all jobs.
        let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
        let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

        let instrumented_account_trie_cursor = InstrumentedTrieCursor::new(
            account_trie_cursor,
            &mut cursor_metrics_cache.account_trie_cursor,
        );
        let instrumented_account_hashed_cursor = InstrumentedHashedCursor::new(
            account_hashed_cursor,
            &mut cursor_metrics_cache.account_hashed_cursor,
        );
        let instrumented_storage_trie_cursor = InstrumentedTrieCursor::new(
            storage_trie_cursor,
            &mut cursor_metrics_cache.storage_trie_cursor,
        );
        let instrumented_storage_hashed_cursor = InstrumentedHashedCursor::new(
            storage_hashed_cursor,
            &mut cursor_metrics_cache.storage_hashed_cursor,
        );

        // The value encoder type parameter is spelled out; the cursor types are inferred.
        let mut v2_account_calculator =
            proof_v2::ProofCalculator::<
                _,
                _,
                AsyncAccountValueEncoder<
                    InstrumentedTrieCursor<
                        '_,
                        <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                    >,
                    InstrumentedHashedCursor<
                        '_,
                        <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
                    >,
                >,
            >::new(instrumented_account_trie_cursor, instrumented_account_hashed_cursor);
        // Shared, single-threaded ownership: a clone of this `Rc` is handed to the value encoder
        // of every multiproof job.
        let v2_storage_calculator =
            Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
                instrumented_storage_trie_cursor,
                instrumented_storage_hashed_cursor,
            )));

        // Setup is done: advertise this worker as idle.
        self.availability.mark_idle(self.worker_id);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();
        let mut value_encoder_stats_cache = ValueEncoderStats::default();

        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            self.availability.mark_busy(self.worker_id);

            // Debug aid: sleep for a random duration of up to `proof_jitter` before each job.
            #[cfg(feature = "trie-debug")]
            if let Some(max_jitter) = self.task_ctx.proof_jitter {
                let jitter =
                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    jitter_us = jitter.as_micros(),
                    "Account worker applying proof jitter"
                );
                std::thread::sleep(jitter);
            }

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
                        &mut v2_account_calculator,
                        v2_storage_calculator.clone(),
                        *input,
                        &mut account_proofs_processed,
                    );
                    // Time spent waiting on storage proofs is counted as idle time as well.
                    total_idle_time += value_encoder_stats.storage_wait_time;
                    value_encoder_stats_cache.extend(&value_encoder_stats);
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &provider,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            self.availability.mark_idle(self.worker_id);

            // Idle time is measured from here until the next job arrives.
            idle_start = Instant::now();
        }

        // Release both calculators, and with them the cursors that mutably borrow
        // `cursor_metrics_cache`, before the cache is used again below.
        drop(v2_account_calculator);
        drop(v2_storage_calculator);

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.metrics.record_account_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
        }

        Ok(())
    }

    /// Computes a V2 account multiproof for `targets`.
    ///
    /// Storage proof jobs for all storage targets are queued first; their receivers, the shared
    /// storage root cache and the storage calculator are then combined into the value encoder
    /// that the account calculator uses. Finalizing the encoder yields the storage proofs and
    /// the encoder statistics.
    ///
    /// # Errors
    /// Fails if a storage proof job cannot be queued, if the account proof calculation fails,
    /// or if finalizing the value encoder fails.
    fn compute_v2_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        targets: MultiProofTargetsV2,
    ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account V2 multiproof calculation",
            account_targets = account_targets.len(),
            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
        );
        let _span_guard = span.enter();

        trace!(target: "trie::proof_task", "Processing V2 account multiproof");

        let storage_proof_receivers =
            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;

        let mut value_encoder = AsyncAccountValueEncoder::new(
            storage_proof_receivers,
            self.cached_storage_roots.clone(),
            v2_storage_calculator,
        );

        let account_proofs =
            v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;

        let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;

        let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };

        Ok((proof, value_encoder_stats))
    }

    /// Handles one account multiproof job and sends the outcome through the job's result sender.
    ///
    /// The processed counter is incremented whether or not the computation succeeded, and a
    /// dropped receiver is only logged. Returns the value encoder statistics of the
    /// computation, or default statistics if it failed.
    fn process_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
    ) -> ValueEncoderStats
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        let proof_start = Instant::now();

        let AccountMultiproofInput { targets, proof_result_sender } = input;
        let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
            v2_account_calculator,
            v2_storage_calculator,
            targets,
        ) {
            Ok((proof, stats)) => (Ok(proof), stats),
            Err(e) => (Err(e), ValueEncoderStats::default()),
        };

        let ProofResultContext { sender: result_tx, state, start_time: start } =
            proof_result_sender;

        // `proof_elapsed` covers this computation only; `total_elapsed` is measured from the
        // start time carried in the job's context.
        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        *account_proofs_processed += 1;

        if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id=self.worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            "Account multiproof completed"
        );

        value_encoder_stats
    }

    /// Looks up one blinded account node and sends the outcome through `result_sender`.
    ///
    /// `provider` serves as both arguments of the [`ProofBlindedAccountProvider`]. The
    /// processed counter is incremented regardless of the outcome; a dropped receiver is only
    /// logged.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        provider: &Provider,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
        let result = account_node_provider.trie_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}
1285
1286fn dispatch_v2_storage_proofs(
1294 storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1295 account_targets: &[ProofV2Target],
1296 mut storage_targets: B256Map<Vec<ProofV2Target>>,
1297) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1298 let mut storage_proof_receivers =
1299 B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1300
1301 let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1303
1304 for (hashed_address, targets) in &mut storage_targets {
1307 if account_target_addresses.contains(hashed_address) &&
1308 let Some(first) = targets.first_mut()
1309 {
1310 *first = first.with_min_len(0);
1311 }
1312 }
1313
1314 let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1318 sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1319
1320 for (hashed_address, targets) in sorted_storage_targets {
1322 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1324 let input = StorageProofInput::new(hashed_address, targets);
1325
1326 storage_work_tx
1327 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1328 .map_err(|_| {
1329 ParallelStateRootError::Other(format!(
1330 "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1331 ))
1332 })?;
1333
1334 storage_proof_receivers.insert(hashed_address, result_rx);
1335 }
1336
1337 Ok(storage_proof_receivers)
1338}
1339
/// Input of a storage proof job.
#[derive(Debug)]
pub struct StorageProofInput {
    /// Hashed address of the account whose storage is proven.
    pub hashed_address: B256,
    /// Proof targets within that account's storage; when empty, only the storage root node is
    /// returned.
    pub targets: Vec<ProofV2Target>,
}

impl StorageProofInput {
    /// Creates an input from a hashed address and its proof targets.
    pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
        Self { hashed_address, targets }
    }
}
1355
/// Input of an account multiproof job.
#[derive(Debug)]
pub struct AccountMultiproofInput {
    /// Account and storage targets to prove.
    pub targets: MultiProofTargetsV2,
    /// Context used to report the result (sender, state and start time).
    pub proof_result_sender: ProofResultContext,
}

impl AccountMultiproofInput {
    /// Consumes the input and returns its result context, discarding the targets.
    fn into_proof_result_sender(self) -> ProofResultContext {
        self.proof_result_sender
    }
}
1371
/// Jobs handled by the account workers.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Compute an account multiproof.
    AccountMultiproof {
        /// Boxed job input.
        input: Box<AccountMultiproofInput>,
    },
    /// Look up a blinded account trie node.
    BlindedAccountNode {
        /// Path of the requested node.
        path: Nibbles,
        /// Sender the lookup result is pushed through.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
1388
#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Wraps `factory` in a default [`ProofTaskCtx`].
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Smoke test: a handle can be created, cloned and dropped without panicking.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let provider_factory = create_test_provider_factory();
        let changeset_cache = reth_trie_db::ChangesetCache::new();
        let factory = reth_provider::providers::OverlayStateProviderFactory::new(
            provider_factory,
            changeset_cache,
        );
        let ctx = test_ctx(factory);

        let runtime = reth_tasks::Runtime::test();
        let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);

        // The handle must be cloneable.
        let _cloned_handle = proof_handle.clone();

        // Dropping the original while a clone is still alive must be fine.
        drop(proof_handle);
    }
}