Skip to main content

reth_trie_parallel/
proof_task.rs

1//! Parallel proof computation using worker pools with dedicated database transactions.
//!
4//! # Architecture
5//!
6//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
7//!   - Storage pool: Handles storage proofs and blinded storage node requests
8//!   - Account pool: Handles account multiproofs and blinded account node requests
9//! - **Direct Channel Access**: [`ProofWorkerHandle`] provides type-safe queue methods with direct
10//!   access to worker channels, eliminating routing overhead
11//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
12//!
13//! # Message Flow
14//!
15//! 1. `MultiProofTask` prepares a storage or account job and hands it to [`ProofWorkerHandle`]. The
16//!    job carries a [`ProofResultContext`] so the worker knows how to send the result back.
17//! 2. A worker receives the job, runs the proof, and sends a [`ProofResultMessage`] through the
18//!    provided [`ProofResultSender`].
19//! 3. `MultiProofTask` receives the message, uses `sequence_number` to keep proofs in order, and
20//!    proceeds with its state-root logic.
21//!
22//! Each job gets its own direct channel so results go straight back to `MultiProofTask`. That keeps
23//! ordering decisions in one place and lets workers run independently.
24//!
25//! ```text
26//! MultiProofTask -> MultiproofManager -> ProofWorkerHandle -> Storage/Account Worker
27//!        ^                                          |
28//!        |                                          v
29//! ProofResultMessage <-------- ProofResultSender ---
30//! ```
31
32use crate::{
33    root::ParallelStateRootError,
34    stats::{ParallelTrieStats, ParallelTrieTracker},
35    targets_v2::MultiProofTargetsV2,
36    value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
37    StorageRootTargets,
38};
39use alloy_primitives::{
40    map::{B256Map, B256Set},
41    B256,
42};
43use alloy_rlp::{BufMut, Encodable};
44use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
45use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
46use reth_primitives_traits::dashmap::{self, DashMap};
47use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
48use reth_storage_errors::db::DatabaseError;
49use reth_tasks::Runtime;
50use reth_trie::{
51    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
52    node_iter::{TrieElement, TrieNodeIter},
53    prefix_set::TriePrefixSets,
54    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
55    proof_v2,
56    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
57    walker::TrieWalker,
58    DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState,
59    MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
60};
61use reth_trie_common::{
62    added_removed_keys::MultiAddedRemovedKeys,
63    prefix_set::{PrefixSet, PrefixSetMut},
64    proof::{DecodedProofNodes, ProofRetainer},
65    BranchNodeMasks, BranchNodeMasksMap,
66};
67use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
68use std::{
69    cell::RefCell,
70    rc::Rc,
71    sync::{
72        atomic::{AtomicUsize, Ordering},
73        mpsc::{channel, Receiver, Sender},
74        Arc,
75    },
76    time::{Duration, Instant},
77};
78use tracing::{debug, debug_span, error, trace};
79
80#[cfg(feature = "metrics")]
81use crate::proof_task_metrics::{
82    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
83};
84
/// Result type for blinded trie node lookups: the revealed node if present.
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// Type alias for the V2 account proof calculator.
///
/// Pairs the provider's account trie/hashed cursors with an
/// [`AsyncAccountValueEncoder`] built from the provider's storage cursors.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    <Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
    <Provider as HashedCursorFactory>::AccountCursor<'a>,
    AsyncAccountValueEncoder<
        <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
        <Provider as HashedCursorFactory>::StorageCursor<'a>,
    >,
>;

/// Type alias for the V2 storage proof calculator.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
    <Provider as HashedCursorFactory>::StorageCursor<'a>,
>;
102
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
///
/// Cloning is cheap: the handle holds channel senders, `Arc` counters, and plain
/// integers, so it can be shared freely across threads.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Counter tracking available storage workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    storage_available_workers: Arc<AtomicUsize>,
    /// Counter tracking available account workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    account_available_workers: Arc<AtomicUsize>,
    /// Total number of storage workers spawned
    storage_worker_count: usize,
    /// Total number of account workers spawned
    account_worker_count: usize,
    /// Whether V2 storage proofs are enabled
    v2_proofs_enabled: bool,
}
127
impl ProofWorkerHandle {
    /// Spawns storage and account worker pools with dedicated database transactions.
    ///
    /// Returns a handle for submitting proof tasks to the worker pools.
    /// Workers run until the last handle is dropped.
    ///
    /// # Parameters
    /// - `runtime`: The centralized runtime used to spawn blocking worker tasks
    /// - `task_ctx`: Shared context with database view and prefix sets
    /// - `v2_proofs_enabled`: Whether to enable V2 storage proofs
    pub fn new<Factory>(
        runtime: &Runtime,
        task_ctx: ProofTaskCtx<Factory>,
        v2_proofs_enabled: bool,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        // Unbounded channels: dispatch never blocks, and `send` only fails once all
        // workers have exited (every receiver clone dropped).
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Availability counters start at zero; the worker run loops maintain them
        // (decrement when busy, increment when idle) — see the worker structs.
        let storage_available_workers = Arc::<AtomicUsize>::default();
        let account_available_workers = Arc::<AtomicUsize>::default();

        // Storage-root cache shared between both pools; presumably lets account
        // workers reuse roots computed by storage workers — populated in the worker
        // run loops (TODO confirm against the worker implementations).
        let cached_storage_roots = Arc::<DashMap<_, _>>::default();

        // One worker per thread of the runtime's dedicated proof pools.
        let storage_worker_count = runtime.proof_storage_worker_pool().current_num_threads();
        let account_worker_count = runtime.proof_account_worker_pool().current_num_threads();

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            ?v2_proofs_enabled,
            "Spawning proof worker pools"
        );

        let storage_pool = runtime.proof_storage_worker_pool();
        let task_ctx_for_storage = task_ctx.clone();
        let cached_storage_roots_for_storage = cached_storage_roots.clone();

        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx_for_storage.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();
            let cached_storage_roots = cached_storage_roots_for_storage.clone();

