//! Parallel proof computation using worker pools with dedicated database transactions.
//!
//! # Architecture
//!
//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
//!   - Storage pool: Handles storage proofs and blinded storage node requests
//!   - Account pool: Handles account multiproofs and blinded account node requests
//! - **Direct Channel Access**: [`ProofWorkerHandle`] provides type-safe queue methods with direct
//!   access to worker channels, eliminating routing overhead
//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
//!
//! # Message Flow
//!
//! 1. `MultiProofTask` prepares a storage or account job and hands it to [`ProofWorkerHandle`]. The
//!    job carries a [`ProofResultContext`] so the worker knows how to send the result back.
//! 2. A worker receives the job, runs the proof, and sends a [`ProofResultMessage`] through the
//!    provided [`ProofResultSender`].
//! 3. `MultiProofTask` receives the message, uses `sequence_number` to keep proofs in order, and
//!    proceeds with its state-root logic.
//!
//! Each job gets its own direct channel so results go straight back to `MultiProofTask`. That keeps
//! ordering decisions in one place and lets workers run independently.
//!
//! ```text
//! MultiProofTask -> MultiproofManager -> ProofWorkerHandle -> Storage/Account Worker
//!        ^                                          |
//!        |                                          v
//! ProofResultMessage <-------- ProofResultSender ---
//! ```
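//!
//! # Example
//!
//! An illustrative sketch of pool setup; `factory` stands in for any
//! [`DatabaseProviderROFactory`] implementation whose provider implements the required
//! cursor-factory traits, and the worker counts are arbitrary.
//!
//! ```ignore
//! // Spawn 8 storage workers and 4 account workers on the current Tokio runtime.
//! let ctx = ProofTaskCtx::new(factory);
//! let handle = ProofWorkerHandle::new(tokio::runtime::Handle::current(), ctx, 8, 4);
//!
//! // Handles are cheap to clone and share the same queues; the workers shut down
//! // once every handle is dropped.
//! let cloned = handle.clone();
//! drop(cloned);
//! ```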

use crate::{
    root::ParallelStateRootError,
    stats::{ParallelTrieStats, ParallelTrieTracker},
    StorageRootTargets,
};
use alloy_primitives::{
    map::{B256Map, B256Set},
    B256,
};
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
    node_iter::{TrieElement, TrieNodeIter},
    prefix_set::TriePrefixSets,
    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
    walker::TrieWalker,
    DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
    Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
    added_removed_keys::MultiAddedRemovedKeys,
    prefix_set::{PrefixSet, PrefixSetMut},
    proof::{DecodedProofNodes, ProofRetainer},
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};

#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};

type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Counter tracking available storage workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    storage_available_workers: Arc<AtomicUsize>,
    /// Counter tracking available account workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    account_available_workers: Arc<AtomicUsize>,
    /// Total number of storage workers spawned
    storage_worker_count: usize,
    /// Total number of account workers spawned
    account_worker_count: usize,
}

impl ProofWorkerHandle {
    /// Spawns storage and account worker pools with dedicated database transactions.
    ///
    /// Returns a handle for submitting proof tasks to the worker pools.
    /// Workers run until the last handle is dropped.
    ///
    /// # Parameters
    /// - `executor`: Tokio runtime handle for spawning blocking tasks
    /// - `task_ctx`: Shared context holding the database provider factory
    /// - `storage_worker_count`: Number of storage workers to spawn
    /// - `account_worker_count`: Number of account workers to spawn
    pub fn new<Factory>(
        executor: Handle,
        task_ctx: ProofTaskCtx<Factory>,
        storage_worker_count: usize,
        account_worker_count: usize,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Initialize availability counters at zero. Each worker will increment when it
        // successfully initializes, ensuring only healthy workers are counted.
        let storage_available_workers = Arc::new(AtomicUsize::new(0));
        let account_available_workers = Arc::new(AtomicUsize::new(0));

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            "Spawning proof worker pools"
        );

        let parent_span =
            debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
                .entered();
        // Spawn storage workers
        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        let parent_span =
            debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
                .entered();
        // Spawn account workers
        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
        }
    }

    /// Returns how many storage workers are currently available/idle.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns how many account workers are currently available/idle.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of pending storage tasks in the queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of pending account tasks in the queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of storage workers in the pool.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of account workers in the pool.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

    /// Dispatches a storage proof computation to the storage worker pool.
    ///
    /// The result will be sent via the `proof_result_sender` channel.
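    ///
    /// # Example
    ///
    /// An illustrative sketch; `handle` is an existing [`ProofWorkerHandle`] and `input` is a
    /// placeholder for an already-built [`StorageProofInput`].
    ///
    /// ```ignore
    /// let (result_tx, result_rx) = crossbeam_channel::unbounded();
    /// // Sequence number 0, the triggering state update, and the dispatch timestamp.
    /// let ctx = ProofResultContext::new(result_tx, 0, HashedPostState::default(), Instant::now());
    /// handle.dispatch_storage_proof(input, ctx)?;
    /// let message = result_rx.recv()?;
    /// assert_eq!(message.sequence_number, 0);
    /// ```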
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
    ) -> Result<(), ProviderError> {
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("storage workers unavailable"));

                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = proof_result_sender;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatches an account multiproof computation to the account worker pool.
    ///
    /// The result will be sent via the `proof_result_sender` channel included in the input.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let AccountMultiproofInput {
                        proof_result_sender:
                            ProofResultContext {
                                sender: result_tx,
                                sequence_number: seq,
                                state,
                                start_time: start,
                            },
                        ..
                    } = *input;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatches a blinded storage node request to the storage worker pool.
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Dispatches a blinded account node request to the account worker pool.
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}

/// Shared context for proof workers, holding the factory used to create the database providers
/// that back cursor factories.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers.
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new [`ProofTaskCtx`] with the given factory.
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}

/// Worker-local context holding the database provider used to compute proofs.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}

impl<Provider> ProofTaskTx<Provider>
where
    Provider: TrieCursorFactory + HashedCursorFactory,
{
    /// Computes a storage proof.
    ///
    /// Used by storage workers in the worker pool.
    #[inline]
    fn compute_storage_proof(
        &self,
        input: StorageProofInput,
        trie_cursor_metrics: &mut TrieCursorMetricsCache,
        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
    ) -> StorageProofResult {
        // Consume the input so we can move large collections (e.g. target slots) without cloning.
        let StorageProofInput {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        } = input;

        // Get or create added/removed keys context
        let multi_added_removed_keys =
            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);

        let span = debug_span!(
            target: "trie::proof_task",
            "Storage proof calculation",
            ?hashed_address,
            target_slots = ?target_slots.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        // Compute raw storage multiproof
        let raw_proof_result =
            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
                .with_branch_node_masks(with_branch_node_masks)
                .with_added_removed_keys(added_removed_keys)
                .with_trie_cursor_metrics(trie_cursor_metrics)
                .with_hashed_cursor_metrics(hashed_cursor_metrics)
                .storage_multiproof(target_slots)
                .map_err(|e| ParallelStateRootError::Other(e.to_string()));
        trie_cursor_metrics.record_span("trie_cursor");
        hashed_cursor_metrics.record_span("hashed_cursor");

        // Decode proof into DecodedStorageMultiProof
        let decoded_result = raw_proof_result.and_then(|raw_proof| {
            raw_proof.try_into().map_err(|e: alloy_rlp::Error| {
                ParallelStateRootError::Other(format!(
                    "Failed to decode storage proof for {}: {}",
                    hashed_address, e
                ))
            })
        });

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            worker_id = self.id,
            "Completed storage proof calculation"
        );

        decoded_result
    }

    /// Processes a blinded storage node request.
    ///
    /// Used by storage workers to retrieve blinded storage trie nodes for proof construction.
    fn process_blinded_storage_node(
        &self,
        account: B256,
        path: &Nibbles,
    ) -> TrieNodeProviderResult {
        let storage_node_provider =
            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
        storage_node_provider.trie_node(path)
    }

    /// Processes a blinded account node request.
    ///
    /// Used by account workers to retrieve blinded account trie nodes for proof construction.
    fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
        let account_node_provider =
            ProofBlindedAccountProvider::new(&self.provider, &self.provider);
        account_node_provider.trie_node(path)
    }
}

impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}

/// Trie node provider for retrieving trie nodes by path.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node provider.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node provider.
    StorageNode {
        /// Target account.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}

impl TrieNodeProvider for ProofTaskTrieNodeProvider {
    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
        match self {
            Self::AccountNode { handle } => {
                let rx = handle
                    .dispatch_blinded_account_node(*path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
            Self::StorageNode { handle, account } => {
                let rx = handle
                    .dispatch_blinded_storage_node(*account, *path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
        }
    }
}

/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
#[derive(Debug)]
pub enum ProofResult {
    /// Account multiproof with statistics
    AccountMultiproof {
        /// The account multiproof
        proof: DecodedMultiProof,
        /// Statistics collected during proof computation
        stats: ParallelTrieStats,
    },
    /// Storage proof for a specific account
    StorageProof {
        /// The hashed address this storage proof belongs to
        hashed_address: B256,
        /// The storage multiproof
        proof: DecodedStorageMultiProof,
    },
}

impl ProofResult {
    /// Converts this proof result into a `DecodedMultiProof`.
    ///
    /// For account multiproofs, returns the multiproof directly (discarding stats).
    /// For storage proofs, wraps the storage proof into a minimal multiproof.
    pub fn into_multiproof(self) -> DecodedMultiProof {
        match self {
            Self::AccountMultiproof { proof, stats: _ } => proof,
            Self::StorageProof { hashed_address, proof } => {
                DecodedMultiProof::from_storage_proof(hashed_address, proof)
            }
        }
    }
}

/// Channel sender used by worker threads to deliver [`ProofResultMessage`] items directly back
/// to `MultiProofTask`.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;

/// Message containing a completed proof result with metadata for direct delivery to
/// `MultiProofTask`.
///
/// This type enables workers to send proof results directly to the `MultiProofTask` event loop.
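///
/// # Example
///
/// An illustrative receiver-side sketch: `result_rx` is the receiving half of a
/// [`ProofResultSender`] channel, and the buffering strategy shown is hypothetical.
///
/// ```ignore
/// let mut pending = std::collections::BTreeMap::new();
/// let mut next_seq = 0u64;
/// while let Ok(msg) = result_rx.recv() {
///     pending.insert(msg.sequence_number, msg);
///     // Apply results strictly in sequence order.
///     while let Some(msg) = pending.remove(&next_seq) {
///         let _multiproof = msg.result?.into_multiproof();
///         next_seq += 1;
///     }
/// }
/// ```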
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number for ordering proofs
    pub sequence_number: u64,
    /// The proof calculation result (either account multiproof or storage proof)
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}

/// Context for sending proof calculation results back to `MultiProofTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Sequence number for proof ordering
    pub sequence_number: u64,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a new proof result context.
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}

/// Internal message for storage workers.
#[derive(Debug)]
enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: ProofResultContext,
    },
    /// Blinded storage node retrieval request
    BlindedStorageNode {
        /// Target account
        account: B256,
        /// Path to the storage node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests and blinded node lookups.
struct StorageProofWorker<Factory> {
    /// Shared task context holding the database provider factory
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new storage proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        // Create provider from factory
        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Count this worker as available only after successful initialization.
        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            // Mark worker as busy.
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    Self::process_storage_proof(
                        worker_id,
                        &proof_tx,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_storage_nodes(storage_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes a storage proof request.
    fn process_storage_proof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let hashed_address = input.hashed_address;
        let ProofResultContext { sender, sequence_number: seq, state, start_time } =
            proof_result_sender;

        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            prefix_set_len = input.prefix_set.len(),
            target_slots_len = input.target_slots.len(),
            "Processing storage proof"
        );

        let proof_start = Instant::now();
        let result = proof_tx.compute_storage_proof(
            input,
            &mut trie_cursor_metrics,
            &mut hashed_cursor_metrics,
        );

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
            hashed_address,
            proof: storage_proof,
        });

        if sender
            .send(ProofResultMessage {
                sequence_number: seq,
                result: result_msg,
                elapsed: start_time.elapsed(),
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            // Accumulate per-proof metrics into the worker's cache
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    /// Processes a blinded storage node lookup request.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}

/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests and blinded node lookups.
struct AccountProofWorker<Factory> {
    /// Shared task context holding the database provider factory
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Channel for dispatching storage proof work
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new account proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        // Create provider from factory
        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Count this worker as available only after successful initialization.
        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            // Mark worker as busy.
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    Self::process_account_multiproof(
                        worker_id,
                        &proof_tx,
                        storage_work_tx.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            account_proofs_processed,
            account_nodes_processed,
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_account_nodes(account_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes an account multiproof request.
    fn process_account_multiproof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let AccountMultiproofInput {
            targets,
            mut prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys,
            missed_leaves_storage_roots,
            proof_result_sender:
                ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
        } = input;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let proof_start = Instant::now();

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        let storage_proof_receivers = match dispatch_storage_proofs(
            &storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        ) {
            Ok(receivers) => receivers,
            Err(error) => {
                // Send error through result channel
                error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
                let _ = result_tx.send(ProofResultMessage {
                    sequence_number: seq,
                    result: Err(error),
                    elapsed: start.elapsed(),
                    state,
                });
                return;
            }
        };

        // Use the missed leaves cache passed from the multiproof manager
        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            missed_leaves_storage_roots: missed_leaves_storage_roots.as_ref(),
        };

        let result =
            build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);

        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        let proof_cursor_metrics = tracker.cursor_metrics;
        proof_cursor_metrics.record_spans();

        let stats = tracker.finish();
        let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
        *account_proofs_processed += 1;

        // Send result to MultiProofTask
        if result_tx
            .send(ProofResultMessage {
                sequence_number: seq,
                result,
                elapsed: total_elapsed,
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        // Accumulate per-proof metrics into the worker's cache
        cursor_metrics_cache.extend(&proof_cursor_metrics);
    }

    /// Processes a blinded account node lookup request.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_account_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}

/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk.
///
/// This is a helper function used by account workers to build the account subtree proof
/// while storage proofs are still being computed. Receivers are consumed only when needed,
/// enabling interleaved parallelism between account trie traversal and storage proof computation.
///
/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
fn build_account_multiproof_with_storage_roots<P>(
    provider: &P,
    ctx: AccountMultiproofParams<'_>,
    tracker: &mut ParallelTrieTracker,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
    P: TrieCursorFactory + HashedCursorFactory,
{
    let accounts_added_removed_keys =
        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());

    // Create local metrics caches for the account cursors. We can't hand the cursors the caches
    // inside the tracker, because the tracker is also mutated via `inc_missed_leaves` while the
    // cursors are alive.
    let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default();
    let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default();

    // Wrap account trie cursor with instrumented cursor
    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
    let account_trie_cursor =
        InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics);

    // Create the walker.
    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
        .with_added_removed_keys(accounts_added_removed_keys)
        .with_deletions_retained(true);

    // Create a hash builder to rebuild the root node since it is not available in the database.
    let retainer = ctx
        .targets
        .keys()
        .map(Nibbles::unpack)
        .collect::<ProofRetainer>()
        .with_added_removed_keys(accounts_added_removed_keys);
    let mut hash_builder = HashBuilder::default()
        .with_proof_retainer(retainer)
        .with_updates(ctx.collect_branch_node_masks);

    // Initialize storage multiproofs map with pre-allocated capacity.
    // Proofs will be inserted as they're consumed from receivers during trie walk.
    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);

    // Wrap account hashed cursor with instrumented cursor
    let account_hashed_cursor =
        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
    let account_hashed_cursor =
        InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics);

    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);

    let mut storage_proof_receivers = ctx.storage_proof_receivers;

    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
        match account_node {
            TrieElement::Branch(node) => {
                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
            }
            TrieElement::Leaf(hashed_address, account) => {
                let root = match storage_proof_receivers.remove(&hashed_address) {
                    Some(receiver) => {
                        let _guard = debug_span!(
                            target: "trie::proof_task",
                            "Waiting for storage proof",
                            ?hashed_address,
                        )
                        .entered();
1327                        // Block on this specific storage proof receiver - enables interleaved
1328                        // parallelism
1329                        let proof_msg = receiver.recv().map_err(|_| {
1330                            ParallelStateRootError::StorageRoot(
1331                                reth_execution_errors::StorageRootError::Database(
1332                                    DatabaseError::Other(format!(
1333                                        "Storage proof channel closed for {hashed_address}"
1334                                    )),
1335                                ),
1336                            )
1337                        })?;
1338
1339                        drop(_guard);
1340
1341                        // Extract storage proof from the result
1342                        let proof = match proof_msg.result? {
1343                            ProofResult::StorageProof { hashed_address: addr, proof } => {
1344                                debug_assert_eq!(
1345                                    addr,
1346                                    hashed_address,
1347                                    "storage worker must return same address: expected {hashed_address}, got {addr}"
1348                                );
1349                                proof
1350                            }
1351                            ProofResult::AccountMultiproof { .. } => {
1352                                unreachable!("storage worker only sends StorageProof variant")
1353                            }
1354                        };
1355
1356                        let root = proof.root;
1357                        collected_decoded_storages.insert(hashed_address, proof);
1358                        root
1359                    }
1360                    // Since we do not store all intermediate nodes in the database, there might
1361                    // be a possibility of re-adding a non-modified leaf to the hash builder.
                    None => {
                        tracker.inc_missed_leaves();

                        match ctx.missed_leaves_storage_roots.entry(hashed_address) {
                            dashmap::Entry::Occupied(occ) => *occ.get(),
                            dashmap::Entry::Vacant(vac) => {
                                let root =
                                    StorageProof::new_hashed(provider, provider, hashed_address)
                                        .with_prefix_set_mut(Default::default())
                                        .with_trie_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_trie_cursor,
                                        )
                                        .with_hashed_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_hashed_cursor,
                                        )
                                        .storage_multiproof(
                                            ctx.targets
                                                .get(&hashed_address)
                                                .cloned()
                                                .unwrap_or_default(),
                                        )
                                        .map_err(|e| {
                                            ParallelStateRootError::StorageRoot(
                                                reth_execution_errors::StorageRootError::Database(
                                                    DatabaseError::Other(e.to_string()),
                                                ),
                                            )
                                        })?
                                        .root;

                                vac.insert(root);
                                root
                            }
                        }
                    }
                };

                // Encode account
                account_rlp.clear();
                let account = account.into_trie_account(root);
                account.encode(&mut account_rlp as &mut dyn BufMut);

                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
            }
        }
    }

    // Consume remaining storage proof receivers for accounts not encountered during trie walk.
    for (hashed_address, receiver) in storage_proof_receivers {
        if let Ok(proof_msg) = receiver.recv() {
            // Extract storage proof from the result
            if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
                collected_decoded_storages.insert(hashed_address, proof);
            }
        }
    }

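    // `root` must be computed so the hash builder processes all added leaves and the proof
    // retainer captures the retained account trie nodes before `take_proof_nodes` is called.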
    let _ = hash_builder.root();

    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;

    let (branch_node_hash_masks, branch_node_tree_masks) = if ctx.collect_branch_node_masks {
        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
        (
            updated_branch_nodes.iter().map(|(path, node)| (*path, node.hash_mask)).collect(),
            updated_branch_nodes.into_iter().map(|(path, node)| (path, node.tree_mask)).collect(),
        )
    } else {
        (Default::default(), Default::default())
    };

    // Extend tracker with accumulated metrics from account cursors
    tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics);
    tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics);

    Ok(DecodedMultiProof {
        account_subtree: decoded_account_subtree,
        branch_node_hash_masks,
        branch_node_tree_masks,
        storages: collected_decoded_storages,
    })
}

/// Queues storage proofs for all accounts in the targets and returns receivers.
///
/// This function queues all storage proof tasks to the worker pool but returns immediately
/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
/// computation. This enables interleaved parallelism for better performance.
///
/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
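///
/// A sketch of the intended call pattern (illustrative only; the surrounding bindings are
/// assumed from the account multiproof path above):
///
/// ```ignore
/// // Queue every storage proof up front; workers start computing immediately.
/// let mut storage_proof_receivers = dispatch_storage_proofs(
///     &storage_work_tx,
///     &targets,
///     &mut storage_prefix_sets,
///     collect_branch_node_masks,
///     multi_added_removed_keys.as_ref(),
/// )?;
///
/// // Walk the account trie, taking each account's receiver as its leaf is reached:
/// // `storage_proof_receivers.remove(&hashed_address)`.
///
/// // Finally, drain receivers for accounts the walk never visited.
/// for (hashed_address, receiver) in storage_proof_receivers {
///     let _ = receiver.recv();
/// }
/// ```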
fn dispatch_storage_proofs(
    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
    targets: &MultiProofTargets,
    storage_prefix_sets: &mut B256Map<PrefixSet>,
    with_branch_node_masks: bool,
    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
) -> Result<B256Map<CrossbeamReceiver<ProofResultMessage>>, ParallelStateRootError> {
    let mut storage_proof_receivers =
        B256Map::with_capacity_and_hasher(targets.len(), Default::default());

    // Dispatch all storage proofs to the worker pool.
    for (hashed_address, target_slots) in targets.iter() {
        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();

        // Create a channel for receiving the `ProofResultMessage`.
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let start = Instant::now();

        // Create the computation input (data only, no communication channel).
        let input = StorageProofInput::new(
            *hashed_address,
            prefix_set,
            target_slots.clone(),
            with_branch_node_masks,
            multi_added_removed_keys.cloned(),
        );

        // Always dispatch a storage proof so we obtain the storage root even when no slots are
        // requested.
        storage_work_tx
            .send(StorageWorkerJob::StorageProof {
                input,
                proof_result_sender: ProofResultContext::new(
                    result_tx,
                    0,
                    HashedPostState::default(),
                    start,
                ),
            })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {}: storage worker pool unavailable",
                    hashed_address
                ))
            })?;

        storage_proof_receivers.insert(*hashed_address, result_rx);
    }

    Ok(storage_proof_receivers)
}

/// Input parameters for storage proof computation.
#[derive(Debug)]
pub struct StorageProofInput {
    /// The hashed address for which the proof is calculated.
    hashed_address: B256,
    /// The prefix set for the proof calculation.
    prefix_set: PrefixSet,
    /// The target slots for the proof calculation.
    target_slots: B256Set,
    /// Whether or not to collect branch node masks.
    with_branch_node_masks: bool,
    /// User-provided context used to retain extra proofs for added/removed keys.
    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}

impl StorageProofInput {
    /// Creates a new [`StorageProofInput`] from the given hashed address, prefix set, target
    /// slots, branch node mask flag, and optional added/removed keys context.
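    ///
    /// A minimal construction sketch (illustrative placeholder values):
    ///
    /// ```ignore
    /// let input = StorageProofInput::new(
    ///     B256::ZERO,           // hashed account address
    ///     PrefixSet::default(), // no changed-slot prefixes
    ///     B256Set::default(),   // no specific target slots
    ///     false,                // do not collect branch node masks
    ///     None,                 // no added/removed keys context
    /// );
    /// ```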
    pub const fn new(
        hashed_address: B256,
        prefix_set: PrefixSet,
        target_slots: B256Set,
        with_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    ) -> Self {
        Self {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        }
    }
}

/// Input parameters for account multiproof computation.
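///
/// A construction sketch (illustrative; the channel, sequence number, and timing values are
/// placeholders):
///
/// ```ignore
/// let (result_tx, _result_rx) = crossbeam_channel::unbounded();
/// let input = AccountMultiproofInput {
///     targets,
///     prefix_sets,
///     collect_branch_node_masks: false,
///     multi_added_removed_keys: None,
///     missed_leaves_storage_roots: Arc::new(DashMap::new()),
///     proof_result_sender: ProofResultContext::new(
///         result_tx,
///         0, // sequence number used by the receiver to order proofs
///         HashedPostState::default(),
///         Instant::now(),
///     ),
/// };
/// ```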
#[derive(Debug, Clone)]
pub struct AccountMultiproofInput {
    /// The targets for which to compute the multiproof.
    pub targets: MultiProofTargets,
    /// The prefix sets for the proof calculation.
    pub prefix_sets: TriePrefixSets,
    /// Whether or not to collect branch node masks.
    pub collect_branch_node_masks: bool,
    /// User-provided context used to retain extra proofs for added/removed keys.
    pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    /// Cached storage proof roots for missed leaves encountered during the account trie walk.
    pub missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
    /// Context for sending the proof result.
    pub proof_result_sender: ProofResultContext,
}

/// Parameters for building an account multiproof with pre-computed storage roots.
struct AccountMultiproofParams<'a> {
    /// The targets for which to compute the multiproof.
    targets: &'a MultiProofTargets,
    /// The prefix set for the account trie walk.
    prefix_set: PrefixSet,
    /// Whether or not to collect branch node masks.
    collect_branch_node_masks: bool,
    /// User-provided context used to retain extra proofs for added/removed keys.
    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
    /// Receivers for storage proofs being computed in parallel.
    storage_proof_receivers: B256Map<CrossbeamReceiver<ProofResultMessage>>,
    /// Cached storage proof roots for missed leaves encountered during the account trie walk.
    missed_leaves_storage_roots: &'a DashMap<B256, B256>,
}

/// Internal message for account workers.
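///
/// A worker loop handles the two variants roughly as follows (illustrative sketch; the real
/// worker also records metrics and tracing spans):
///
/// ```ignore
/// match job {
///     AccountWorkerJob::AccountMultiproof { input } => {
///         // Compute the multiproof and reply through `input.proof_result_sender`.
///     }
///     AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
///         // Fetch the blinded node at `path` and reply through `result_sender`.
///     }
/// }
/// ```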
#[derive(Debug)]
enum AccountWorkerJob {
    /// Account multiproof computation request
    AccountMultiproof {
        /// Account multiproof input parameters
        input: Box<AccountMultiproofInput>,
    },
    /// Blinded account node retrieval request
    BlindedAccountNode {
        /// Path to the account node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;
    use tokio::{runtime::Builder, task};

    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
        runtime.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let provider_factory = create_test_provider_factory();
            let factory =
                reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
            let ctx = test_ctx(factory);

            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);

            // Verify handle can be cloned
            let _cloned_handle = proof_handle.clone();

            // Workers shut down automatically when handle is dropped
            drop(proof_handle);
            task::yield_now().await;
        });
    }
}