//! Parallel proof computation using worker pools with dedicated database transactions.
//!
//! # Architecture
//!
//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
//!   - Storage pool: Handles storage proofs and blinded storage node requests
//!   - Account pool: Handles account multiproofs and blinded account node requests
//! - **Direct Channel Access**: [`ProofWorkerHandle`] provides type-safe queue methods with direct
//!   access to worker channels, eliminating routing overhead
//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
//!
//! # Message Flow
//!
//! 1. `MultiProofTask` prepares a storage or account job and hands it to [`ProofWorkerHandle`]. The
//!    job carries a [`ProofResultContext`] so the worker knows how to send the result back.
//! 2. A worker receives the job, runs the proof, and sends a [`ProofResultMessage`] through the
//!    provided [`ProofResultSender`].
//! 3. `MultiProofTask` receives the message, uses `sequence_number` to keep proofs in order, and
//!    proceeds with its state-root logic.
//!
//! Each job gets its own direct channel so results go straight back to `MultiProofTask`. That keeps
//! ordering decisions in one place and lets workers run independently.
//!
//! ```text
//! MultiProofTask -> MultiproofManager -> ProofWorkerHandle -> Storage/Account Worker
//!        ^                                          |
//!        |                                          v
//! ProofResultMessage <-------- ProofResultSender ---
//! ```
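//!
//! The sketch below is illustrative only. It assumes a `handle` built via
//! [`ProofWorkerHandle::new`] plus a prepared [`StorageProofInput`] `input` and
//! [`HashedPostState`] `state`:
//!
//! ```ignore
//! // Each job gets its own crossbeam channel for result delivery.
//! let (result_tx, result_rx) = crossbeam_channel::unbounded();
//! let ctx = ProofResultContext::new(result_tx, 0, state, std::time::Instant::now());
//! handle.dispatch_storage_proof(input, ctx)?;
//!
//! // `MultiProofTask` receives the message and orders it by `sequence_number`.
//! let msg = result_rx.recv()?;
//! assert_eq!(msg.sequence_number, 0);
//! ```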

use crate::{
    root::ParallelStateRootError,
    stats::{ParallelTrieStats, ParallelTrieTracker},
    StorageRootTargets,
};
use alloy_primitives::{
    map::{B256Map, B256Set},
    B256,
};
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
    hashed_cursor::{HashedCursorFactory, HashedCursorMetricsCache, InstrumentedHashedCursor},
    node_iter::{TrieElement, TrieNodeIter},
    prefix_set::TriePrefixSets,
    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
    walker::TrieWalker,
    DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
    Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
    added_removed_keys::MultiAddedRemovedKeys,
    prefix_set::{PrefixSet, PrefixSetMut},
    proof::{DecodedProofNodes, ProofRetainer},
    BranchNodeMasks, BranchNodeMasksMap,
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};

#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};

type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Counter tracking available storage workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    storage_available_workers: Arc<AtomicUsize>,
    /// Counter tracking available account workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    account_available_workers: Arc<AtomicUsize>,
    /// Total number of storage workers spawned
    storage_worker_count: usize,
    /// Total number of account workers spawned
    account_worker_count: usize,
}

impl ProofWorkerHandle {
    /// Spawns storage and account worker pools with dedicated database transactions.
    ///
    /// Returns a handle for submitting proof tasks to the worker pools.
    /// Workers run until the last handle is dropped.
    ///
    /// # Parameters
    /// - `executor`: Tokio runtime handle for spawning blocking tasks
    /// - `task_ctx`: Shared context with database view and prefix sets
    /// - `storage_worker_count`: Number of storage workers to spawn
    /// - `account_worker_count`: Number of account workers to spawn
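    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `factory` implementing [`DatabaseProviderROFactory`] and a
    /// Tokio `runtime` are available:
    ///
    /// ```ignore
    /// let task_ctx = ProofTaskCtx::new(factory);
    /// let handle = ProofWorkerHandle::new(runtime.handle().clone(), task_ctx, 8, 4);
    /// assert_eq!(handle.total_storage_workers(), 8);
    /// ```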
    pub fn new<Factory>(
        executor: Handle,
        task_ctx: ProofTaskCtx<Factory>,
        storage_worker_count: usize,
        account_worker_count: usize,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Initialize availability counters at zero. Each worker will increment when it
        // successfully initializes, ensuring only healthy workers are counted.
        let storage_available_workers = Arc::new(AtomicUsize::new(0));
        let account_available_workers = Arc::new(AtomicUsize::new(0));

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            "Spawning proof worker pools"
        );

        let parent_span =
            debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
                .entered();
        // Spawn storage workers
        for worker_id in 0..storage_worker_count {
            let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = storage_work_rx.clone();
            let storage_available_workers_clone = storage_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = StorageProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        let parent_span =
            debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
                .entered();
        // Spawn account workers
        for worker_id in 0..account_worker_count {
            let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
            let task_ctx_clone = task_ctx.clone();
            let work_rx_clone = account_work_rx.clone();
            let storage_work_tx_clone = storage_work_tx.clone();
            let account_available_workers_clone = account_available_workers.clone();

            executor.spawn_blocking(move || {
                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let _guard = span.enter();
                let worker = AccountProofWorker::new(
                    task_ctx_clone,
                    work_rx_clone,
                    worker_id,
                    storage_work_tx_clone,
                    account_available_workers_clone,
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        }
        drop(parent_span);

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
        }
    }

    /// Returns how many storage workers are currently available/idle.
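    ///
    /// A hedged sketch of how a caller might use this to decide whether to chunk a multiproof;
    /// the threshold here is hypothetical:
    ///
    /// ```ignore
    /// let should_chunk = handle.available_storage_workers() > 1;
    /// ```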
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns how many account workers are currently available/idle.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of pending storage tasks in the queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of pending account tasks in the queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of storage workers in the pool.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of account workers in the pool.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

    /// Dispatches a storage proof computation to the storage worker pool.
    ///
    /// The result will be sent via the `proof_result_sender` channel.
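    ///
    /// A hedged sketch (construction of `input` and `state` is assumed). Note that if the pool
    /// is unavailable, the error is also delivered as a [`ProofResultMessage`] on the same
    /// channel:
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::unbounded();
    /// handle.dispatch_storage_proof(
    ///     input,
    ///     ProofResultContext::new(tx, 0, state, std::time::Instant::now()),
    /// )?;
    /// let storage_proof = rx.recv()?.result?;
    /// ```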
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
    ) -> Result<(), ProviderError> {
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("storage workers unavailable"));

                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let ProofResultContext {
                        sender: result_tx,
                        sequence_number: seq,
                        state,
                        start_time: start,
                    } = proof_result_sender;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatches an account multiproof computation to the account worker pool.
    ///
    /// The result will be sent via the `proof_result_sender` channel included in the input.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let AccountMultiproofInput {
                        proof_result_sender:
                            ProofResultContext {
                                sender: result_tx,
                                sequence_number: seq,
                                state,
                                start_time: start,
                            },
                        ..
                    } = *input;

                    let _ = result_tx.send(ProofResultMessage {
                        sequence_number: seq,
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatches a blinded storage node request to the storage worker pool.
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Dispatches a blinded account node request to the account worker pool.
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}

/// Data used for initializing cursor factories, shared across all proof workers (each worker
/// receives a clone).
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers.
    factory: Factory,
}

impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new [`ProofTaskCtx`] with the given factory.
    pub const fn new(factory: Factory) -> Self {
        Self { factory }
    }
}

/// Per-worker context holding the database provider used to compute proofs and serve blinded
/// node lookups.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}

impl<Provider> ProofTaskTx<Provider> {
    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}

impl<Provider> ProofTaskTx<Provider>
where
    Provider: TrieCursorFactory + HashedCursorFactory,
{
    /// Compute storage proof.
    ///
    /// Used by storage workers in the worker pool to compute storage proofs.
    #[inline]
    fn compute_storage_proof(
        &self,
        input: StorageProofInput,
        trie_cursor_metrics: &mut TrieCursorMetricsCache,
        hashed_cursor_metrics: &mut HashedCursorMetricsCache,
    ) -> StorageProofResult {
        // Consume the input so we can move large collections (e.g. target slots) without cloning.
        let StorageProofInput {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        } = input;

        // Get or create added/removed keys context
        let multi_added_removed_keys =
            multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
        let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);

        let span = debug_span!(
            target: "trie::proof_task",
            "Storage proof calculation",
            ?hashed_address,
            target_slots = ?target_slots.len(),
            worker_id = self.id,
        );
        let _span_guard = span.enter();

        let proof_start = Instant::now();

        // Compute raw storage multiproof
        let raw_proof_result =
            StorageProof::new_hashed(&self.provider, &self.provider, hashed_address)
                .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
                .with_branch_node_masks(with_branch_node_masks)
                .with_added_removed_keys(added_removed_keys)
                .with_trie_cursor_metrics(trie_cursor_metrics)
                .with_hashed_cursor_metrics(hashed_cursor_metrics)
                .storage_multiproof(target_slots)
                .map_err(|e| ParallelStateRootError::Other(e.to_string()));
        trie_cursor_metrics.record_span("trie_cursor");
        hashed_cursor_metrics.record_span("hashed_cursor");

        // Decode proof into DecodedStorageMultiProof
        let decoded_result = raw_proof_result.and_then(|raw_proof| {
            raw_proof.try_into().map_err(|e: alloy_rlp::Error| {
                ParallelStateRootError::Other(format!(
                    "Failed to decode storage proof for {}: {}",
                    hashed_address, e
                ))
            })
        });

        trace!(
            target: "trie::proof_task",
            hashed_address = ?hashed_address,
            proof_time_us = proof_start.elapsed().as_micros(),
            worker_id = self.id,
            "Completed storage proof calculation"
        );

        decoded_result
    }

    /// Process a blinded storage node request.
    ///
    /// Used by storage workers to retrieve blinded storage trie nodes for proof construction.
    fn process_blinded_storage_node(
        &self,
        account: B256,
        path: &Nibbles,
    ) -> TrieNodeProviderResult {
        let storage_node_provider =
            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
        storage_node_provider.trie_node(path)
    }

    /// Process a blinded account node request.
    ///
    /// Used by account workers to retrieve blinded account trie nodes for proof construction.
    fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
        let account_node_provider =
            ProofBlindedAccountProvider::new(&self.provider, &self.provider);
        account_node_provider.trie_node(path)
    }
}

impl TrieNodeProviderFactory for ProofWorkerHandle {
    type AccountNodeProvider = ProofTaskTrieNodeProvider;
    type StorageNodeProvider = ProofTaskTrieNodeProvider;

    fn account_node_provider(&self) -> Self::AccountNodeProvider {
        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
    }

    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
    }
}

/// Trie node provider for retrieving trie nodes by path.
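///
/// A hedged sketch of the intended usage (a `handle` and a node `path` are assumed):
///
/// ```ignore
/// let provider = handle.account_node_provider();
/// // Blocks until an account worker answers the blinded-node request.
/// let node = provider.trie_node(&path)?;
/// ```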
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node provider.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node provider.
    StorageNode {
        /// Target account.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}

impl TrieNodeProvider for ProofTaskTrieNodeProvider {
    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
        match self {
            Self::AccountNode { handle } => {
                let rx = handle
                    .dispatch_blinded_account_node(*path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
            Self::StorageNode { handle, account } => {
                let rx = handle
                    .dispatch_blinded_storage_node(*account, *path)
                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
            }
        }
    }
}

/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
#[derive(Debug)]
pub enum ProofResult {
    /// Account multiproof with statistics
    AccountMultiproof {
        /// The account multiproof
        proof: DecodedMultiProof,
        /// Statistics collected during proof computation
        stats: ParallelTrieStats,
    },
    /// Storage proof for a specific account
    StorageProof {
        /// The hashed address this storage proof belongs to
        hashed_address: B256,
        /// The storage multiproof
        proof: DecodedStorageMultiProof,
    },
}

impl ProofResult {
    /// Convert this proof result into a `DecodedMultiProof`.
    ///
    /// For account multiproofs, returns the multiproof directly (discarding stats).
    /// For storage proofs, wraps the storage proof into a minimal multiproof.
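    ///
    /// A hedged sketch (`hashed_address` and `proof` are assumed to be in scope):
    ///
    /// ```ignore
    /// // A storage proof is lifted into a single-account multiproof.
    /// let multiproof = ProofResult::StorageProof { hashed_address, proof }.into_multiproof();
    /// assert!(multiproof.storages.contains_key(&hashed_address));
    /// ```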
    pub fn into_multiproof(self) -> DecodedMultiProof {
        match self {
            Self::AccountMultiproof { proof, stats: _ } => proof,
            Self::StorageProof { hashed_address, proof } => {
                DecodedMultiProof::from_storage_proof(hashed_address, proof)
            }
        }
    }
}

/// Channel sender used by worker threads to deliver [`ProofResultMessage`] items directly back
/// to `MultiProofTask`.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;

/// Message containing a completed proof result with metadata for direct delivery to
/// `MultiProofTask`.
///
/// This type enables workers to send proof results directly to the `MultiProofTask` event loop.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// Sequence number for ordering proofs
    pub sequence_number: u64,
    /// The proof calculation result (either account multiproof or storage proof)
    pub result: Result<ProofResult, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}

/// Context for sending proof calculation results back to `MultiProofTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Sequence number for proof ordering
    pub sequence_number: u64,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration
    pub start_time: Instant,
}

impl ProofResultContext {
    /// Creates a new proof result context.
    pub const fn new(
        sender: ProofResultSender,
        sequence_number: u64,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, sequence_number, state, start_time }
    }
}

/// Internal message for storage workers.
#[derive(Debug)]
enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: ProofResultContext,
    },
    /// Blinded storage node retrieval request
    BlindedStorageNode {
        /// Target account
        account: B256,
        /// Path to the storage node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests and blinded node lookups.
struct StorageProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new storage proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        // Create provider from factory
        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Count this worker as available only after successful initialization.
        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            // Mark worker as busy.
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    Self::process_storage_proof(
                        worker_id,
                        &proof_tx,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_storage_nodes(storage_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes a storage proof request.
    fn process_storage_proof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        input: StorageProofInput,
        proof_result_sender: ProofResultContext,
        storage_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let hashed_address = input.hashed_address;
        let ProofResultContext { sender, sequence_number: seq, state, start_time } =
            proof_result_sender;

        let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
        let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            prefix_set_len = input.prefix_set.len(),
            target_slots_len = input.target_slots.len(),
            "Processing storage proof"
        );

        let proof_start = Instant::now();
        let result = proof_tx.compute_storage_proof(
            input,
            &mut trie_cursor_metrics,
            &mut hashed_cursor_metrics,
        );

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
            hashed_address,
            proof: storage_proof,
        });

        if sender
            .send(ProofResultMessage {
                sequence_number: seq,
                result: result_msg,
                elapsed: start_time.elapsed(),
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
            hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
            ?trie_cursor_metrics,
            ?hashed_cursor_metrics,
            "Storage proof completed"
        );

        #[cfg(feature = "metrics")]
        {
            // Accumulate per-proof metrics into the worker's cache
            let per_proof_cache = ProofTaskCursorMetricsCache {
                account_trie_cursor: TrieCursorMetricsCache::default(),
                account_hashed_cursor: HashedCursorMetricsCache::default(),
                storage_trie_cursor: trie_cursor_metrics,
                storage_hashed_cursor: hashed_cursor_metrics,
            };
            cursor_metrics_cache.extend(&per_proof_cache);
        }
    }

    /// Processes a blinded storage node lookup request.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        account: B256,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        storage_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            "Processing blinded storage node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_storage_node(account, &path);
        let elapsed = start.elapsed();

        *storage_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?account,
                ?path,
                storage_nodes_processed,
                "Blinded storage node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            ?account,
            ?path,
            elapsed_us = elapsed.as_micros(),
            total_processed = storage_nodes_processed,
            "Blinded storage node completed"
        );
    }
}

/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests and blinded node lookups.
struct AccountProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Channel for dispatching storage proof work
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}

impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new account proof worker.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        let Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            ref mut cursor_metrics,
        } = self;

        // Create provider from factory
        let provider = task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Count this worker as available only after successful initialization.
        available_workers.fetch_add(1, Ordering::Relaxed);

        while let Ok(job) = work_rx.recv() {
            // Mark worker as busy.
            available_workers.fetch_sub(1, Ordering::Relaxed);

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    Self::process_account_multiproof(
                        worker_id,
                        &proof_tx,
                        storage_work_tx.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        worker_id,
                        &proof_tx,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            available_workers.fetch_add(1, Ordering::Relaxed);
        }

        trace!(
            target: "trie::proof_task",
            worker_id,
            account_proofs_processed,
            account_nodes_processed,
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            metrics.record_account_nodes(account_nodes_processed as usize);
            cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes an account multiproof request.
    fn process_account_multiproof<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let AccountMultiproofInput {
            targets,
            mut prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys,
            missed_leaves_storage_roots,
            proof_result_sender:
                ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
        } = input;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account multiproof calculation",
            targets = targets.len(),
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing account multiproof"
        );

        let proof_start = Instant::now();

        let mut tracker = ParallelTrieTracker::default();

        let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);

        let storage_root_targets_len =
            StorageRootTargets::count(&prefix_sets.account_prefix_set, &storage_prefix_sets);

        tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

        let storage_proof_receivers = match dispatch_storage_proofs(
            &storage_work_tx,
            &targets,
            &mut storage_prefix_sets,
            collect_branch_node_masks,
            multi_added_removed_keys.as_ref(),
        ) {
            Ok(receivers) => receivers,
            Err(error) => {
                // Send error through result channel
                error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
                let _ = result_tx.send(ProofResultMessage {
                    sequence_number: seq,
                    result: Err(error),
                    elapsed: start.elapsed(),
                    state,
                });
                return;
            }
        };

        // Use the missed leaves cache passed from the multiproof manager
        let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);

        let ctx = AccountMultiproofParams {
            targets: &targets,
            prefix_set: account_prefix_set,
            collect_branch_node_masks,
            multi_added_removed_keys: multi_added_removed_keys.as_ref(),
            storage_proof_receivers,
            missed_leaves_storage_roots: missed_leaves_storage_roots.as_ref(),
        };

        let result =
            build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);

        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        let proof_cursor_metrics = tracker.cursor_metrics;
        proof_cursor_metrics.record_spans();

        let stats = tracker.finish();
        let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
        *account_proofs_processed += 1;

        // Send result to MultiProofTask
        if result_tx
            .send(ProofResultMessage {
                sequence_number: seq,
                result,
                elapsed: total_elapsed,
                state,
            })
            .is_err()
        {
            trace!(
                target: "trie::proof_task",
                worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        // Accumulate per-proof metrics into the worker's cache
        cursor_metrics_cache.extend(&proof_cursor_metrics);
    }

    /// Processes a blinded account node lookup request.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        proof_tx: &ProofTaskTx<Provider>,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
            worker_id,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let result = proof_tx.process_blinded_account_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}

/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk.
///
/// This is a helper function used by account workers to build the account subtree proof
/// while storage proofs are still being computed. Receivers are consumed only when needed,
/// enabling interleaved parallelism between account trie traversal and storage proof computation.
///
/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
fn build_account_multiproof_with_storage_roots<P>(
    provider: &P,
    ctx: AccountMultiproofParams<'_>,
    tracker: &mut ParallelTrieTracker,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
    P: TrieCursorFactory + HashedCursorFactory,
{
    let accounts_added_removed_keys =
        ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());

    // Create local metrics caches for account cursors. We can't use the tracker's own caches
    // directly because `inc_missed_leaves` also needs a mutable borrow of the tracker.
    let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default();
    let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default();

    // Wrap account trie cursor with instrumented cursor
    let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
    let account_trie_cursor =
        InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics);

    // Create the walker.
    let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
        .with_added_removed_keys(accounts_added_removed_keys)
        .with_deletions_retained(true);

    // Create a hash builder to rebuild the root node since it is not available in the database.
    let retainer = ctx
        .targets
        .keys()
        .map(Nibbles::unpack)
        .collect::<ProofRetainer>()
        .with_added_removed_keys(accounts_added_removed_keys);
    let mut hash_builder = HashBuilder::default()
        .with_proof_retainer(retainer)
        .with_updates(ctx.collect_branch_node_masks);

    // Initialize storage multiproofs map with pre-allocated capacity.
    // Proofs will be inserted as they're consumed from receivers during trie walk.
    let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
        B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default());
    let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);

    // Wrap account hashed cursor with instrumented cursor
    let account_hashed_cursor =
        provider.hashed_account_cursor().map_err(ProviderError::Database)?;
    let account_hashed_cursor =
        InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics);

    let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);

    let mut storage_proof_receivers = ctx.storage_proof_receivers;

    while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
        match account_node {
            TrieElement::Branch(node) => {
                hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
            }
            TrieElement::Leaf(hashed_address, account) => {
                let root = match storage_proof_receivers.remove(&hashed_address) {
                    Some(receiver) => {
                        let _guard = debug_span!(
                            target: "trie::proof_task",
                            "Waiting for storage proof",
                            ?hashed_address,
                        )
                        .entered();
                        // Block on this specific storage proof receiver - enables interleaved
                        // parallelism
                        let proof_msg = receiver.recv().map_err(|_| {
                            ParallelStateRootError::StorageRoot(
                                reth_execution_errors::StorageRootError::Database(
                                    DatabaseError::Other(format!(
                                        "Storage proof channel closed for {hashed_address}"
                                    )),
                                ),
                            )
                        })?;

                        drop(_guard);

                        // Extract storage proof from the result
                        let proof = match proof_msg.result? {
                            ProofResult::StorageProof { hashed_address: addr, proof } => {
                                debug_assert_eq!(
                                    addr,
                                    hashed_address,
                                    "storage worker must return same address: expected {hashed_address}, got {addr}"
                                );
                                proof
                            }
                            ProofResult::AccountMultiproof { .. } => {
                                unreachable!("storage worker only sends StorageProof variant")
                            }
                        };

                        let root = proof.root;
                        collected_decoded_storages.insert(hashed_address, proof);
                        root
                    }
                    // Since we do not store all intermediate nodes in the database, the walk may
                    // re-add an unmodified leaf to the hash builder; no storage proof was
                    // dispatched for it, so compute or reuse its cached storage root here.
                    None => {
                        tracker.inc_missed_leaves();

                        match ctx.missed_leaves_storage_roots.entry(hashed_address) {
                            dashmap::Entry::Occupied(occ) => *occ.get(),
                            dashmap::Entry::Vacant(vac) => {
                                let root =
                                    StorageProof::new_hashed(provider, provider, hashed_address)
                                        .with_prefix_set_mut(Default::default())
                                        .with_trie_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_trie_cursor,
                                        )
                                        .with_hashed_cursor_metrics(
                                            &mut tracker.cursor_metrics.storage_hashed_cursor,
                                        )
                                        .storage_multiproof(
                                            ctx.targets
                                                .get(&hashed_address)
                                                .cloned()
                                                .unwrap_or_default(),
                                        )
                                        .map_err(|e| {
                                            ParallelStateRootError::StorageRoot(
                                                reth_execution_errors::StorageRootError::Database(
                                                    DatabaseError::Other(e.to_string()),
                                                ),
                                            )
                                        })?
                                        .root;

                                vac.insert(root);
                                root
                            }
                        }
                    }
                };

                // Encode account
                account_rlp.clear();
                let account = account.into_trie_account(root);
                account.encode(&mut account_rlp as &mut dyn BufMut);

                hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
            }
        }
    }

    // Consume remaining storage proof receivers for accounts not encountered during trie walk.
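    // Receive errors and failed proofs are skipped: only successfully computed storage
    // proofs are added to the collected set.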
    for (hashed_address, receiver) in storage_proof_receivers {
        if let Ok(proof_msg) = receiver.recv() {
            // Extract storage proof from the result
            if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
                collected_decoded_storages.insert(hashed_address, proof);
            }
        }
    }

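    // Finalizing the hash builder populates the retained proof nodes taken below; the root
    // value itself is not needed here.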
    let _ = hash_builder.root();

    let account_subtree_raw_nodes = hash_builder.take_proof_nodes();
    let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?;

    let branch_node_masks = if ctx.collect_branch_node_masks {
        let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default();
        updated_branch_nodes
            .into_iter()
            .map(|(path, node)| {
                (path, BranchNodeMasks { hash_mask: node.hash_mask, tree_mask: node.tree_mask })
            })
            .collect()
    } else {
        BranchNodeMasksMap::default()
    };

    // Extend tracker with accumulated metrics from account cursors
    tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics);
    tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics);

    Ok(DecodedMultiProof {
        account_subtree: decoded_account_subtree,
        branch_node_masks,
        storages: collected_decoded_storages,
    })
}

/// Queues storage proofs for all accounts in the targets and returns receivers.
///
/// This function queues all storage proof tasks to the worker pool but returns immediately
/// with the receivers, allowing the account trie walk to proceed in parallel with storage
/// proof computation. This interleaved parallelism improves overall proof latency.
///
/// Propagates an error if queuing fails. The caller must consume every returned receiver.
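///
/// # Example
///
/// A usage sketch (illustrative, not compiled; error handling elided): queue everything
/// first, then block on each receiver only when its account is reached, so workers stay
/// busy in the background.
///
/// ```ignore
/// let mut receivers = dispatch_storage_proofs(
///     &storage_work_tx,
///     &targets,
///     &mut storage_prefix_sets,
///     /* with_branch_node_masks */ false,
///     /* multi_added_removed_keys */ None,
/// )?;
///
/// // Later, during the account trie walk:
/// if let Some(receiver) = receivers.remove(&hashed_address) {
///     let proof_msg = receiver.recv()?;
/// }
/// ```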
fn dispatch_storage_proofs(
    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
    targets: &MultiProofTargets,
    storage_prefix_sets: &mut B256Map<PrefixSet>,
    with_branch_node_masks: bool,
    multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
) -> Result<B256Map<CrossbeamReceiver<ProofResultMessage>>, ParallelStateRootError> {
    let mut storage_proof_receivers =
        B256Map::with_capacity_and_hasher(targets.len(), Default::default());

    // Dispatch all storage proofs to worker pool
    for (hashed_address, target_slots) in targets.iter() {
        let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();

        // Create channel for receiving ProofResultMessage
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let start = Instant::now();

        // Create computation input (data only, no communication channel)
        let input = StorageProofInput::new(
            *hashed_address,
            prefix_set,
            target_slots.clone(),
            with_branch_node_masks,
            multi_added_removed_keys.cloned(),
        );

        // Always dispatch a storage proof so we obtain the storage root even when no slots are
        // requested.
        storage_work_tx
            .send(StorageWorkerJob::StorageProof {
                input,
                proof_result_sender: ProofResultContext::new(
                    result_tx,
                    0,
                    HashedPostState::default(),
                    start,
                ),
            })
            .map_err(|_| {
                ParallelStateRootError::Other(format!(
                    "Failed to queue storage proof for {}: storage worker pool unavailable",
                    hashed_address
                ))
            })?;

        storage_proof_receivers.insert(*hashed_address, result_rx);
    }

    Ok(storage_proof_receivers)
}

/// Input parameters for storage proof computation.
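///
/// # Example
///
/// A construction sketch (values are illustrative):
///
/// ```ignore
/// let input = StorageProofInput::new(
///     hashed_address,
///     PrefixSet::default(), // no changed-slot prefixes
///     B256Set::default(),   // no target slots; the proof still yields the storage root
///     false,                // skip branch node masks
///     None,                 // no added/removed keys context
/// );
/// ```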
#[derive(Debug)]
pub struct StorageProofInput {
    /// The hashed address for which the proof is calculated.
    hashed_address: B256,
    /// The prefix set for the proof calculation.
    prefix_set: PrefixSet,
    /// The target slots for the proof calculation.
    target_slots: B256Set,
    /// Whether or not to collect branch node masks.
    with_branch_node_masks: bool,
    /// Provided by the user to give the necessary context to retain extra proofs.
    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}

impl StorageProofInput {
    /// Creates a new [`StorageProofInput`] with the given hashed address, prefix set, and target
    /// slots.
    pub const fn new(
        hashed_address: B256,
        prefix_set: PrefixSet,
        target_slots: B256Set,
        with_branch_node_masks: bool,
        multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    ) -> Self {
        Self {
            hashed_address,
            prefix_set,
            target_slots,
            with_branch_node_masks,
            multi_added_removed_keys,
        }
    }
}

/// Input parameters for account multiproof computation.
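///
/// A construction sketch (illustrative, not compiled; `result_tx` is assumed to be a
/// `crossbeam_channel` sender for [`ProofResultMessage`]):
///
/// ```ignore
/// let input = AccountMultiproofInput {
///     targets,
///     prefix_sets: TriePrefixSets::default(),
///     collect_branch_node_masks: false,
///     multi_added_removed_keys: None,
///     missed_leaves_storage_roots: Arc::new(DashMap::new()),
///     proof_result_sender: ProofResultContext::new(
///         result_tx,
///         0, // sequence number used by the receiver to order proofs
///         HashedPostState::default(),
///         Instant::now(),
///     ),
/// };
/// ```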
#[derive(Debug, Clone)]
pub struct AccountMultiproofInput {
    /// The targets for which to compute the multiproof.
    pub targets: MultiProofTargets,
    /// The prefix sets for the proof calculation.
    pub prefix_sets: TriePrefixSets,
    /// Whether or not to collect branch node masks.
    pub collect_branch_node_masks: bool,
    /// Provided by the user to give the necessary context to retain extra proofs.
    pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
    /// Cached storage proof roots for missed leaves encountered during account trie walk.
    pub missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
    /// Context for sending the proof result.
    pub proof_result_sender: ProofResultContext,
}

/// Parameters for building an account multiproof with pre-computed storage roots.
struct AccountMultiproofParams<'a> {
    /// The targets for which to compute the multiproof.
    targets: &'a MultiProofTargets,
    /// The prefix set for the account trie walk.
    prefix_set: PrefixSet,
    /// Whether or not to collect branch node masks.
    collect_branch_node_masks: bool,
    /// Provided by the user to give the necessary context to retain extra proofs.
    multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
    /// Receivers for storage proofs being computed in parallel.
    storage_proof_receivers: B256Map<CrossbeamReceiver<ProofResultMessage>>,
    /// Cached storage proof roots for missed leaves encountered during account trie walk.
    missed_leaves_storage_roots: &'a DashMap<B256, B256>,
}

/// Internal message for account workers.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Account multiproof computation request.
    AccountMultiproof {
        /// Account multiproof input parameters.
        input: Box<AccountMultiproofInput>,
    },
    /// Blinded account node retrieval request.
    BlindedAccountNode {
        /// Path to the account node.
        path: Nibbles,
        /// Channel to send result back to original caller.
        result_sender: Sender<TrieNodeProviderResult>,
    },
}

#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;
    use tokio::{runtime::Builder, task};

    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
        runtime.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let provider_factory = create_test_provider_factory();
            let factory =
                reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
            let ctx = test_ctx(factory);

            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);

            // Verify handle can be cloned
            let _cloned_handle = proof_handle.clone();

            // Workers shut down automatically when handle is dropped
            drop(proof_handle);
            task::yield_now().await;
        });
    }
}