            storage_pool.spawn(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    cached_storage_roots,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                )
                .with_v2_proofs(v2_proofs_enabled);
                // Worker failures are logged, not propagated to the handle.
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }

        let account_pool = runtime.proof_account_worker_pool();

        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            // Account workers also receive a storage-work sender — presumably so
            // account multiproofs can enqueue the storage proofs they depend on
            // (TODO confirm in `AccountProofWorker`).
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();
            let cached_storage_roots = cached_storage_roots.clone();

            account_pool.spawn(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    cached_storage_roots,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                )
                .with_v2_proofs(v2_proofs_enabled);
                // Worker failures are logged, not propagated to the handle.
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
            v2_proofs_enabled,
        }
    }

    /// Returns whether V2 storage proofs are enabled for this worker pool.
    pub const fn v2_proofs_enabled(&self) -> bool {
        self.v2_proofs_enabled
    }

    /// Returns how many storage workers are currently available/idle.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns how many account workers are currently available/idle.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of pending storage tasks in the queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of pending account tasks in the queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of storage workers in the pool.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of account workers in the pool.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

    /// Dispatch a storage proof computation to storage worker pool
    ///
    /// The result will be sent via the `proof_result_sender` channel.
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    ) -> Result<(), ProviderError> {
        let hashed_address = input.hashed_address();
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                // `send` only fails when every worker has exited. Recover the job
                // from the error so the per-proof result channel is still notified.
                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let _ = proof_result_sender.send(StorageProofResultMessage {
                        hashed_address,
                        result: Err(DatabaseError::Other(
                            "storage workers unavailable".to_string(),
                        )
                        .into()),
                    });
                }

                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })
    }

    /// Dispatch an account multiproof computation
    ///
    /// The result will be sent via the `result_sender` channel included in the input.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                // Channel disconnected: recover the job and deliver the failure
                // through the caller's result channel so sequencing is not stalled.
                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = input.into_proof_result_sender();

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatch blinded storage node request to storage worker pool
    ///
    /// Returns the receiver on which the worker will deliver the node.
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        // One-shot std channel per request; the worker sends exactly one result.
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Dispatch blinded account node request to account worker pool
    ///
    /// Returns the receiver on which the worker will deliver the node.
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        // One-shot std channel per request; the worker sends exactly one result.
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}
400
/// Data used for initializing cursor factories that is shared across all proof worker instances.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers.
    /// Each worker clones this to open its own read-only database provider.
    factory: Factory,
}
407
408impl<Factory> ProofTaskCtx<Factory> {
409    /// Creates a new [`ProofTaskCtx`] with the given factory.
410    pub const fn new(factory: Factory) -> Self {
411        Self { factory }
412    }
413}
414
/// This contains all information shared between account proof worker instances.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    /// Owns the worker's read-only database view for the lifetime of the worker.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}
424
425impl<Provider> ProofTaskTx<Provider> {
426    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
427    const fn new(provider: Provider, id: usize) -> Self {
428        Self { provider, id }
429    }
430}
431
432impl<Provider> ProofTaskTx<Provider>
433where
434    Provider: TrieCursorFactory + HashedCursorFactory,
435{
436    /// Compute storage proof.
437    ///
438    /// Used by storage workers in the worker pool to compute storage proofs.
439    #[inline]
440    fn compute_legacy_storage_proof(
441        &self,
442        input: StorageProofInput,
443        trie_cursor_metrics: &mut TrieCursorMetricsCache,
444        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
445    ) -> Result<StorageProofResult, StateProofError> {
446        // Consume the input so we can move large collections (e.g. target slots) without cloning.
447        let StorageProofInput::Legacy {
448            hashed_address,
449            prefix_set,
450            target_slots,
451            with_branch_node_masks,
452            multi_added_removed_keys,
453        } = input
454        else {
455            panic!("compute_legacy_storage_proof only accepts StorageProofInput::Legacy")
456        };
457
458        // Get or create added/removed keys context
459        let multi_added_removed_keys =
460            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
461        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);
462
463        let span = debug_span!(
464            target: "trie::proof_task",
465            "Storage proof calculation",
466            target_slots = ?target_slots.len(),
467        );
468        let _span_guard = span.enter();
469
470        let proof_start = Instant::now();
471
472        // Compute raw storage multiproof
473        let raw_proof_result =
474            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
475                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
476                .with_branch_node_masks(with_branch_node_masks)
477                .with_added_removed_keys(added_removed_keys)
478                .with_trie_cursor_metrics(trie_cursor_metrics)
479                .with_hashed_cursor_metrics(hashed_cursor_metrics)
480                .storage_multiproof(target_slots);
481        trie_cursor_metrics.record_span("trie_cursor");
482        hashed_cursor_metrics.record_span("hashed_cursor");
483
484        // Decode proof into DecodedStorageMultiProof
485        let decoded_result =
486            raw_proof_result.and_then(|raw_proof| raw_proof.try_into().map_err(Into::into))?;
487
488        trace!(
489            target: "trie::proof_task",
490            hashed_address = ?hashed_address,
491            proof_time_us = proof_start.elapsed().as_micros(),
492            worker_id = self.id,
493            "Completed storage proof calculation"
494        );
495
496        Ok(StorageProofResult::Legacy { proof: decoded_result })
497    }
498
499    fn compute_v2_storage_proof(
500        &self,
501        input: StorageProofInput,
502        calculator: &mut proof_v2::StorageProofCalculator<
503            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
504            <Provider as HashedCursorFactory>::StorageCursor<'_>,
505        >,
506    ) -> Result<StorageProofResult, StateProofError> {
507        let StorageProofInput::V2 { hashed_address, mut targets } = input else {
508            panic!("compute_v2_storage_proof only accepts StorageProofInput::V2")
509        };
510
511        let span = debug_span!(
512            target: "trie::proof_task",
513            "V2 Storage proof calculation",
514            targets = ?targets.len(),
515        );
516        let _span_guard = span.enter();
517
518        let proof_start = Instant::now();
519
520        // If targets is empty it means the caller only wants the root node.
521        let proof = if targets.is_empty() {
522            let root_node = calculator.storage_root_node(hashed_address)?;
523            vec![root_node]
524        } else {
525            calculator.storage_proof(hashed_address, &mut targets)?
526        };
527
528        let root = calculator.compute_root_hash(&proof)?;
529
530        trace!(
531            target: "trie::proof_task",
532            hashed_address = ?hashed_address,
533            proof_time_us = proof_start.elapsed().as_micros(),
534            ?root,
535            worker_id = self.id,
536            "Completed V2 storage proof calculation"
537        );
538
539        Ok(StorageProofResult::V2 { proof, root })
540    }
541
542    /// Process a blinded storage node request.
543    ///
544    /// Used by storage workers to retrieve blinded storage trie nodes for proof construction.
545    fn process_blinded_storage_node(
546        &self,
547        account: B256,
548        path: &Nibbles,
549    ) -> TrieNodeProviderResult {
550        let storage_node_provider =
551            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
552        storage_node_provider.trie_node(path)
553    }
554}
555impl TrieNodeProviderFactory for ProofWorkerHandle {
556    type AccountNodeProvider = ProofTaskTrieNodeProvider;
557    type StorageNodeProvider = ProofTaskTrieNodeProvider;
558
559    fn account_node_provider(&self) -> Self::AccountNodeProvider {
560        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
561    }
562
563    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
564        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
565    }
566}
567
/// Trie node provider for retrieving trie nodes by path.
///
/// Dispatches blinded-node requests through a [`ProofWorkerHandle`] and blocks
/// on the per-request response channel.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node provider.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node provider.
    StorageNode {
        /// Target account.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}
584
585impl TrieNodeProvider for ProofTaskTrieNodeProvider {
586    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
587        match self {
588            Self::AccountNode { handle } => {
589                let rx = handle
590                    .dispatch_blinded_account_node(*path)
591                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
592                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
593            }
594            Self::StorageNode { handle, account } => {
595                let rx = handle
596                    .dispatch_blinded_storage_node(*account, *path)
597                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
598                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
599            }
600        }
601    }
602}
603
/// Result of a multiproof calculation.
#[derive(Debug)]
pub enum ProofResult {
    /// Legacy multiproof calculation result, with the tracker stats gathered
    /// during the computation.
    Legacy(DecodedMultiProof, ParallelTrieStats),
    /// V2 multiproof calculation result.
    V2(DecodedMultiProofV2),
}
612
613impl ProofResult {
614    /// Creates an empty [`ProofResult`] of the appropriate variant based on `v2_enabled`.
615    ///
616    /// Use this when constructing empty proofs (e.g., for state updates where all targets
617    /// were already fetched) to ensure consistency with the proof version being used.
618    pub fn empty(v2_enabled: bool) -> Self {
619        if v2_enabled {
620            Self::V2(DecodedMultiProofV2::default())
621        } else {
622            let stats = ParallelTrieTracker::default().finish();
623            Self::Legacy(DecodedMultiProof::default(), stats)
624        }
625    }
626
627    /// Returns true if the result contains no proofs
628    pub fn is_empty(&self) -> bool {
629        match self {
630            Self::Legacy(proof, _) => proof.is_empty(),
631            Self::V2(proof) => proof.is_empty(),
632        }
633    }
634
635    /// Extends the receiver with the value of the given results.
636    ///
637    /// # Panics
638    ///
639    /// This method panics if the two [`ProofResult`]s are not the same variant.
640    pub fn extend(&mut self, other: Self) {
641        match (self, other) {
642            (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other),
643            (Self::V2(proof), Self::V2(other)) => proof.extend(other),
644            _ => panic!("mismatched ProofResults, cannot extend one with the other"),
645        }
646    }
647
648    /// Returns the number of account proofs.
649    pub fn account_proofs_len(&self) -> usize {
650        match self {
651            Self::Legacy(proof, _) => proof.account_subtree.len(),
652            Self::V2(proof) => proof.account_proofs.len(),
653        }
654    }
655
656    /// Returns the total number of storage proofs
657    pub fn storage_proofs_len(&self) -> usize {
658        match self {
659            Self::Legacy(proof, _) => {
660                proof.storages.values().map(|p| p.subtree.len()).sum::<usize>()
661            }
662            Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::<usize>(),
663        }
664    }
665}
666
/// Channel used by worker threads to deliver `ProofResultMessage` items back to
/// `MultiProofTask`.
///
/// Workers use this sender to deliver proof results directly to `MultiProofTask`.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
672
/// Message containing a completed proof result with metadata for direct delivery to
/// `MultiProofTask`.
///
/// This type enables workers to send proof results directly to the `MultiProofTask` event loop.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number for ordering proofs
    pub sequence_number: u64,
    /// The proof calculation result (either account multiproof or storage proof)
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}
688
/// Context for sending proof calculation results back to `MultiProofTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Sequence number for proof ordering
    pub sequence_number: u64,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration
    pub start_time: Instant,
}
704
705impl ProofResultContext {
706    /// Creates a new proof result context.
707    pub const fn new(
708        sender: ProofResultSender,
709        sequence_number: u64,
710        state: HashedPostState,
711        start_time: Instant,
712    ) -> Self {
713        Self { sender, sequence_number, state, start_time }
714    }
715}
716
/// The results of a storage proof calculation.
#[derive(Debug)]
pub(crate) enum StorageProofResult {
    /// Result of a legacy storage proof computation.
    Legacy {
        /// The storage multiproof
        proof: DecodedStorageMultiProof,
    },
    /// Result of a V2 storage proof computation.
    V2 {
        /// The calculated V2 proof nodes
        proof: Vec<ProofTrieNode>,
        /// The storage root calculated by the V2 proof
        root: Option<B256>,
    },
}
731
732impl StorageProofResult {
733    /// Returns the calculated root of the trie, if one can be calculated from the proof.
734    const fn root(&self) -> Option<B256> {
735        match self {
736            Self::Legacy { proof } => Some(proof.root),
737            Self::V2 { root, .. } => *root,
738        }
739    }
740}
741
742impl From<StorageProofResult> for Option<DecodedStorageMultiProof> {
743    /// Returns None if the V2 proof result doesn't have a calculated root hash.
744    fn from(proof_result: StorageProofResult) -> Self {
745        match proof_result {
746            StorageProofResult::Legacy { proof } => Some(proof),
747            StorageProofResult::V2 { proof, root } => root.map(|root| {
748                let branch_node_masks = proof
749                    .iter()
750                    .filter_map(|node| node.masks.map(|masks| (node.path, masks)))
751                    .collect();
752                let subtree = proof.into_iter().map(|node| (node.path, node.node)).collect();
753                DecodedStorageMultiProof { root, subtree, branch_node_masks }
754            }),
755        }
756    }
757}
758
/// Message containing a completed storage proof result with metadata.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// The hashed address this storage proof belongs to
    pub(crate) hashed_address: B256,
    /// The storage proof calculation result
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
767
/// Internal message for storage workers.
///
/// Jobs are enqueued by [`ProofWorkerHandle`] and consumed by the storage
/// worker run loop.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Blinded storage node retrieval request
    BlindedStorageNode {
        /// Target account
        account: B256,
        /// Path to the storage node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
788
/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests and blinded node lookups.
struct StorageProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Counter tracking worker availability; incremented while idle and
    /// decremented while a job is being processed.
    available_workers: Arc<AtomicUsize>,
    /// Cached storage roots; this worker inserts the root of every
    /// successfully computed storage proof.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    /// Set to true if V2 proofs are enabled.
    v2_enabled: bool,
}
813
impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new storage proof worker.
    ///
    /// V2 proofs start disabled; enable them with [`Self::with_v2_proofs`].
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            // V2 proofs are opt-in via `with_v2_proofs`.
            v2_enabled: false,
        }
    }

    /// Changes whether or not V2 proofs are enabled.
    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Errors
    ///
    /// Returns an error if the database provider or the V2 cursors cannot be
    /// created; in that case the worker never advertises availability.
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        // Create provider from factory
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
        // The V2 calculator (and its cursors, seeded with the zero address) is
        // created once up front and reused for every proof this worker runs.
        let mut v2_calculator = if self.v2_enabled {
            let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
            let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
            Some(proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor))
        } else {
            None
        };

        // Initially mark this worker as available.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();

        // `recv` blocks until a job arrives; it errors once all senders are
        // dropped, which serves as the shutdown signal.
        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            // Mark worker as busy.
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    self.process_storage_proof(
                        &proof_tx,
                        v2_calculator.as_mut(),
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.available_workers.fetch_add(1, Ordering::Relaxed);

            idle_start = Instant::now();
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_storage_nodes(storage_nodes_processed as usize);
            self.metrics.record_storage_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes a storage proof request.
    ///
    /// Computes either a legacy or a V2 storage proof depending on the input
    /// variant, sends the result through `proof_result_sender`, and caches the
    /// computed storage root (if any) in `cached_storage_roots`.
    ///
    /// # Panics
    ///
    /// Panics if `input` is the V2 variant while `v2_calculator` is `None`,
    /// i.e. if a V2 job reaches a worker that was not V2-enabled.
    fn process_storage_proof<Provider>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: Option<
            &mut proof_v2::StorageProofCalculator<
                <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                <Provider as HashedCursorFactory>::StorageCursor<'_>,
            >,
        >,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
        let hashed_address = input.hashed_address();
        let proof_start = Instant::now();

        let result = match &input {
            StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, .. } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    prefix_set_len = prefix_set.len(),
                    target_slots_len = target_slots.len(),
                    "Processing storage proof"
                );

                proof_tx.compute_legacy_storage_proof(
                    input,
                    &mut trie_cursor_metrics,
                    &mut hashed_cursor_metrics,
                )
            }
            StorageProofInput::V2 { hashed_address, targets } => {
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    hashed_address = ?hashed_address,
                    targets_len = targets.len(),
                    "Processing V2 storage proof"
                );
                proof_tx
                    .compute_v2_storage_proof(input, v2_calculator.expect("v2 calculator provided"))
            }
        };

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        // Capture the root before `result` is moved into the message below.
        let root = result.as_ref().ok().and_then(|result| result.root());

        // A send failure means the requester dropped its receiver; the result
        // is simply discarded.
        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        // Cache the computed root so subsequent lookups can reuse it.
        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            ?root,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            // Accumulate per-proof metrics into the worker's cache
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    /// Processes a blinded storage node lookup request.
    ///
    /// Looks up the node at `path` under `account`, sends the result through
    /// `result_sender`, and bumps `storage_nodes_processed`.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        // The caller may have given up waiting; a failed send is not an error.
        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}
