1use crate::{
33 root::ParallelStateRootError,
34 value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
35};
36use alloy_primitives::{
37 map::{B256Map, B256Set},
38 B256, U256,
39};
40use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
41use reth_execution_errors::StateProofError;
42use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
43use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
44use reth_storage_errors::db::DatabaseError;
45use reth_tasks::Runtime;
46use reth_trie::{
47 hashed_cursor::{HashedCursorFactory, HashedStorageCursor, InstrumentedHashedCursor},
48 proof_v2,
49 trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieStorageCursor},
50 DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, ProofTrieNodeV2, ProofV2Target,
51};
52use std::{
53 cell::RefCell,
54 rc::Rc,
55 sync::{
56 atomic::{AtomicBool, AtomicUsize, Ordering},
57 Arc,
58 },
59 time::Duration,
60};
61use tracing::{debug, debug_span, error, instrument, trace};
62
63#[cfg(feature = "metrics")]
64use crate::proof_task_metrics::{
65 ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
66};
67
68type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
70 InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::AccountTrieCursor<'a>>,
71 InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::AccountCursor<'a>>,
72 AsyncAccountValueEncoder<
73 InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
74 InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
75 >,
76>;
77
78type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
80 InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
81 InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
82>;
83
84#[derive(Debug)]
88struct AvailabilitySheet {
89 flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
92}
93
94impl AvailabilitySheet {
95 fn new(count: usize) -> Self {
97 let flags =
98 (0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect();
99 Self { flags }
100 }
101
102 fn has_multiple_idle(&self) -> bool {
107 let mut idle = 0u32;
108 for flag in &self.flags {
109 if flag.load(Ordering::Relaxed) {
110 idle += 1;
111 if idle > 1 {
112 return true;
113 }
114 }
115 }
116 false
117 }
118
119 fn mark_idle(&self, worker_id: usize) {
121 self.flags[worker_id].store(true, Ordering::Relaxed);
122 }
123
124 fn mark_busy(&self, worker_id: usize) {
126 self.flags[worker_id].store(false, Ordering::Relaxed);
127 }
128}
129
130#[derive(Debug, Clone)]
136pub struct ProofWorkerHandle {
137 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
139 account_work_tx: CrossbeamSender<AccountWorkerJob>,
141 storage_availability: Arc<AvailabilitySheet>,
144 account_availability: Arc<AvailabilitySheet>,
147 storage_worker_count: usize,
149 account_worker_count: usize,
151}
152
153impl ProofWorkerHandle {
154 #[instrument(
164 name = "ProofWorkerHandle::new",
165 level = "debug",
166 target = "trie::proof_task",
167 skip_all
168 )]
169 pub fn new<Factory>(
170 runtime: &Runtime,
171 task_ctx: ProofTaskCtx<Factory>,
172 halve_workers: bool,
173 ) -> Self
174 where
175 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
176 + Clone
177 + Send
178 + Sync
179 + 'static,
180 {
181 let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
182 let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
183
184 let cached_storage_roots = Arc::<DashMap<_, _>>::default();
185
186 let divisor = if halve_workers { 2 } else { 1 };
187 let storage_worker_count =
188 runtime.proof_storage_worker_pool().current_num_threads() / divisor;
189 let account_worker_count =
190 runtime.proof_account_worker_pool().current_num_threads() / divisor;
191
192 let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
193 let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));
194
195 debug!(
196 target: "trie::proof_task",
197 storage_worker_count,
198 account_worker_count,
199 halve_workers,
200 "Spawning proof worker pools"
201 );
202
203 let storage_rt = runtime.clone();
206 let storage_task_ctx = task_ctx.clone();
207 let storage_avail = storage_availability.clone();
208 let storage_roots = cached_storage_roots.clone();
209 let storage_parent_span = tracing::Span::current();
210 runtime.spawn_blocking_named("storage-workers", move || {
211 let worker_id = AtomicUsize::new(0);
212 storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
213 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
214 let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
215 let _guard = span.enter();
216
217 #[cfg(feature = "metrics")]
218 let metrics = ProofTaskTrieMetrics::default();
219 #[cfg(feature = "metrics")]
220 let cursor_metrics = ProofTaskCursorMetrics::new();
221
222 let worker = StorageProofWorker::new(
223 storage_task_ctx.clone(),
224 storage_work_rx.clone(),
225 worker_id,
226 storage_avail.clone(),
227 storage_roots.clone(),
228 #[cfg(feature = "metrics")]
229 metrics,
230 #[cfg(feature = "metrics")]
231 cursor_metrics,
232 );
233 if let Err(error) = worker.run() {
234 error!(
235 target: "trie::proof_task",
236 worker_id,
237 ?error,
238 "Storage worker failed"
239 );
240 }
241 });
242 });
243
244 let account_rt = runtime.clone();
245 let account_tx = storage_work_tx.clone();
246 let account_avail = account_availability.clone();
247 let account_parent_span = tracing::Span::current();
248 runtime.spawn_blocking_named("account-workers", move || {
249 let worker_id = AtomicUsize::new(0);
250 account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
251 let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
252 let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
253 let _guard = span.enter();
254
255 #[cfg(feature = "metrics")]
256 let metrics = ProofTaskTrieMetrics::default();
257 #[cfg(feature = "metrics")]
258 let cursor_metrics = ProofTaskCursorMetrics::new();
259
260 let worker = AccountProofWorker::new(
261 task_ctx.clone(),
262 account_work_rx.clone(),
263 worker_id,
264 account_tx.clone(),
265 account_avail.clone(),
266 cached_storage_roots.clone(),
267 #[cfg(feature = "metrics")]
268 metrics,
269 #[cfg(feature = "metrics")]
270 cursor_metrics,
271 );
272 if let Err(error) = worker.run() {
273 error!(
274 target: "trie::proof_task",
275 worker_id,
276 ?error,
277 "Account worker failed"
278 );
279 }
280 });
281 });
282
283 Self {
284 storage_work_tx,
285 account_work_tx,
286 storage_availability,
287 account_availability,
288 storage_worker_count,
289 account_worker_count,
290 }
291 }
292
293 pub fn has_multiple_idle_storage_workers(&self) -> bool {
295 self.storage_availability.has_multiple_idle()
296 }
297
298 pub fn has_multiple_idle_account_workers(&self) -> bool {
300 self.account_availability.has_multiple_idle()
301 }
302
303 pub fn pending_storage_tasks(&self) -> usize {
305 self.storage_work_tx.len()
306 }
307
308 pub fn pending_account_tasks(&self) -> usize {
310 self.account_work_tx.len()
311 }
312
313 pub const fn total_storage_workers(&self) -> usize {
315 self.storage_worker_count
316 }
317
318 pub const fn total_account_workers(&self) -> usize {
320 self.account_worker_count
321 }
322
323 pub fn dispatch_storage_proof(
327 &self,
328 input: StorageProofInput,
329 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
330 ) -> Result<(), ProviderError> {
331 let hashed_address = input.hashed_address;
332 self.storage_work_tx
333 .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
334 .map_err(|err| {
335 let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0;
336 let _ = proof_result_sender.send(StorageProofResultMessage {
337 hashed_address,
338 result: Err(
339 DatabaseError::Other("storage workers unavailable".to_string()).into()
340 ),
341 });
342
343 ProviderError::other(std::io::Error::other("storage workers unavailable"))
344 })
345 }
346
347 pub fn dispatch_account_multiproof(
351 &self,
352 input: AccountMultiproofInput,
353 ) -> Result<(), ProviderError> {
354 self.account_work_tx
355 .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
356 .map_err(|err| {
357 let error =
358 ProviderError::other(std::io::Error::other("account workers unavailable"));
359
360 let AccountWorkerJob::AccountMultiproof { input } = err.0;
361 let ProofResultContext { sender: result_tx, state, start_time: start } =
362 input.into_proof_result_sender();
363
364 let _ = result_tx.send(ProofResultMessage {
365 result: Err(ParallelStateRootError::Provider(error.clone())),
366 elapsed: start.elapsed(),
367 state,
368 });
369
370 error
371 })
372 }
373}
374
/// Configuration shared by all proof workers.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// Factory each worker uses to open its own read-only database provider.
    factory: Factory,
    /// Upper bound of the random delay workers sleep before handling a job; `None` disables it.
    #[cfg(feature = "trie-debug")]
    proof_jitter: Option<Duration>,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a context around `factory`, with proof jitter disabled.
    pub const fn new(factory: Factory) -> Self {
        Self {
            factory,
            #[cfg(feature = "trie-debug")]
            proof_jitter: None,
        }
    }

    /// Replaces the proof jitter setting and returns the updated context.
    #[cfg(feature = "trie-debug")]
    pub const fn with_proof_jitter(mut self, jitter: Option<Duration>) -> Self {
        self.proof_jitter = jitter;
        self
    }
}
402
403#[derive(Debug)]
405pub struct ProofTaskTx<Provider> {
406 provider: Provider,
408
409 id: usize,
411}
412
413impl<Provider> ProofTaskTx<Provider> {
414 const fn new(provider: Provider, id: usize) -> Self {
416 Self { provider, id }
417 }
418}
419
420impl<Provider> ProofTaskTx<Provider>
421where
422 Provider: TrieCursorFactory + HashedCursorFactory,
423{
424 fn compute_v2_storage_proof<TC, HC>(
425 &self,
426 input: StorageProofInput,
427 calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
428 ) -> Result<StorageProofResult, StateProofError>
429 where
430 TC: TrieStorageCursor,
431 HC: HashedStorageCursor<Value = U256>,
432 {
433 let StorageProofInput { hashed_address, mut targets } = input;
434
435 let span = debug_span!(
436 target: "trie::proof_task",
437 "V2 Storage proof calculation",
438 n = %targets.len(),
439 );
440 let _span_guard = span.enter();
441
442 let proof_start = Instant::now();
443
444 let proof = if targets.is_empty() {
446 let root_node = calculator.storage_root_node(hashed_address)?;
447 vec![root_node]
448 } else {
449 calculator.storage_proof(hashed_address, &mut targets)?
450 };
451
452 let root = calculator.compute_root_hash(&proof)?;
453
454 trace!(
455 target: "trie::proof_task",
456 hashed_address = ?hashed_address,
457 proof_time_us = proof_start.elapsed().as_micros(),
458 ?root,
459 worker_id = self.id,
460 "Completed V2 storage proof calculation"
461 );
462
463 Ok(StorageProofResult { proof, root })
464 }
465}
466
467pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
472
473#[derive(Debug)]
479pub struct ProofResultMessage {
480 pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
482 pub elapsed: Duration,
484 pub state: HashedPostState,
486}
487
488#[derive(Debug, Clone)]
493pub struct ProofResultContext {
494 pub sender: ProofResultSender,
496 pub state: HashedPostState,
498 pub start_time: Instant,
500}
501
502impl ProofResultContext {
503 pub const fn new(
505 sender: ProofResultSender,
506 state: HashedPostState,
507 start_time: Instant,
508 ) -> Self {
509 Self { sender, state, start_time }
510 }
511}
512
513#[derive(Debug)]
515pub(crate) struct StorageProofResult {
516 pub proof: Vec<ProofTrieNodeV2>,
518 pub root: Option<B256>,
520}
521
522impl StorageProofResult {
523 const fn root(&self) -> Option<B256> {
525 self.root
526 }
527}
528
529#[derive(Debug)]
531pub struct StorageProofResultMessage {
532 #[allow(dead_code)]
534 pub(crate) hashed_address: B256,
535 pub(crate) result: Result<StorageProofResult, StateProofError>,
537}
538
539#[derive(Debug)]
541pub(crate) enum StorageWorkerJob {
542 StorageProof {
544 input: StorageProofInput,
546 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
548 },
549}
550
551struct StorageProofWorker<Factory> {
556 task_ctx: ProofTaskCtx<Factory>,
558 work_rx: CrossbeamReceiver<StorageWorkerJob>,
560 worker_id: usize,
562 availability: Arc<AvailabilitySheet>,
564 cached_storage_roots: Arc<DashMap<B256, B256>>,
566 #[cfg(feature = "metrics")]
568 metrics: ProofTaskTrieMetrics,
569 #[cfg(feature = "metrics")]
571 cursor_metrics: ProofTaskCursorMetrics,
572}
573
574impl<Factory> StorageProofWorker<Factory>
575where
576 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
577{
578 const fn new(
580 task_ctx: ProofTaskCtx<Factory>,
581 work_rx: CrossbeamReceiver<StorageWorkerJob>,
582 worker_id: usize,
583 availability: Arc<AvailabilitySheet>,
584 cached_storage_roots: Arc<DashMap<B256, B256>>,
585 #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
586 #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
587 ) -> Self {
588 Self {
589 task_ctx,
590 work_rx,
591 worker_id,
592 availability,
593 cached_storage_roots,
594 #[cfg(feature = "metrics")]
595 metrics,
596 #[cfg(feature = "metrics")]
597 cursor_metrics,
598 }
599 }
600
601 fn run(mut self) -> ProviderResult<()> {
619 let provider = self.task_ctx.factory.database_provider_ro()?;
621 let proof_tx = ProofTaskTx::new(provider, self.worker_id);
622
623 trace!(
624 target: "trie::proof_task",
625 worker_id = self.worker_id,
626 "Storage worker started"
627 );
628
629 let mut storage_proofs_processed = 0u64;
630 let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
631 let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
632 let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
633 let instrumented_trie_cursor =
634 InstrumentedTrieCursor::new(trie_cursor, &mut cursor_metrics_cache.storage_trie_cursor);
635 let instrumented_hashed_cursor = InstrumentedHashedCursor::new(
636 hashed_cursor,
637 &mut cursor_metrics_cache.storage_hashed_cursor,
638 );
639 let mut v2_calculator = proof_v2::StorageProofCalculator::new_storage(
640 instrumented_trie_cursor,
641 instrumented_hashed_cursor,
642 );
643
644 self.availability.mark_idle(self.worker_id);
646
647 let mut total_idle_time = Duration::ZERO;
648 let mut idle_start = Instant::now();
649
650 while let Ok(job) = self.work_rx.recv() {
651 total_idle_time += idle_start.elapsed();
652
653 self.availability.mark_busy(self.worker_id);
655
656 #[cfg(feature = "trie-debug")]
657 if let Some(max_jitter) = self.task_ctx.proof_jitter {
658 let jitter =
659 Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
660 trace!(
661 target: "trie::proof_task",
662 worker_id = self.worker_id,
663 jitter_us = jitter.as_micros(),
664 "Storage worker applying proof jitter"
665 );
666 std::thread::sleep(jitter);
667 }
668
669 match job {
670 StorageWorkerJob::StorageProof { input, proof_result_sender } => {
671 self.process_storage_proof(
672 &proof_tx,
673 &mut v2_calculator,
674 input,
675 proof_result_sender,
676 &mut storage_proofs_processed,
677 );
678 }
679 }
680
681 self.availability.mark_idle(self.worker_id);
683
684 idle_start = Instant::now();
685 }
686
687 drop(v2_calculator);
689
690 trace!(
691 target: "trie::proof_task",
692 worker_id = self.worker_id,
693 storage_proofs_processed,
694 total_idle_time_us = total_idle_time.as_micros(),
695 "Storage worker shutting down"
696 );
697
698 #[cfg(feature = "metrics")]
699 {
700 self.metrics.record_storage_worker_idle_time(total_idle_time);
701 self.cursor_metrics.record(&mut cursor_metrics_cache);
702 }
703
704 Ok(())
705 }
706
707 fn process_storage_proof<Provider, TC, HC>(
709 &self,
710 proof_tx: &ProofTaskTx<Provider>,
711 v2_calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
712 input: StorageProofInput,
713 proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
714 storage_proofs_processed: &mut u64,
715 ) where
716 Provider: TrieCursorFactory + HashedCursorFactory,
717 TC: TrieStorageCursor,
718 HC: HashedStorageCursor<Value = U256>,
719 {
720 let hashed_address = input.hashed_address;
721 let proof_start = Instant::now();
722
723 trace!(
724 target: "trie::proof_task",
725 worker_id = self.worker_id,
726 hashed_address = ?hashed_address,
727 targets_len = input.targets.len(),
728 "Processing V2 storage proof"
729 );
730
731 let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);
732
733 let proof_elapsed = proof_start.elapsed();
734 *storage_proofs_processed += 1;
735
736 let root = result.as_ref().ok().and_then(|result| result.root());
737
738 if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
739 trace!(
740 target: "trie::proof_task",
741 worker_id = self.worker_id,
742 hashed_address = ?hashed_address,
743 storage_proofs_processed,
744 "Proof result receiver dropped, discarding result"
745 );
746 }
747
748 if let Some(root) = root {
749 self.cached_storage_roots.insert(hashed_address, root);
750 }
751
752 trace!(
753 target: "trie::proof_task",
754 worker_id = self.worker_id,
755 hashed_address = ?hashed_address,
756 proof_time_us = proof_elapsed.as_micros(),
757 total_processed = storage_proofs_processed,
758 ?root,
759 "Storage proof completed"
760 );
761 }
762}
763
764struct AccountProofWorker<Factory> {
769 task_ctx: ProofTaskCtx<Factory>,
771 work_rx: CrossbeamReceiver<AccountWorkerJob>,
773 worker_id: usize,
775 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
777 availability: Arc<AvailabilitySheet>,
779 cached_storage_roots: Arc<DashMap<B256, B256>>,
781 #[cfg(feature = "metrics")]
783 metrics: ProofTaskTrieMetrics,
784 #[cfg(feature = "metrics")]
786 cursor_metrics: ProofTaskCursorMetrics,
787}
788
789impl<Factory> AccountProofWorker<Factory>
790where
791 Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
792{
793 #[expect(clippy::too_many_arguments)]
795 const fn new(
796 task_ctx: ProofTaskCtx<Factory>,
797 work_rx: CrossbeamReceiver<AccountWorkerJob>,
798 worker_id: usize,
799 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
800 availability: Arc<AvailabilitySheet>,
801 cached_storage_roots: Arc<DashMap<B256, B256>>,
802 #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
803 #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
804 ) -> Self {
805 Self {
806 task_ctx,
807 work_rx,
808 worker_id,
809 storage_work_tx,
810 availability,
811 cached_storage_roots,
812 #[cfg(feature = "metrics")]
813 metrics,
814 #[cfg(feature = "metrics")]
815 cursor_metrics,
816 }
817 }
818
819 fn run(mut self) -> ProviderResult<()> {
837 let provider = self.task_ctx.factory.database_provider_ro()?;
838
839 trace!(
840 target: "trie::proof_task",
841 worker_id=self.worker_id,
842 "Account worker started"
843 );
844
845 let mut account_proofs_processed = 0u64;
846 let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
847
848 let account_trie_cursor = provider.account_trie_cursor()?;
851 let account_hashed_cursor = provider.hashed_account_cursor()?;
852
853 let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
854 let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;
855
856 let instrumented_account_trie_cursor = InstrumentedTrieCursor::new(
857 account_trie_cursor,
858 &mut cursor_metrics_cache.account_trie_cursor,
859 );
860 let instrumented_account_hashed_cursor = InstrumentedHashedCursor::new(
861 account_hashed_cursor,
862 &mut cursor_metrics_cache.account_hashed_cursor,
863 );
864 let instrumented_storage_trie_cursor = InstrumentedTrieCursor::new(
865 storage_trie_cursor,
866 &mut cursor_metrics_cache.storage_trie_cursor,
867 );
868 let instrumented_storage_hashed_cursor = InstrumentedHashedCursor::new(
869 storage_hashed_cursor,
870 &mut cursor_metrics_cache.storage_hashed_cursor,
871 );
872
873 let mut v2_account_calculator =
874 proof_v2::ProofCalculator::<
875 _,
876 _,
877 AsyncAccountValueEncoder<
878 InstrumentedTrieCursor<
879 '_,
880 <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
881 >,
882 InstrumentedHashedCursor<
883 '_,
884 <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
885 >,
886 >,
887 >::new(instrumented_account_trie_cursor, instrumented_account_hashed_cursor);
888 let v2_storage_calculator =
889 Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
890 instrumented_storage_trie_cursor,
891 instrumented_storage_hashed_cursor,
892 )));
893
894 self.availability.mark_idle(self.worker_id);
896
897 let mut total_idle_time = Duration::ZERO;
898 let mut idle_start = Instant::now();
899 let mut value_encoder_stats_cache = ValueEncoderStats::default();
900
901 while let Ok(job) = self.work_rx.recv() {
902 total_idle_time += idle_start.elapsed();
903
904 self.availability.mark_busy(self.worker_id);
906
907 #[cfg(feature = "trie-debug")]
908 if let Some(max_jitter) = self.task_ctx.proof_jitter {
909 let jitter =
910 Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
911 trace!(
912 target: "trie::proof_task",
913 worker_id = self.worker_id,
914 jitter_us = jitter.as_micros(),
915 "Account worker applying proof jitter"
916 );
917 std::thread::sleep(jitter);
918 }
919
920 match job {
921 AccountWorkerJob::AccountMultiproof { input } => {
922 let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
923 &mut v2_account_calculator,
924 v2_storage_calculator.clone(),
925 *input,
926 &mut account_proofs_processed,
927 );
928 total_idle_time += value_encoder_stats.storage_wait_time;
929 value_encoder_stats_cache.extend(&value_encoder_stats);
930 }
931 }
932
933 self.availability.mark_idle(self.worker_id);
935
936 idle_start = Instant::now();
937 }
938
939 drop(v2_account_calculator);
941 drop(v2_storage_calculator);
942
943 trace!(
944 target: "trie::proof_task",
945 worker_id=self.worker_id,
946 account_proofs_processed,
947 total_idle_time_us = total_idle_time.as_micros(),
948 "Account worker shutting down"
949 );
950
951 #[cfg(feature = "metrics")]
952 {
953 self.metrics.record_account_worker_idle_time(total_idle_time);
954 self.cursor_metrics.record(&mut cursor_metrics_cache);
955 self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
956 }
957
958 Ok(())
959 }
960
961 fn compute_v2_account_multiproof<'a, Provider>(
962 &self,
963 v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
964 v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
965 targets: MultiProofTargetsV2,
966 ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
967 where
968 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
969 {
970 let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
971
972 let span = debug_span!(
973 target: "trie::proof_task",
974 "Account V2 multiproof calculation",
975 account_targets = account_targets.len(),
976 storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
977 );
978 let _span_guard = span.enter();
979
980 trace!(target: "trie::proof_task", "Processing V2 account multiproof");
981
982 let storage_proof_receivers =
983 dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
984
985 let mut value_encoder = AsyncAccountValueEncoder::new(
986 storage_proof_receivers,
987 self.cached_storage_roots.clone(),
988 v2_storage_calculator,
989 );
990
991 let account_proofs =
992 v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
993
994 let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
995
996 let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
997
998 Ok((proof, value_encoder_stats))
999 }
1000
1001 fn process_account_multiproof<'a, Provider>(
1005 &self,
1006 v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1007 v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1008 input: AccountMultiproofInput,
1009 account_proofs_processed: &mut u64,
1010 ) -> ValueEncoderStats
1011 where
1012 Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1013 {
1014 let proof_start = Instant::now();
1015
1016 let AccountMultiproofInput { targets, proof_result_sender } = input;
1017 let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
1018 v2_account_calculator,
1019 v2_storage_calculator,
1020 targets,
1021 ) {
1022 Ok((proof, stats)) => (Ok(proof), stats),
1023 Err(e) => (Err(e), ValueEncoderStats::default()),
1024 };
1025
1026 let ProofResultContext { sender: result_tx, state, start_time: start } =
1027 proof_result_sender;
1028
1029 let proof_elapsed = proof_start.elapsed();
1030 let total_elapsed = start.elapsed();
1031 *account_proofs_processed += 1;
1032
1033 if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
1035 trace!(
1036 target: "trie::proof_task",
1037 worker_id=self.worker_id,
1038 account_proofs_processed,
1039 "Account multiproof receiver dropped, discarding result"
1040 );
1041 }
1042
1043 trace!(
1044 target: "trie::proof_task",
1045 proof_time_us = proof_elapsed.as_micros(),
1046 total_elapsed_us = total_elapsed.as_micros(),
1047 total_processed = account_proofs_processed,
1048 "Account multiproof completed"
1049 );
1050
1051 value_encoder_stats
1052 }
1053}
1054
1055fn dispatch_v2_storage_proofs(
1063 storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1064 account_targets: &[ProofV2Target],
1065 mut storage_targets: B256Map<Vec<ProofV2Target>>,
1066) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1067 let mut storage_proof_receivers =
1068 B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1069
1070 let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1072
1073 for (hashed_address, targets) in &mut storage_targets {
1076 if account_target_addresses.contains(hashed_address) &&
1077 let Some(first) = targets.first_mut()
1078 {
1079 *first = first.with_min_len(0);
1080 }
1081 }
1082
1083 let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1087 sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1088
1089 for (hashed_address, targets) in sorted_storage_targets {
1091 let (result_tx, result_rx) = crossbeam_channel::unbounded();
1093 let input = StorageProofInput::new(hashed_address, targets);
1094
1095 storage_work_tx
1096 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1097 .map_err(|_| {
1098 ParallelStateRootError::Other(format!(
1099 "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1100 ))
1101 })?;
1102
1103 storage_proof_receivers.insert(hashed_address, result_rx);
1104 }
1105
1106 Ok(storage_proof_receivers)
1107}
1108
1109#[derive(Debug)]
1111pub struct StorageProofInput {
1112 pub hashed_address: B256,
1114 pub targets: Vec<ProofV2Target>,
1116}
1117
1118impl StorageProofInput {
1119 pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
1121 Self { hashed_address, targets }
1122 }
1123}
1124
1125#[derive(Debug)]
1127pub struct AccountMultiproofInput {
1128 pub targets: MultiProofTargetsV2,
1130 pub proof_result_sender: ProofResultContext,
1132}
1133
1134impl AccountMultiproofInput {
1135 fn into_proof_result_sender(self) -> ProofResultContext {
1137 self.proof_result_sender
1138 }
1139}
1140
1141#[derive(Debug)]
1143enum AccountWorkerJob {
1144 AccountMultiproof {
1146 input: Box<AccountMultiproofInput>,
1148 },
1149}
1150
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::ChainSpec;
    use reth_provider::test_utils::create_test_provider_factory_with_chain_spec;
    use std::sync::Arc;

    /// Wraps `factory` in a [`ProofTaskCtx`] with default settings.
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Smoke test: a handle can be created on the test runtime, cloned, and dropped.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let spec = Arc::new(ChainSpec::default());
        let anchor = spec.genesis_hash();
        let db_factory = create_test_provider_factory_with_chain_spec(spec);
        let changesets = reth_trie_db::ChangesetCache::new();
        let overlay_builder = reth_provider::providers::OverlayBuilder::<
            reth_ethereum_primitives::EthPrimitives,
        >::new(anchor, changesets);
        let overlay_factory =
            reth_provider::providers::OverlayStateProviderFactory::new(db_factory, overlay_builder);
        let ctx = test_ctx(overlay_factory);

        let runtime = reth_tasks::Runtime::test();
        let handle = ProofWorkerHandle::new(&runtime, ctx, false);

        // The clone must stay usable on its own after the original is gone.
        let _cloned_handle = handle.clone();

        drop(handle);
    }
}