reth_trie_parallel/
proof_task.rs

//! Parallel proof computation using worker pools with dedicated database transactions.
//!
//! # Architecture
//!
//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
//!   - Storage pool: Handles storage proofs and blinded storage node requests
//!   - Account pool: Handles account multiproofs and blinded account node requests
//! - **Direct Channel Access**: [`ProofWorkerHandle`] provides type-safe queue methods with direct
//!   access to worker channels, eliminating routing overhead
//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
//!
//! # Message Flow
//!
//! 1. `MultiProofTask` prepares a storage or account job and hands it to [`ProofWorkerHandle`]. The
//!    job carries a [`ProofResultContext`] so the worker knows how to send the result back.
//! 2. A worker receives the job, runs the proof, and sends a [`ProofResultMessage`] through the
//!    provided [`ProofResultSender`].
//! 3. `MultiProofTask` receives the message, uses `sequence_number` to keep proofs in order, and
//!    proceeds with its state-root logic.
//!
//! Each job gets its own direct channel so results go straight back to `MultiProofTask`. That keeps
//! ordering decisions in one place and lets workers run independently.
//!
//! ```text
//! MultiProofTask -> MultiproofManager -> ProofWorkerHandle -> Storage/Account Worker
//!        ^                                                              |
//!        |                                                              v
//!        +-------- ProofResultMessage <-------- ProofResultSender ------+
//! ```
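//!
//! # Example
//!
//! A minimal sketch of the storage-proof round trip from the caller's side. It assumes `handle`
//! is an existing [`ProofWorkerHandle`] and `input` is a prepared [`StorageProofInput`]; error
//! handling is elided:
//!
//! ```ignore
//! // Dedicated channel for this job's result.
//! let (tx, rx) = crossbeam_channel::unbounded();
//! // Queue the job; an idle storage worker picks it up and computes the proof.
//! handle.dispatch_storage_proof(input, tx)?;
//! // Block until the worker delivers the result on the dedicated channel.
//! let message = rx.recv()?;
//! let proof = message.result?;
//! ```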

use crate::{
    root::ParallelStateRootError,
    stats::{ParallelTrieStats, ParallelTrieTracker},
    targets_v2::MultiProofTargetsV2,
    value_encoder::AsyncAccountValueEncoder,
    StorageRootTargets,
};
use alloy_primitives::{
    map::{B256Map, B256Set},
    B256,
};
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
    node_iter::{TrieElement, TrieNodeIter},
    prefix_set::TriePrefixSets,
    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
    proof_v2,
    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
    walker::TrieWalker,
    DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState,
    MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
    added_removed_keys::MultiAddedRemovedKeys,
    prefix_set::{PrefixSet, PrefixSetMut},
    proof::{DecodedProofNodes, ProofRetainer},
    BranchNodeMasks, BranchNodeMasksMap,
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};

#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};

type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Counter tracking available storage workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    storage_available_workers: Arc<AtomicUsize>,
    /// Counter tracking available account workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    account_available_workers: Arc<AtomicUsize>,
    /// Total number of storage workers spawned
    storage_worker_count: usize,
    /// Total number of account workers spawned
    account_worker_count: usize,
    /// Whether V2 storage proofs are enabled
    v2_proofs_enabled: bool,
}

impl ProofWorkerHandle {
    /// Spawns storage and account worker pools with dedicated database transactions.
    ///
    /// Returns a handle for submitting proof tasks to the worker pools.
    /// Workers run until the last handle is dropped.
    ///
    /// # Parameters
    /// - `executor`: Tokio runtime handle for spawning blocking tasks
    /// - `task_ctx`: Shared context providing the database provider factory
    /// - `storage_worker_count`: Number of storage workers to spawn
    /// - `account_worker_count`: Number of account workers to spawn
    /// - `v2_proofs_enabled`: Whether to enable V2 storage proofs
    pub fn new<Factory>(
        executor: Handle,
        task_ctx: ProofTaskCtx<Factory>,
        storage_worker_count: usize,
        account_worker_count: usize,
        v2_proofs_enabled: bool,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Initialize availability counters at zero. Each worker will increment when it
        // successfully initializes, ensuring only healthy workers are counted.
        let storage_available_workers = Arc::new(AtomicUsize::new(0));
        let account_available_workers = Arc::new(AtomicUsize::new(0));

        let cached_storage_roots = Arc::new(DashMap::new());

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            ?v2_proofs_enabled,
            "Spawning proof worker pools"
        );

        let parent_span =
            debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
                .entered();
        // Spawn storage workers
        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();
            let cached_storage_roots = cached_storage_roots.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    cached_storage_roots,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                )
                .with_v2_proofs(v2_proofs_enabled);
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        let parent_span =
            debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
                .entered();
        // Spawn account workers
        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();
            let cached_storage_roots = cached_storage_roots.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    cached_storage_roots,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                )
                .with_v2_proofs(v2_proofs_enabled);
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
            v2_proofs_enabled,
        }
    }

    /// Returns whether V2 storage proofs are enabled for this worker pool.
    pub const fn v2_proofs_enabled(&self) -> bool {
        self.v2_proofs_enabled
    }

    /// Returns how many storage workers are currently available/idle.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns how many account workers are currently available/idle.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of pending storage tasks in the queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of pending account tasks in the queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of storage workers in the pool.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of account workers in the pool.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

    /// Dispatch a storage proof computation to the storage worker pool.
    ///
    /// The result will be sent via the `proof_result_sender` channel.
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    ) -> Result<(), ProviderError> {
        let hashed_address = input.hashed_address();
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
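                // The send failed because the worker pool has shut down. Recover the job from
                // the error and notify the per-proof result channel so the caller is unblocked.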
                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let _ = proof_result_sender.send(StorageProofResultMessage {
                        hashed_address,
                        result: Err(DatabaseError::Other(
                            "storage workers unavailable".to_string(),
                        )
                        .into()),
                    });
                }

                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })
    }

    /// Dispatch an account multiproof computation
    ///
    /// The result will be sent via the `result_sender` channel included in the input.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
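                // The send failed because the worker pool has shut down. Recover the input and
                // notify the result channel so `MultiProofTask` observes the failure rather
                // than waiting forever.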
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = input.into_proof_result_sender();

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatch a blinded storage node request to the storage worker pool
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Dispatch a blinded account node request to the account worker pool
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}

/// Shared context for proof workers, holding the factory used to create each worker's database
/// provider (which in turn serves as the cursor factories).
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers.
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new [`ProofTaskCtx`] with the given factory.
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}

/// Per-worker execution context wrapping the dedicated read-only database provider used to
/// compute proofs and blinded node lookups.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}

impl<Provider> ProofTaskTx<Provider>
where
    Provider: TrieCursorFactory + HashedCursorFactory,
{
    /// Compute storage proof.
    ///
    /// Used by storage workers in the worker pool to compute storage proofs.
    #[inline]
    fn compute_legacy_storage_proof(
        &self,
        input: StorageProofInput,
        trie_cursor_metrics: &mut TrieCursorMetricsCache,
        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
    ) -> Result<StorageProofResult, StateProofError> {
        // Consume the input so we can move large collections (e.g. target slots) without cloning.
        let StorageProofInput::Legacy {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        } = input
        else {
            panic!("compute_legacy_storage_proof only accepts StorageProofInput::Legacy")
        };

        // Get or create added/removed keys context
        let multi_added_removed_keys =
            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);

        let span = debug_span!(
            target: "trie::proof_task",
            "Storage proof calculation",
            ?hashed_address,
            target_slots = ?target_slots.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        // Compute raw storage multiproof
        let raw_proof_result =
            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
                .with_branch_node_masks(with_branch_node_masks)
                .with_added_removed_keys(added_removed_keys)
                .with_trie_cursor_metrics(trie_cursor_metrics)
                .with_hashed_cursor_metrics(hashed_cursor_metrics)
                .storage_multiproof(target_slots);
        trie_cursor_metrics.record_span("trie_cursor");
        hashed_cursor_metrics.record_span("hashed_cursor");

        // Decode proof into DecodedStorageMultiProof
        let decoded_result =
            raw_proof_result.and_then(|raw_proof| raw_proof.try_into().map_err(Into::into))?;

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            worker_id = self.id,
            "Completed storage proof calculation"
        );

        Ok(StorageProofResult::Legacy { proof: decoded_result })
    }

    fn compute_v2_storage_proof(
        &self,
        input: StorageProofInput,
        calculator: &mut proof_v2::StorageProofCalculator<
            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
            <Provider as HashedCursorFactory>::StorageCursor<'_>,
        >,
    ) -> Result<StorageProofResult, StateProofError> {
        let StorageProofInput::V2 { hashed_address, mut targets } = input else {
            panic!("compute_v2_storage_proof only accepts StorageProofInput::V2")
        };

        // If targets is empty it means the caller only wants the root hash. The V2 proof calculator
        // will do nothing given no targets, so instead we give it a fake target so it always
        // returns at least the root.
        if targets.is_empty() {
            targets.push(proof_v2::Target::new(B256::ZERO));
        }

        let span = debug_span!(
            target: "trie::proof_task",
            "V2 Storage proof calculation",
            ?hashed_address,
            targets = ?targets.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();
        let proof = calculator.storage_proof(hashed_address, &mut targets)?;
        let root = calculator.compute_root_hash(&proof)?;

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            ?root,
            worker_id = self.id,
            "Completed V2 storage proof calculation"
        );

        Ok(StorageProofResult::V2 { proof, root })
    }

    /// Process a blinded storage node request.
    ///
    /// Used by storage workers to retrieve blinded storage trie nodes for proof construction.
    fn process_blinded_storage_node(
        &self,
        account: B256,
        path: &Nibbles,
    ) -> TrieNodeProviderResult {
        let storage_node_provider =
            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
        storage_node_provider.trie_node(path)
    }

    /// Process a blinded account node request.
    ///
    /// Used by account workers to retrieve blinded account trie nodes for proof construction.
    fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
        let account_node_provider =
            ProofBlindedAccountProvider::new(&self.provider, &self.provider);
        account_node_provider.trie_node(path)
    }
}

impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}

/// Trie node provider for retrieving trie nodes by path.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node provider.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node provider.
    StorageNode {
        /// Target account.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}

impl TrieNodeProvider for ProofTaskTrieNodeProvider {
    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
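        // Dispatch the lookup to the appropriate worker pool, then block on the returned
        // channel until a worker delivers the node.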
        match self {
            Self::AccountNode { handle } => {
                let rx = handle
                    .dispatch_blinded_account_node(*path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
            Self::StorageNode { handle, account } => {
                let rx = handle
                    .dispatch_blinded_storage_node(*account, *path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
        }
    }
}

/// Result of a multiproof calculation.
#[derive(Debug)]
pub enum ProofResult {
    /// Legacy multiproof calculation result.
    Legacy(DecodedMultiProof, ParallelTrieStats),
    /// V2 multiproof calculation result.
    V2(DecodedMultiProofV2),
}

impl ProofResult {
    /// Creates an empty [`ProofResult`] of the appropriate variant based on `v2_enabled`.
    ///
    /// Use this when constructing empty proofs (e.g., for state updates where all targets
    /// were already fetched) to ensure consistency with the proof version being used.
    pub fn empty(v2_enabled: bool) -> Self {
        if v2_enabled {
            Self::V2(DecodedMultiProofV2::default())
        } else {
            let stats = ParallelTrieTracker::default().finish();
            Self::Legacy(DecodedMultiProof::default(), stats)
        }
    }

    /// Returns true if the result contains no proofs
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Legacy(proof, _) => proof.is_empty(),
            Self::V2(proof) => proof.is_empty(),
        }
    }

    /// Extends this proof result with the contents of another.
    ///
    /// # Panics
    ///
    /// This method panics if the two [`ProofResult`]s are not the same variant.
    pub fn extend(&mut self, other: Self) {
        match (self, other) {
            (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other),
            (Self::V2(proof), Self::V2(other)) => proof.extend(other),
            _ => panic!("mismatched ProofResults, cannot extend one with the other"),
        }
    }

    /// Returns the number of account proofs.
    pub fn account_proofs_len(&self) -> usize {
        match self {
            Self::Legacy(proof, _) => proof.account_subtree.len(),
            Self::V2(proof) => proof.account_proofs.len(),
        }
    }

    /// Returns the total number of storage proofs
    pub fn storage_proofs_len(&self) -> usize {
        match self {
            Self::Legacy(proof, _) => {
                proof.storages.values().map(|p| p.subtree.len()).sum::<usize>()
            }
            Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::<usize>(),
        }
    }
}

/// Channel used by worker threads to deliver [`ProofResultMessage`] items directly back to
/// `MultiProofTask`.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;

/// Message containing a completed proof result with metadata for direct delivery to
/// `MultiProofTask`.
///
/// This type enables workers to send proof results directly to the `MultiProofTask` event loop.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number for ordering proofs
    pub sequence_number: u64,
    /// The proof calculation result (either account multiproof or storage proof)
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}

/// Context for sending proof calculation results back to `MultiProofTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Sequence number for proof ordering
    pub sequence_number: u64,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a new proof result context.
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}

/// The results of a storage proof calculation.
#[derive(Debug)]
pub(crate) enum StorageProofResult {
    Legacy {
        /// The storage multiproof
        proof: DecodedStorageMultiProof,
    },
    V2 {
        /// The calculated V2 proof nodes
        proof: Vec<ProofTrieNode>,
        /// The storage root calculated by the V2 proof
        root: Option<B256>,
    },
}

impl StorageProofResult {
    /// Returns the calculated root of the trie, if one can be calculated from the proof.
    const fn root(&self) -> Option<B256> {
        match self {
            Self::Legacy { proof } => Some(proof.root),
            Self::V2 { root, .. } => *root,
        }
    }
}

impl From<StorageProofResult> for Option<DecodedStorageMultiProof> {
    /// Returns None if the V2 proof result doesn't have a calculated root hash.
    fn from(proof_result: StorageProofResult) -> Self {
        match proof_result {
            StorageProofResult::Legacy { proof } => Some(proof),
            StorageProofResult::V2 { proof, root } => root.map(|root| {
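                // Rebuild the legacy multiproof layout from the V2 proof nodes: branch node
                // masks keyed by path, plus the path-to-node subtree.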
                let branch_node_masks = proof
                    .iter()
                    .filter_map(|node| node.masks.map(|masks| (node.path, masks)))
                    .collect();
                let subtree = proof.into_iter().map(|node| (node.path, node.node)).collect();
                DecodedStorageMultiProof { root, subtree, branch_node_masks }
            }),
        }
    }
}

/// Message containing a completed storage proof result with metadata.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// The hashed address this storage proof belongs to
    pub(crate) hashed_address: B256,
    /// The storage proof calculation result
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}

/// Internal message for storage workers.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Blinded storage node retrieval request
    BlindedStorageNode {
        /// Target account
        account: B256,
        /// Path to the storage node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests and blinded node lookups.
struct StorageProofWorker<Factory> {
    /// Shared task context providing the database provider factory
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Cached storage roots
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    /// Set to true if V2 proofs are enabled.
    v2_enabled: bool,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new storage proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            v2_enabled: false,
        }
    }

    /// Changes whether or not V2 proofs are enabled.
    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        // Create provider from factory
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
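        // When V2 proofs are enabled, build the proof calculator once and reuse it across
        // jobs. The cursors are opened with a placeholder address here; `storage_proof`
        // receives the actual target account on each call.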
        let mut v2_calculator = if self.v2_enabled {
            let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
            let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
            Some(proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor))
        } else {
            None
        };

        // Initially mark this worker as available.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = self.work_rx.recv() {
            // Mark worker as busy.
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    self.process_storage_proof(
                        &proof_tx,
                        v2_calculator.as_mut(),
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_storage_nodes(storage_nodes_processed as usize);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes a storage proof request.
    fn process_storage_proof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: Option<
            &mut proof_v2::StorageProofCalculator<
                <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                <Provider as HashedCursorFactory>::StorageCursor<'_>,
            >,
        >,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
        let hashed_address = input.hashed_address();
        let proof_start = Instant::now();

        let result = match &input {
            StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, .. } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    prefix_set_len = prefix_set.len(),
                    target_slots_len = target_slots.len(),
                    "Processing storage proof"
                );

                proof_tx.compute_legacy_storage_proof(
                    input,
                    &mut trie_cursor_metrics,
                    &mut hashed_cursor_metrics,
                )
            }
            StorageProofInput::V2 { hashed_address, targets } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    targets_len = targets.len(),
                    "Processing V2 storage proof"
                );
                proof_tx
                    .compute_v2_storage_proof(input, v2_calculator.expect("v2 calculator provided"))
            }
        };

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        let root = result.as_ref().ok().and_then(|result| result.root());

        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

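        // Cache the computed storage root so account workers can reuse it when building
        // account proofs.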
        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            ?root,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            // Accumulate per-proof metrics into the worker's cache
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    /// Processes a blinded storage node lookup request.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}

/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests and blinded node lookups.
struct AccountProofWorker<Factory> {
    /// Shared task context providing the database provider factory
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Channel for dispatching storage proof work
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Cached storage roots
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    /// Set to true if V2 proofs are enabled.
    v2_enabled: bool,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new account proof worker.
    #[allow(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            v2_enabled: false,
        }
    }

    /// Changes whether or not V2 proofs are enabled.
    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        // Create provider from factory
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

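        // When V2 proofs are enabled, build the account proof calculator once and reuse it
        // across jobs; storage roots are resolved through the `AsyncAccountValueEncoder`.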
        let mut v2_calculator = if self.v2_enabled {
            let trie_cursor = proof_tx.provider.account_trie_cursor()?;
            let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
            Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
                trie_cursor,
                hashed_cursor,
            ))
        } else {
            None
        };

        // Count this worker as available only after successful initialization.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = self.work_rx.recv() {
            // Mark worker as busy.
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    self.process_account_multiproof(
                        &proof_tx,
                        v2_calculator.as_mut(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    fn compute_legacy_account_multiproof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        targets: MultiProofTargets,
        mut prefix_sets: TriePrefixSets,
        collect_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
        proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
    ) -> Result<ProofResult, ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            worker_id = self.worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        let storage_proof_receivers = dispatch_storage_proofs(
            &self.storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        )?;

        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            cached_storage_roots: &self.cached_storage_roots,
        };

        let result = build_account_multiproof_with_storage_roots(
            &proof_tx.provider,
            ctx,
            &mut tracker,
            proof_cursor_metrics,
        );

        let stats = tracker.finish();
        result.map(|proof| ProofResult::Legacy(proof, stats))
    }

    fn compute_v2_account_multiproof<Provider>(
        &self,
        v2_calculator: &mut proof_v2::ProofCalculator<
            <Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
            <Provider as HashedCursorFactory>::AccountCursor<'_>,
            AsyncAccountValueEncoder,
        >,
        targets: MultiProofTargetsV2,
    ) -> Result<ProofResult, ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account V2 multiproof calculation",
            account_targets = account_targets.len(),
            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
            worker_id = self.worker_id,
        );
        let _span_guard = span.enter();

        trace!(target: "trie::proof_task", "Processing V2 account multiproof");

        let storage_proof_receivers =
            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;

        let mut value_encoder = AsyncAccountValueEncoder::new(
            self.storage_work_tx.clone(),
            storage_proof_receivers,
            self.cached_storage_roots.clone(),
        );

        let proof = DecodedMultiProofV2 {
            account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
            storage_proofs: value_encoder.into_storage_proofs()?,
        };

        Ok(ProofResult::V2(proof))
    }

    /// Processes an account multiproof request.
    fn process_account_multiproof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: Option<
            &mut proof_v2::ProofCalculator<
                <Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
                <Provider as HashedCursorFactory>::AccountCursor<'_>,
                AsyncAccountValueEncoder,
            >,
        >,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
        let proof_start = Instant::now();

        let (proof_result_sender, result) = match input {
            AccountMultiproofInput::Legacy {
                targets,
                prefix_sets,
                collect_branch_node_masks,
                multi_added_removed_keys,
                proof_result_sender,
            } => (
                proof_result_sender,
                self.compute_legacy_account_multiproof(
                    proof_tx,
                    targets,
                    prefix_sets,
                    collect_branch_node_masks,
                    multi_added_removed_keys,
                    &mut proof_cursor_metrics,
                ),
            ),
            AccountMultiproofInput::V2 { targets, proof_result_sender } => (
                proof_result_sender,
                self.compute_v2_account_multiproof::<Provider>(
                    v2_calculator.expect("v2 calculator provided"),
                    targets,
                ),
            ),
        };

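        // Unpack the result context. `start_time` was captured at dispatch, so the elapsed
        // time reported below covers the full dispatch-to-completion window.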
1396        let ProofResultContext {
1397            sender: result_tx,
1398            sequence_number: seq,
1399            state,
1400            start_time: start,
1401        } = proof_result_sender;
1402
1403        let proof_elapsed = proof_start.elapsed();
1404        let total_elapsed = start.elapsed();
1405        *account_proofs_processed += 1;
1406
1407        // Send result to MultiProofTask
1408        if result_tx
1409            .send(ProofResultMessage {
1410                sequence_number: seq,
1411                result,
1412                elapsed: total_elapsed,
1413                state,
1414            })
1415            .is_err()
1416        {
1417            trace!(
1418                target: "trie::proof_task",
1419                worker_id = self.worker_id,
1420                account_proofs_processed,
1421                "Account multiproof receiver dropped, discarding result"
1422            );
1423        }
1424
1425        proof_cursor_metrics.record_spans();
1426
1427        trace!(
1428            target: "trie::proof_task",
1429            proof_time_us = proof_elapsed.as_micros(),
1430            total_elapsed_us = total_elapsed.as_micros(),
1431            total_processed = account_proofs_processed,
1432            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
1433            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
1434            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
1435            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
1436            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
1437            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
1438            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
1439            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
1440            "Account multiproof completed"
1441        );
1442
1443        // Accumulate per-proof metrics into the worker's cache.
1444        #[cfg(feature = "metrics")]
1445        cursor_metrics_cache.extend(&proof_cursor_metrics);
1446    }
1447
1448    /// Processes a blinded account node lookup request.
1449    fn process_blinded_node<Provider>(
1450        worker_id: usize,
1451        proof_tx: &ProofTaskTx<Provider>,
1452        path: Nibbles,
1453        result_sender: Sender<TrieNodeProviderResult>,
1454        account_nodes_processed: &mut u64,
1455    ) where
1456        Provider: TrieCursorFactory + HashedCursorFactory,
1457    {
1458        let span = debug_span!(
1459            target: "trie::proof_task",
1460            "Blinded account node calculation",
1461            ?path,
1462            worker_id,
1463        );
1464        let _span_guard = span.enter();
1465
1466        trace!(
1467            target: "trie::proof_task",
1468            "Processing blinded account node"
1469        );
1470
1471        let start = Instant::now();
1472        let result = proof_tx.process_blinded_account_node(&path);
1473        let elapsed = start.elapsed();
1474
1475        *account_nodes_processed += 1;
1476
1477        if result_sender.send(result).is_err() {
1478            trace!(
1479                target: "trie::proof_task",
1480                worker_id,
1481                ?path,
1482                account_nodes_processed,
1483                "Blinded account node receiver dropped, discarding result"
1484            );
1485        }
1486
1487        trace!(
1488            target: "trie::proof_task",
1489            node_time_us = elapsed.as_micros(),
1490            total_processed = account_nodes_processed,
1491            "Blinded account node completed"
1492        );
1493    }
1494}
1495
1496/// Builds an account multiproof by consuming storage proof receivers lazily during the trie walk.
1497///
1498/// This is a helper function used by account workers to build the account subtree proof
1499/// while storage proofs are still being computed. Receivers are consumed only when needed,
1500/// enabling interleaved parallelism between account trie traversal and storage proof computation.
1501///
1502/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
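///
/// A minimal, self-contained sketch of the consumption pattern, with plain `crossbeam`
/// channels standing in for the real proof types (all names here are illustrative):
///
/// ```text
/// use std::collections::HashMap;
/// use crossbeam_channel::unbounded;
///
/// let mut receivers = HashMap::new();
/// for leaf in ["a", "b"] {
///     let (tx, rx) = unbounded();
///     tx.send(format!("proof for {leaf}")).unwrap(); // worker side
///     receivers.insert(leaf, rx);
/// }
///
/// // Account-walk side: block only when a specific leaf's proof is needed.
/// if let Some(rx) = receivers.remove("a") {
///     let _proof = rx.recv().expect("worker dropped sender");
/// }
///
/// // Drain receivers for leaves the walk never visited (done last, as below).
/// for (_leaf, rx) in receivers {
///     let _ = rx.recv();
/// }
/// ```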
1503fn build_account_multiproof_with_storage_roots<P>(
1504    provider: &P,
1505    ctx: AccountMultiproofParams<'_>,
1506    tracker: &mut ParallelTrieTracker,
1507    proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
1508) -> Result<DecodedMultiProof, ParallelStateRootError>
1509where
1510    P: TrieCursorFactory + HashedCursorFactory,
1511{
1512    let accounts_added_removed_keys =
1513        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());
1514
1515    // Wrap account trie cursor with instrumented cursor
1516    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
1517    let account_trie_cursor = InstrumentedTrieCursor::new(
1518        account_trie_cursor,
1519        &mut proof_cursor_metrics.account_trie_cursor,
1520    );
1521
1522    // Create the walker.
1523    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
1524        .with_added_removed_keys(accounts_added_removed_keys)
1525        .with_deletions_retained(true);
1526
1527    // Create a hash builder to rebuild the root node since it is not available in the database.
1528    let retainer = ctx
1529        .targets
1530        .keys()
1531        .map(Nibbles::unpack)
1532        .collect::<ProofRetainer>()
1533        .with_added_removed_keys(accounts_added_removed_keys);
1534    let mut hash_builder = HashBuilder::default()
1535        .with_proof_retainer(retainer)
1536        .with_updates(ctx.collect_branch_node_masks);
1537
1538    // Initialize storage multiproofs map with pre-allocated capacity.
1539    // Proofs will be inserted as they're consumed from receivers during trie walk.
1540    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
1541        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
1542    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);
1543
1544    // Wrap account hashed cursor with instrumented cursor
1545    let account_hashed_cursor =
1546        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
1547    let account_hashed_cursor = InstrumentedHashedCursor::new(
1548        account_hashed_cursor,
1549        &mut proof_cursor_metrics.account_hashed_cursor,
1550    );
1551
1552    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);
1553
1554    let mut storage_proof_receivers = ctx.storage_proof_receivers;
1555
1556    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
1557        match account_node {
1558            TrieElement::Branch(node) => {
1559                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
1560            }
1561            TrieElement::Leaf(hashed_address, account) => {
1562                let root = match storage_proof_receivers.remove(&hashed_address) {
1563                    Some(receiver) => {
1564                        let _guard = debug_span!(
1565                            target: "trie::proof_task",
1566                            "Waiting for storage proof",
1567                            ?hashed_address,
1568                        ).entered();
1569                        // Block on this specific storage proof receiver; this wait is what
1570                        // enables interleaved parallelism with the storage workers.
1571                        let proof_msg = receiver.recv().map_err(|_| {
1572                            ParallelStateRootError::StorageRoot(
1573                                reth_execution_errors::StorageRootError::Database(
1574                                    DatabaseError::Other(format!(
1575                                        "Storage proof channel closed for {hashed_address}"
1576                                    )),
1577                                ),
1578                            )
1579                        })?;
1580
1581                        drop(_guard);
1582
1583                        // Extract storage proof from the result
1584                        debug_assert_eq!(
1585                            proof_msg.hashed_address, hashed_address,
1586                            "storage worker must return same address"
1587                        );
1588                        let proof_result = proof_msg.result?;
1589                        let Some(root) = proof_result.root() else {
1590                            trace!(
1591                                target: "trie::proof_task",
1592                                ?proof_result,
1593                                "Received proof_result without root",
1594                            );
1595                            panic!("Partial proofs are not yet supported");
1596                        };
1597                        let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
1598                            .expect("Partial proofs are not yet supported (into)");
1599                        collected_decoded_storages.insert(hashed_address, proof);
1600                        root
1601                    }
1602                    // Since not all intermediate nodes are stored in the database, the walk may
1603                    // re-add an unmodified leaf; its storage root is read from cache or recomputed.
1604                    None => {
1605                        tracker.inc_missed_leaves();
1606
1607                        match ctx.cached_storage_roots.entry(hashed_address) {
1608                            dashmap::Entry::Occupied(occ) => *occ.get(),
1609                            dashmap::Entry::Vacant(vac) => {
1610                                let root =
1611                                    StorageProof::new_hashed(provider, provider, hashed_address)
1612                                        .with_prefix_set_mut(Default::default())
1613                                        .with_trie_cursor_metrics(
1614                                            &mut proof_cursor_metrics.storage_trie_cursor,
1615                                        )
1616                                        .with_hashed_cursor_metrics(
1617                                            &mut proof_cursor_metrics.storage_hashed_cursor,
1618                                        )
1619                                        .storage_multiproof(
1620                                            ctx.targets
1621                                                .get(&hashed_address)
1622                                                .cloned()
1623                                                .unwrap_or_default(),
1624                                        )
1625                                        .map_err(|e| {
1626                                            ParallelStateRootError::StorageRoot(
1627                                                reth_execution_errors::StorageRootError::Database(
1628                                                    DatabaseError::Other(e.to_string()),
1629                                                ),
1630                                            )
1631                                        })?
1632                                        .root;
1633
1634                                vac.insert(root);
1635                                root
1636                            }
1637                        }
1638                    }
1639                };
1640
1641                // Encode account
1642                account_rlp.clear();
1643                let account = account.into_trie_account(root);
1644                account.encode(&mut account_rlp as &mut dyn BufMut);
1645
1646                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
1647            }
1648        }
1649    }
1650
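    // The root value itself is unused here; calling `root()` finalizes the hash builder so
    // that the retained proof nodes and branch node updates taken below are complete.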
1651    let _ = hash_builder.root();
1652
1653    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
1654    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;
1655
1656    let branch_node_masks = if ctx.collect_branch_node_masks {
1657        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
1658        updated_branch_nodes
1659            .into_iter()
1660            .map(|(path, node)| {
1661                (path, BranchNodeMasks { hash_mask: node.hash_mask, tree_mask: node.tree_mask })
1662            })
1663            .collect()
1664    } else {
1665        BranchNodeMasksMap::default()
1666    };
1667
1668    // Consume remaining storage proof receivers for accounts not visited during the trie walk.
1669    // Done last so storage workers get more time to finish while we finalize the account trie.
1670    for (hashed_address, receiver) in storage_proof_receivers {
1671        if let Ok(proof_msg) = receiver.recv() {
1672            let proof_result = proof_msg.result?;
1673            let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
1674                .expect("Partial proofs are not yet supported");
1675            collected_decoded_storages.insert(hashed_address, proof);
1676        }
1677    }
1678
1679    Ok(DecodedMultiProof {
1680        account_subtree: decoded_account_subtree,
1681        branch_node_masks,
1682        storages: collected_decoded_storages,
1683    })
1684}

1685/// Queues storage proofs for all accounts in the targets and returns receivers.
1686///
1687/// This function queues all storage proof tasks to the worker pool but returns immediately
1688/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1689/// computation. This enables interleaved parallelism for better performance.
1690///
1691/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
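///
/// A hedged usage sketch (caller context and variable names are illustrative):
///
/// ```text
/// let receivers =
///     dispatch_storage_proofs(&storage_work_tx, &targets, &mut prefix_sets, false, None)?;
/// // ...walk the account trie, calling `receivers.remove(&hashed_address)` and blocking on
/// // each receiver only once its leaf is reached...
/// ```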
1692fn dispatch_storage_proofs(
1693    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1694    targets: &MultiProofTargets,
1695    storage_prefix_sets: &mut B256Map<PrefixSet>,
1696    with_branch_node_masks: bool,
1697    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
1698) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1699    let mut storage_proof_receivers =
1700        B256Map::with_capacity_and_hasher(targets.len(), Default::default());
1701
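    // Sort targets by hashed address so storage proofs are dispatched in a stable order.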
1702    let mut sorted_targets: Vec<_> = targets.iter().collect();
1703    sorted_targets.sort_unstable_by_key(|(addr, _)| *addr);
1704
1705    // Dispatch all storage proofs to worker pool
1706    for (hashed_address, target_slots) in sorted_targets {
1707        // Create channel for receiving ProofResultMessage
1708        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1709
1710        // Create computation input based on V2 flag
1711        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
1712        let input = StorageProofInput::legacy(
1713            *hashed_address,
1714            prefix_set,
1715            target_slots.clone(),
1716            with_branch_node_masks,
1717            multi_added_removed_keys.cloned(),
1718        );
1719
1720        // Always dispatch a storage proof so we obtain the storage root even when no slots are
1721        // requested.
1722        storage_work_tx
1723            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1724            .map_err(|_| {
1725                ParallelStateRootError::Other(format!(
1726                    "Failed to queue storage proof for {}: storage worker pool unavailable",
1727                    hashed_address
1728                ))
1729            })?;
1730
1731        storage_proof_receivers.insert(*hashed_address, result_rx);
1732    }
1733
1734    Ok(storage_proof_receivers)
1735}
1736
1737/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
1738///
1739/// This function queues all storage proof tasks to the worker pool but returns immediately
1740/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1741/// computation. This enables interleaved parallelism for better performance.
1742///
1743/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
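///
/// Accounts listed in `account_targets` that have no storage targets still get a single
/// placeholder proof target dispatched, so that their storage root is computed.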
1744fn dispatch_v2_storage_proofs(
1745    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1746    account_targets: &[proof_v2::Target],
1747    storage_targets: B256Map<Vec<proof_v2::Target>>,
1748) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1749    let mut storage_proof_receivers =
1750        B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1751
1752    // Dispatch all proofs for targeted storage slots
1753    for (hashed_address, targets) in storage_targets {
1754        // Create channel for receiving StorageProofResultMessage
1755        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1756        let input = StorageProofInput::new(hashed_address, targets);
1757
1758        storage_work_tx
1759            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1760            .map_err(|_| {
1761                ParallelStateRootError::Other(format!(
1762                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1763                ))
1764            })?;
1765
1766        storage_proof_receivers.insert(hashed_address, result_rx);
1767    }
1768
1769    // For any targeted account that has no storage targets, dispatch a single placeholder
1770    // proof target so that we still obtain its storage root.
1771    for target in account_targets {
1772        let hashed_address = target.key();
1773        if storage_proof_receivers.contains_key(&hashed_address) {
1774            continue
1775        }
1776
1777        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1778        let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
1779
1780        storage_work_tx
1781            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1782            .map_err(|_| {
1783                ParallelStateRootError::Other(format!(
1784                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1785                ))
1786            })?;
1787
1788        storage_proof_receivers.insert(hashed_address, result_rx);
1789    }
1790
1791    Ok(storage_proof_receivers)
1792}
1793
1794/// Input parameters for storage proof computation.
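///
/// Illustrative construction, with `addr`, `prefix_set`, `slots`, and `targets` standing in
/// for caller-provided values:
///
/// ```text
/// let legacy = StorageProofInput::legacy(addr, prefix_set, slots, false, None);
/// let v2 = StorageProofInput::new(addr, targets);
/// assert_eq!(legacy.hashed_address(), addr);
/// ```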
1795#[derive(Debug)]
1796pub enum StorageProofInput {
1797    /// Legacy storage proof variant
1798    Legacy {
1799        /// The hashed address for which the proof is calculated.
1800        hashed_address: B256,
1801        /// The prefix set for the proof calculation.
1802        prefix_set: PrefixSet,
1803        /// The target slots for the proof calculation.
1804        target_slots: B256Set,
1805        /// Whether or not to collect branch node masks
1806        with_branch_node_masks: bool,
1807        /// Provided by the user to give the necessary context to retain extra proofs.
1808        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1809    },
1810    /// V2 storage proof variant
1811    V2 {
1812        /// The hashed address for which the proof is calculated.
1813        hashed_address: B256,
1814        /// The set of proof targets
1815        targets: Vec<proof_v2::Target>,
1816    },
1817}
1818
1819impl StorageProofInput {
1820    /// Creates a legacy [`StorageProofInput`] with the given hashed address, prefix set, and target
1821    /// slots.
1822    pub const fn legacy(
1823        hashed_address: B256,
1824        prefix_set: PrefixSet,
1825        target_slots: B256Set,
1826        with_branch_node_masks: bool,
1827        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1828    ) -> Self {
1829        Self::Legacy {
1830            hashed_address,
1831            prefix_set,
1832            target_slots,
1833            with_branch_node_masks,
1834            multi_added_removed_keys,
1835        }
1836    }
1837
1838    /// Creates a V2 [`StorageProofInput`] with the given hashed address and proof targets.
1839    pub const fn new(hashed_address: B256, targets: Vec<proof_v2::Target>) -> Self {
1840        Self::V2 { hashed_address, targets }
1841    }
1842
1843    /// Returns the targeted hashed address.
1844    pub const fn hashed_address(&self) -> B256 {
1845        match self {
1846            Self::Legacy { hashed_address, .. } | Self::V2 { hashed_address, .. } => {
1847                *hashed_address
1848            }
1849        }
1850    }
1851}
1852
1853/// Input parameters for account multiproof computation.
1854#[derive(Debug)]
1855pub enum AccountMultiproofInput {
1856    /// Legacy account multiproof variant
1857    Legacy {
1858        /// The targets for which to compute the multiproof.
1859        targets: MultiProofTargets,
1860        /// The prefix sets for the proof calculation.
1861        prefix_sets: TriePrefixSets,
1862        /// Whether or not to collect branch node masks.
1863        collect_branch_node_masks: bool,
1864        /// Provided by the user to give the necessary context to retain extra proofs.
1865        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1866        /// Context for sending the proof result.
1867        proof_result_sender: ProofResultContext,
1868    },
1869    /// V2 account multiproof variant
1870    V2 {
1871        /// The targets for which to compute the multiproof.
1872        targets: MultiProofTargetsV2,
1873        /// Context for sending the proof result.
1874        proof_result_sender: ProofResultContext,
1875    },
1876}
1877
1878impl AccountMultiproofInput {
1879    /// Returns the [`ProofResultContext`] for this input, consuming the input.
1880    fn into_proof_result_sender(self) -> ProofResultContext {
1881        match self {
1882            Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => {
1883                proof_result_sender
1884            }
1885        }
1886    }
1887}
1888
1889/// Parameters for building an account multiproof with pre-computed storage roots.
1890struct AccountMultiproofParams<'a> {
1891    /// The targets for which to compute the multiproof.
1892    targets: &'a MultiProofTargets,
1893    /// The prefix set for the account trie walk.
1894    prefix_set: PrefixSet,
1895    /// Whether or not to collect branch node masks.
1896    collect_branch_node_masks: bool,
1897    /// Provided by the user to give the necessary context to retain extra proofs.
1898    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
1899    /// Receivers for storage proofs being computed in parallel.
1900    storage_proof_receivers: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
1901    /// Cached storage roots. This will be used to read storage roots for missed leaves, as well as
1902    /// to write calculated storage roots.
1903    cached_storage_roots: &'a DashMap<B256, B256>,
1904}
1905
1906/// Internal message for account workers.
1907#[derive(Debug)]
1908enum AccountWorkerJob {
1909    /// Account multiproof computation request
1910    AccountMultiproof {
1911        /// Account multiproof input parameters
1912        input: Box<AccountMultiproofInput>,
1913    },
1914    /// Blinded account node retrieval request
1915    BlindedAccountNode {
1916        /// Path to the account node
1917        path: Nibbles,
1918        /// Channel to send result back to original caller
1919        result_sender: Sender<TrieNodeProviderResult>,
1920    },
1921}
1922
1923#[cfg(test)]
1924mod tests {
1925    use super::*;
1926    use reth_provider::test_utils::create_test_provider_factory;
1927    use tokio::{runtime::Builder, task};
1928
1929    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
1930        ProofTaskCtx::new(factory)
1931    }
1932
1933    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
1934    #[test]
1935    fn spawn_proof_workers_creates_handle() {
1936        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
1937        runtime.block_on(async {
1938            let handle = tokio::runtime::Handle::current();
1939            let provider_factory = create_test_provider_factory();
1940            let changeset_cache = reth_trie_db::ChangesetCache::new();
1941            let factory = reth_provider::providers::OverlayStateProviderFactory::new(
1942                provider_factory,
1943                changeset_cache,
1944            );
1945            let ctx = test_ctx(factory);
1946
1947            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3, false);
1948
1949            // Verify handle can be cloned
1950            let _cloned_handle = proof_handle.clone();
1951
1952            // Workers shut down automatically when handle is dropped
1953            drop(proof_handle);
1954            task::yield_now().await;
1955        });
1956    }
1957}