1093
/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests and blinded node lookups.
struct AccountProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Channel for dispatching storage proof work (for pre-dispatched target proofs)
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Counter tracking worker availability; incremented while idle and
    /// decremented while a job is being processed.
    available_workers: Arc<AtomicUsize>,
    /// Cached storage roots, consulted while building account multiproofs.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
    /// Set to true if V2 proofs are enabled.
    v2_enabled: bool,
}
1120
impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new account proof worker.
    ///
    /// V2 proofs start disabled; enable them with [`Self::with_v2_proofs`].
    #[allow(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
            // V2 proofs are opt-in via `with_v2_proofs`.
            v2_enabled: false,
        }
    }

    /// Changes whether or not V2 proofs are enabled.
    const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
        self.v2_enabled = v2_enabled;
        self
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Errors
    ///
    /// Returns an error if the database provider or the V2 cursors cannot be
    /// created; in that case the worker never advertises availability.
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Create both account and storage calculators for V2 proofs.
        // The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
        let (mut v2_account_calculator, v2_storage_calculator) = if self.v2_enabled {
            let account_trie_cursor = provider.account_trie_cursor()?;
            let account_hashed_cursor = provider.hashed_account_cursor()?;

            // Storage cursors are seeded with the zero address and reused for
            // every proof this worker runs.
            let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
            let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

            (
                Some(proof_v2::ProofCalculator::<
                    _,
                    _,
                    AsyncAccountValueEncoder<
                        <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                        <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
                    >,
                >::new(account_trie_cursor, account_hashed_cursor)),
                Some(Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
                    storage_trie_cursor,
                    storage_hashed_cursor,
                )))),
            )
        } else {
            (None, None)
        };

        // Count this worker as available only after successful initialization.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();
        let mut value_encoder_stats_cache = ValueEncoderStats::default();

        // `recv` blocks until a job arrives; it errors once all senders are
        // dropped, which serves as the shutdown signal.
        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            // Mark worker as busy.
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    let value_encoder_stats = self.process_account_multiproof(
                        &provider,
                        v2_account_calculator.as_mut(),
                        v2_storage_calculator.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                    // Time spent blocked on storage proof results is counted
                    // as idle rather than busy time.
                    total_idle_time += value_encoder_stats.storage_wait_time;
                    value_encoder_stats_cache.extend(&value_encoder_stats);
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &provider,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.available_workers.fetch_add(1, Ordering::Relaxed);

            idle_start = Instant::now();
        }

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.metrics.record_account_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
        }

        Ok(())
    }

    /// Computes a legacy account multiproof.
    ///
    /// Storage proofs for the targets are dispatched to the storage worker
    /// pool first, then the account trie is walked while those proofs are
    /// consumed from their receivers.
    ///
    /// Returns the proof result together with the total time spent waiting
    /// on storage proof receivers.
    fn compute_legacy_account_multiproof<Provider>(
        &self,
        provider: &Provider,
        targets: MultiProofTargets,
        mut prefix_sets: TriePrefixSets,
        collect_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
        proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
    ) -> Result<(ProofResult, Duration), ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            num_slots = targets.values().map(|slots| slots.len()).sum::<usize>(),
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        // Dispatch storage proofs before walking the account trie so they can
        // be computed in parallel with the account-level walk below.
        let storage_proof_receivers = dispatch_storage_proofs(
            &self.storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        )?;

        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            cached_storage_roots: &self.cached_storage_roots,
        };

        let mut storage_wait_time = Duration::ZERO;
        let result = build_account_multiproof_with_storage_roots(
            provider,
            ctx,
            &mut tracker,
            proof_cursor_metrics,
            &mut storage_wait_time,
        )?;

        let stats = tracker.finish();
        Ok((ProofResult::Legacy(result, stats), storage_wait_time))
    }

    /// Computes a V2 account multiproof.
    ///
    /// Storage proofs are dispatched to the storage worker pool and their
    /// receivers are handed to an [`AsyncAccountValueEncoder`], which resolves
    /// storage roots while the account proof is being calculated.
    ///
    /// Returns the proof result together with the value-encoder stats
    /// collected during the computation.
    fn compute_v2_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        targets: MultiProofTargetsV2,
    ) -> Result<(ProofResult, ValueEncoderStats), ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account V2 multiproof calculation",
            account_targets = account_targets.len(),
            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
        );
        let _span_guard = span.enter();

        trace!(target: "trie::proof_task", "Processing V2 account multiproof");

        let storage_proof_receivers =
            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;

        let mut value_encoder = AsyncAccountValueEncoder::new(
            storage_proof_receivers,
            self.cached_storage_roots.clone(),
            v2_storage_calculator,
        );

        let account_proofs =
            v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;

        // Finalize drains any remaining storage proof results and yields stats.
        let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;

        let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };

        Ok((ProofResult::V2(proof), value_encoder_stats))
    }

    /// Processes an account multiproof request.
    ///
    /// Dispatches to the legacy or V2 computation depending on the input
    /// variant, then sends a [`ProofResultMessage`] through the job's
    /// [`ProofResultContext`].
    ///
    /// Returns stats from the value encoder used during proof computation.
    ///
    /// # Panics
    ///
    /// Panics if `input` is the V2 variant while either V2 calculator is
    /// `None`, i.e. if a V2 job reaches a worker that was not V2-enabled.
    fn process_account_multiproof<'a, Provider>(
        &self,
        provider: &Provider,
        v2_account_calculator: Option<&mut V2AccountProofCalculator<'a, Provider>>,
        v2_storage_calculator: Option<Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) -> ValueEncoderStats
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
        let proof_start = Instant::now();

        let (proof_result_sender, result, value_encoder_stats) = match input {
            AccountMultiproofInput::Legacy {
                targets,
                prefix_sets,
                collect_branch_node_masks,
                multi_added_removed_keys,
                proof_result_sender,
            } => {
                // Legacy path only tracks storage wait time; other encoder
                // stats stay at their defaults.
                let (result, value_encoder_stats) = match self.compute_legacy_account_multiproof(
                    provider,
                    targets,
                    prefix_sets,
                    collect_branch_node_masks,
                    multi_added_removed_keys,
                    &mut proof_cursor_metrics,
                ) {
                    Ok((proof, wait_time)) => (
                        Ok(proof),
                        ValueEncoderStats { storage_wait_time: wait_time, ..Default::default() },
                    ),
                    Err(e) => (Err(e), ValueEncoderStats::default()),
                };
                (proof_result_sender, result, value_encoder_stats)
            }
            AccountMultiproofInput::V2 { targets, proof_result_sender } => {
                let (result, value_encoder_stats) = match self
                    .compute_v2_account_multiproof::<Provider>(
                        v2_account_calculator.expect("v2 account calculator provided"),
                        v2_storage_calculator.expect("v2 storage calculator provided"),
                        targets,
                    ) {
                    Ok((proof, stats)) => (Ok(proof), stats),
                    Err(e) => (Err(e), ValueEncoderStats::default()),
                };
                (proof_result_sender, result, value_encoder_stats)
            }
        };

        let ProofResultContext {
            sender: result_tx,
            sequence_number: seq,
            state,
            start_time: start,
        } = proof_result_sender;

        let proof_elapsed = proof_start.elapsed();
        // Total elapsed is measured from the job's dispatch time, not from
        // when this worker picked it up.
        let total_elapsed = start.elapsed();
        *account_proofs_processed += 1;

        // Send result to MultiProofTask
        if result_tx
            .send(ProofResultMessage {
                sequence_number: seq,
                result,
                elapsed: total_elapsed,
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id=self.worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        proof_cursor_metrics.record_spans();

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        // Accumulate per-proof metrics into the worker's cache
        cursor_metrics_cache.extend(&proof_cursor_metrics);

        value_encoder_stats
    }

    /// Processes a blinded account node lookup request.
    ///
    /// Looks up the node at `path`, sends the result through `result_sender`,
    /// and bumps `account_nodes_processed`.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        provider: &Provider,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        // The provider serves as both the trie and hashed cursor factory here.
        let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
        let result = account_node_provider.trie_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        // The caller may have given up waiting; a failed send is not an error.
        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}
1542
1543/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk.
1544///
1545/// This is a helper function used by account workers to build the account subtree proof
1546/// while storage proofs are still being computed. Receivers are consumed only when needed,
1547/// enabling interleaved parallelism between account trie traversal and storage proof computation.
1548///
1549/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
1550/// Also accumulates the time spent waiting for storage proofs into `storage_wait_time`.
1551fn build_account_multiproof_with_storage_roots<P>(
1552    provider: &P,
1553    ctx: AccountMultiproofParams<'_>,
1554    tracker: &mut ParallelTrieTracker,
1555    proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
1556    storage_wait_time: &mut Duration,
1557) -> Result<DecodedMultiProof, ParallelStateRootError>
1558where
1559    P: TrieCursorFactory + HashedCursorFactory,
1560{
1561    let accounts_added_removed_keys =
1562        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());
1563
1564    // Wrap account trie cursor with instrumented cursor
1565    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
1566    let account_trie_cursor = InstrumentedTrieCursor::new(
1567        account_trie_cursor,
1568        &mut proof_cursor_metrics.account_trie_cursor,
1569    );
1570
1571    // Create the walker.
1572    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
1573        .with_added_removed_keys(accounts_added_removed_keys)
1574        .with_deletions_retained(true);
1575
1576    // Create a hash builder to rebuild the root node since it is not available in the database.
1577    let retainer = ctx
1578        .targets
1579        .keys()
1580        .map(Nibbles::unpack)
1581        .collect::<ProofRetainer>()
1582        .with_added_removed_keys(accounts_added_removed_keys);
1583    let mut hash_builder = HashBuilder::default()
1584        .with_proof_retainer(retainer)
1585        .with_updates(ctx.collect_branch_node_masks);
1586
1587    // Initialize storage multiproofs map with pre-allocated capacity.
1588    // Proofs will be inserted as they're consumed from receivers during trie walk.
1589    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
1590        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
1591    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);
1592
1593    // Wrap account hashed cursor with instrumented cursor
1594    let account_hashed_cursor =
1595        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
1596    let account_hashed_cursor = InstrumentedHashedCursor::new(
1597        account_hashed_cursor,
1598        &mut proof_cursor_metrics.account_hashed_cursor,
1599    );
1600
1601    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);
1602
1603    let mut storage_proof_receivers = ctx.storage_proof_receivers;
1604
1605    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
1606        match account_node {
1607            TrieElement::Branch(node) => {
1608                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
1609            }
1610            TrieElement::Leaf(hashed_address, account) => {
1611                let root = match storage_proof_receivers.remove(&hashed_address) {
1612                    Some(receiver) => {
1613                        let _guard = debug_span!(
1614                            target: "trie::proof_task",
1615                            "Waiting for storage proof",
1616                        );
1617                        // Block on this specific storage proof receiver - enables interleaved
1618                        // parallelism
1619                        let wait_start = Instant::now();
1620                        let proof_msg = receiver.recv().map_err(|_| {
1621                            ParallelStateRootError::StorageRoot(
1622                                reth_execution_errors::StorageRootError::Database(
1623                                    DatabaseError::Other(format!(
1624                                        "Storage proof channel closed for {hashed_address}"
1625                                    )),
1626                                ),
1627                            )
1628                        })?;
1629                        *storage_wait_time += wait_start.elapsed();
1630
1631                        drop(_guard);
1632
1633                        // Extract storage proof from the result
1634                        debug_assert_eq!(
1635                            proof_msg.hashed_address, hashed_address,
1636                            "storage worker must return same address"
1637                        );
1638                        let proof_result = proof_msg.result?;
1639                        let Some(root) = proof_result.root() else {
1640                            trace!(
1641                                target: "trie::proof_task",
1642                                ?proof_result,
1643                                "Received proof_result without root",
1644                            );
1645                            panic!("Partial proofs are not yet supported");
1646                        };
1647                        let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
1648                            .expect("Partial proofs are not yet supported (into)");
1649                        collected_decoded_storages.insert(hashed_address, proof);
1650                        root
1651                    }
1652                    // Since we do not store all intermediate nodes in the database, there might
1653                    // be a possibility of re-adding a non-modified leaf to the hash builder.
1654                    None => {
1655                        tracker.inc_missed_leaves();
1656
1657                        match ctx.cached_storage_roots.entry(hashed_address) {
1658                            dashmap::Entry::Occupied(occ) => *occ.get(),
1659                            dashmap::Entry::Vacant(vac) => {
1660                                let root =
1661                                    StorageProof::new_hashed(provider, provider, hashed_address)
1662                                        .with_prefix_set_mut(Default::default())
1663                                        .with_trie_cursor_metrics(
1664                                            &mut proof_cursor_metrics.storage_trie_cursor,
1665                                        )
1666                                        .with_hashed_cursor_metrics(
1667                                            &mut proof_cursor_metrics.storage_hashed_cursor,
1668                                        )
1669                                        .storage_multiproof(
1670                                            ctx.targets
1671                                                .get(&hashed_address)
1672                                                .cloned()
1673                                                .unwrap_or_default(),
1674                                        )
1675                                        .map_err(|e| {
1676                                            ParallelStateRootError::StorageRoot(
1677                                                reth_execution_errors::StorageRootError::Database(
1678                                                    DatabaseError::Other(e.to_string()),
1679                                                ),
1680                                            )
1681                                        })?
1682                                        .root;
1683
1684                                vac.insert(root);
1685                                root
1686                            }
1687                        }
1688                    }
1689                };
1690
1691                // Encode account
1692                account_rlp.clear();
1693                let account = account.into_trie_account(root);
1694                account.encode(&mut account_rlp as &mut dyn BufMut);
1695
1696                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
1697            }
1698        }
1699    }
1700
1701    let _ = hash_builder.root();
1702
1703    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
1704    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;
1705
1706    let branch_node_masks = if ctx.collect_branch_node_masks {
1707        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
1708        updated_branch_nodes
1709            .into_iter()
1710            .map(|(path, node)| {
1711                (path, BranchNodeMasks { hash_mask: node.hash_mask, tree_mask: node.tree_mask })
1712            })
1713            .collect()
1714    } else {
1715        BranchNodeMasksMap::default()
1716    };
1717
1718    // Consume remaining storage proof receivers for accounts not encountered during trie walk.
1719    // Done last to allow storage workers more time to complete while we finalized the account trie.
1720    for (hashed_address, receiver) in storage_proof_receivers {
1721        let wait_start = Instant::now();
1722        if let Ok(proof_msg) = receiver.recv() {
1723            *storage_wait_time += wait_start.elapsed();
1724            let proof_result = proof_msg.result?;
1725            let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
1726                .expect("Partial proofs are not yet supported");
1727            collected_decoded_storages.insert(hashed_address, proof);
1728        }
1729    }
1730
1731    Ok(DecodedMultiProof {
1732        account_subtree: decoded_account_subtree,
1733        branch_node_masks,
1734        storages: collected_decoded_storages,
1735    })
1736}
1737/// Queues storage proofs for all accounts in the targets and returns receivers.
1738///
1739/// This function queues all storage proof tasks to the worker pool but returns immediately
1740/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1741/// computation. This enables interleaved parallelism for better performance.
1742///
1743/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
1744fn dispatch_storage_proofs(
1745    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1746    targets: &MultiProofTargets,
1747    storage_prefix_sets: &mut B256Map<PrefixSet>,
1748    with_branch_node_masks: bool,
1749    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
1750) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1751    let mut storage_proof_receivers =
1752        B256Map::with_capacity_and_hasher(targets.len(), Default::default());
1753
1754    let mut sorted_targets: Vec<_> = targets.iter().collect();
1755    sorted_targets.sort_unstable_by_key(|(addr, _)| *addr);
1756
1757    // Dispatch all storage proofs to worker pool
1758    for (hashed_address, target_slots) in sorted_targets {
1759        // Create channel for receiving ProofResultMessage
1760        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1761
1762        // Create computation input based on V2 flag
1763        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
1764        let input = StorageProofInput::legacy(
1765            *hashed_address,
1766            prefix_set,
1767            target_slots.clone(),
1768            with_branch_node_masks,
1769            multi_added_removed_keys.cloned(),
1770        );
1771
1772        // Always dispatch a storage proof so we obtain the storage root even when no slots are
1773        // requested.
1774        storage_work_tx
1775            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1776            .map_err(|_| {
1777                ParallelStateRootError::Other(format!(
1778                    "Failed to queue storage proof for {}: storage worker pool unavailable",
1779                    hashed_address
1780                ))
1781            })?;
1782
1783        storage_proof_receivers.insert(*hashed_address, result_rx);
1784    }
1785
1786    Ok(storage_proof_receivers)
1787}
1788
1789/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
1790///
1791/// This function queues all storage proof tasks to the worker pool but returns immediately
1792/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1793/// computation. This enables interleaved parallelism for better performance.
1794///
1795/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
1796fn dispatch_v2_storage_proofs(
1797    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1798    account_targets: &Vec<proof_v2::Target>,
1799    mut storage_targets: B256Map<Vec<proof_v2::Target>>,
1800) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1801    let mut storage_proof_receivers =
1802        B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1803
1804    // Collect hashed addresses from account targets that need their storage roots computed
1805    let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1806
1807    // For storage targets with associated account proofs, ensure the first target has
1808    // min_len(0) so the root node is returned for storage root computation
1809    for (hashed_address, targets) in &mut storage_targets {
1810        if account_target_addresses.contains(hashed_address) &&
1811            let Some(first) = targets.first_mut()
1812        {
1813            *first = first.with_min_len(0);
1814        }
1815    }
1816
1817    // Sort storage targets by address for optimal dispatch order.
1818    // Since trie walk processes accounts in lexicographical order, dispatching in the same order
1819    // reduces head-of-line blocking when consuming results.
1820    let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1821    sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1822
1823    // Dispatch all proofs for targeted storage slots
1824    for (hashed_address, targets) in sorted_storage_targets {
1825        // Create channel for receiving StorageProofResultMessage
1826        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1827        let input = StorageProofInput::new(hashed_address, targets);
1828
1829        storage_work_tx
1830            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1831            .map_err(|_| {
1832                ParallelStateRootError::Other(format!(
1833                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1834                ))
1835            })?;
1836
1837        storage_proof_receivers.insert(hashed_address, result_rx);
1838    }
1839
1840    // If there are any targeted accounts which did not have storage targets then we generate a
1841    // single proof target for them so that we get their root.
1842    for target in account_targets {
1843        let hashed_address = target.key();
1844        if storage_proof_receivers.contains_key(&hashed_address) {
1845            continue
1846        }
1847
1848        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1849        let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
1850
1851        storage_work_tx
1852            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1853            .map_err(|_| {
1854                ParallelStateRootError::Other(format!(
1855                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1856                ))
1857            })?;
1858
1859        storage_proof_receivers.insert(hashed_address, result_rx);
1860    }
1861
1862    Ok(storage_proof_receivers)
1863}
1864
/// Input parameters for storage proof computation.
///
/// Dispatched to storage workers inside [`StorageWorkerJob::StorageProof`]; the variant selects
/// between the legacy prefix-set-driven proof and the V2 target-list proof.
#[derive(Debug)]
pub enum StorageProofInput {
    /// Legacy storage proof variant
    Legacy {
        /// The hashed address for which the proof is calculated.
        hashed_address: B256,
        /// The prefix set for the proof calculation.
        prefix_set: PrefixSet,
        /// The target slots for the proof calculation.
        target_slots: B256Set,
        /// Whether or not to collect branch node masks
        with_branch_node_masks: bool,
        /// Provided by the user to give the necessary context to retain extra proofs.
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    },
    /// V2 storage proof variant
    V2 {
        /// The hashed address for which the proof is calculated.
        hashed_address: B256,
        /// The set of proof targets
        targets: Vec<proof_v2::Target>,
    },
}
1889
1890impl StorageProofInput {
1891    /// Creates a legacy [`StorageProofInput`] with the given hashed address, prefix set, and target
1892    /// slots.
1893    pub const fn legacy(
1894        hashed_address: B256,
1895        prefix_set: PrefixSet,
1896        target_slots: B256Set,
1897        with_branch_node_masks: bool,
1898        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
1899    ) -> Self {
1900        Self::Legacy {
1901            hashed_address,
1902            prefix_set,
1903            target_slots,
1904            with_branch_node_masks,
1905            multi_added_removed_keys,
1906        }
1907    }
1908
1909    /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
1910    pub const fn new(hashed_address: B256, targets: Vec<proof_v2::Target>) -> Self {
1911        Self::V2 { hashed_address, targets }
1912    }
1913
1914    /// Returns the targeted hashed address.
1915    pub const fn hashed_address(&self) -> B256 {
1916        match self {
1917            Self::Legacy { hashed_address, .. } | Self::V2 { hashed_address, .. } => {
1918                *hashed_address
1919            }
1920        }
1921    }
1922}
1923
/// Input parameters for account multiproof computation.
///
/// Delivered to account workers inside [`AccountWorkerJob::AccountMultiproof`]; the computed
/// proof is sent back through the embedded [`ProofResultContext`].
#[derive(Debug)]
pub enum AccountMultiproofInput {
    /// Legacy account multiproof proof variant
    Legacy {
        /// The targets for which to compute the multiproof.
        targets: MultiProofTargets,
        /// The prefix sets for the proof calculation.
        prefix_sets: TriePrefixSets,
        /// Whether or not to collect branch node masks.
        collect_branch_node_masks: bool,
        /// Provided by the user to give the necessary context to retain extra proofs.
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
        /// Context for sending the proof result.
        proof_result_sender: ProofResultContext,
    },
    /// V2 account multiproof variant
    V2 {
        /// The targets for which to compute the multiproof.
        targets: MultiProofTargetsV2,
        /// Context for sending the proof result.
        proof_result_sender: ProofResultContext,
    },
}
1948
1949impl AccountMultiproofInput {
1950    /// Returns the [`ProofResultContext`] for this input, consuming the input.
1951    fn into_proof_result_sender(self) -> ProofResultContext {
1952        match self {
1953            Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => {
1954                proof_result_sender
1955            }
1956        }
1957    }
1958}
1959
/// Parameters for building an account multiproof with pre-computed storage roots.
///
/// Borrowed view over the data that `build_account_multiproof_with_storage_roots` needs for a
/// single multiproof computation.
struct AccountMultiproofParams<'a> {
    /// The targets for which to compute the multiproof.
    targets: &'a MultiProofTargets,
    /// The prefix set for the account trie walk.
    prefix_set: PrefixSet,
    /// Whether or not to collect branch node masks.
    collect_branch_node_masks: bool,
    /// Provided by the user to give the necessary context to retain extra proofs.
    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
    /// Receivers for storage proofs being computed in parallel.
    ///
    /// Consumed lazily during the account trie walk; any leftovers are drained afterwards.
    storage_proof_receivers: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
    /// Cached storage roots. This will be used to read storage roots for missed leaves, as well as
    /// to write calculated storage roots.
    cached_storage_roots: &'a DashMap<B256, B256>,
}
1976
/// Internal message for account workers.
///
/// Account workers handle both full multiproof computations and single blinded-node lookups.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Account multiproof computation request
    AccountMultiproof {
        /// Account multiproof input parameters, boxed so the enum stays small.
        input: Box<AccountMultiproofInput>,
    },
    /// Blinded account node retrieval request
    BlindedAccountNode {
        /// Path to the account node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
1993
#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Convenience wrapper building a [`ProofTaskCtx`] around an arbitrary factory.
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let base_factory = create_test_provider_factory();
        let cache = reth_trie_db::ChangesetCache::new();
        let overlay_factory =
            reth_provider::providers::OverlayStateProviderFactory::new(base_factory, cache);

        let runtime = reth_tasks::Runtime::test();
        let handle = ProofWorkerHandle::new(&runtime, test_ctx(overlay_factory), false);

        // The handle must be cloneable so multiple dispatchers can share the worker pools.
        let _cloned = handle.clone();

        // Workers shut down automatically once all handles are dropped.
        drop(handle);
    }
}