// reth_trie_parallel/proof_task.rs
1//! Parallel proof computation using worker pools with dedicated database transactions.
//!
4//! # Architecture
5//!
6//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
7//!   - Storage pool: Handles storage proofs and blinded storage node requests
8//!   - Account pool: Handles account multiproofs and blinded account node requests
9//! - **Direct Channel Access**: `ProofWorkerHandle` provides type-safe queue methods with direct
10//!   access to worker channels, eliminating routing overhead
11//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
12//!
13//! # Message Flow
14//!
15//! 1. The `SparseTrieCacheTask` prepares a storage or account job and hands it to
16//!    `ProofWorkerHandle`. The job carries a `ProofResultContext` so the worker knows how to send
17//!    the result back.
18//! 2. A worker receives the job, runs the proof, and sends a `ProofResultMessage` through the
19//!    provided `ProofResultSender`.
20//! 3. The `SparseTrieCacheTask` receives the message and proceeds with its state-root logic.
21//!
22//! Each job gets its own direct channel so results go straight back to the `SparseTrieCacheTask`.
23//! That keeps ordering decisions in one place and lets workers run independently.
24//!
25//! ```text
26//! SparseTrieCacheTask -> ProofWorkerHandle -> Storage/Account Worker
27//!        ^                       |
28//!        |                       v
29//! ProofResultMessage <-- ProofResultSender
30//! ```
31
32use crate::{
33    root::ParallelStateRootError,
34    value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
35};
36use alloy_primitives::{
37    map::{B256Map, B256Set},
38    B256, U256,
39};
40use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
41use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
42use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
43use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
44use reth_storage_errors::db::DatabaseError;
45use reth_tasks::Runtime;
46use reth_trie::{
47    hashed_cursor::{HashedCursorFactory, HashedStorageCursor, InstrumentedHashedCursor},
48    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider},
49    proof_v2,
50    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieStorageCursor},
51    DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, Nibbles, ProofTrieNodeV2,
52    ProofV2Target,
53};
54use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
55use std::{
56    cell::RefCell,
57    rc::Rc,
58    sync::{
59        atomic::{AtomicBool, AtomicUsize, Ordering},
60        mpsc::{channel, Receiver, Sender},
61        Arc,
62    },
63    time::Duration,
64};
65use tracing::{debug, debug_span, error, instrument, trace};
66
67#[cfg(feature = "metrics")]
68use crate::proof_task_metrics::{
69    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
70};
71
/// Result of a blinded trie node lookup: the revealed node if it exists, or a sparse trie error.
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
73
/// Type alias for the V2 account proof calculator with instrumented cursors.
///
/// The account trie/hashed cursors drive the proof walk; the [`AsyncAccountValueEncoder`] is
/// parameterized over storage cursors — presumably so account leaf values can embed storage
/// roots (confirm in `value_encoder`).
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::AccountTrieCursor<'a>>,
    InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::AccountCursor<'a>>,
    AsyncAccountValueEncoder<
        InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
        InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
    >,
>;
83
/// Type alias for the V2 storage proof calculator with instrumented cursors.
///
/// Wraps the provider's storage trie/hashed cursors in instrumented variants so cursor
/// operations are counted for metrics.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
    InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>;
89
/// Tracks worker availability counts.
///
/// It uses cacheline-aligned flags to avoid core-to-core chatter.
#[derive(Debug)]
struct AvailabilitySheet {
    /// One flag per worker, each on its own cacheline. Workers store `true` when idle,
    /// `false` when busy. Only the owning worker writes; the dispatcher only reads.
    ///
    /// `CachePadded` places each flag on its own cacheline so a worker flipping its flag
    /// does not invalidate the line holding a neighbor's flag (false sharing).
    flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
}
99
100impl AvailabilitySheet {
101    /// Creates a new sheet with `count` workers, all initially marked as busy.
102    fn new(count: usize) -> Self {
103        let flags =
104            (0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect();
105        Self { flags }
106    }
107
108    /// Returns `true` if more than one worker is currently idle.
109    ///
110    /// Note, that this is somewhat racy since a flag that was just saying `idle` and we counted it
111    /// as such might turn into `busy` right away.
112    fn has_multiple_idle(&self) -> bool {
113        let mut idle = 0u32;
114        for flag in &self.flags {
115            if flag.load(Ordering::Relaxed) {
116                idle += 1;
117                if idle > 1 {
118                    return true;
119                }
120            }
121        }
122        false
123    }
124
125    /// Marks the given worker as idle.
126    fn mark_idle(&self, worker_id: usize) {
127        self.flags[worker_id].store(true, Ordering::Relaxed);
128    }
129
130    /// Marks the given worker as busy.
131    fn mark_busy(&self, worker_id: usize) {
132        self.flags[worker_id].store(false, Ordering::Relaxed);
133    }
134}
135
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
///
/// Cloning is cheap: clones share the same channels and availability sheets.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Per-worker availability flags for storage workers. Used to determine whether to chunk
    /// multiproofs.
    storage_availability: Arc<AvailabilitySheet>,
    /// Per-worker availability flags for account workers. Used to determine whether to chunk
    /// multiproofs.
    account_availability: Arc<AvailabilitySheet>,
    /// Total number of storage workers spawned
    storage_worker_count: usize,
    /// Total number of account workers spawned
    account_worker_count: usize,
}
158
impl ProofWorkerHandle {
    /// Spawns storage and account worker pools with dedicated database transactions.
    ///
    /// Returns a handle for submitting proof tasks to the worker pools.
    /// Workers run until the last handle is dropped.
    ///
    /// Worker counts are taken from the runtime's dedicated rayon pools
    /// (`proof_storage_worker_pool` / `proof_account_worker_pool`), optionally halved.
    ///
    /// # Parameters
    /// - `runtime`: The centralized runtime used to spawn blocking worker tasks
    /// - `task_ctx`: Shared context with database view and prefix sets
    /// - `halve_workers`: Whether to halve the worker pool size (for small blocks)
    #[instrument(
        name = "ProofWorkerHandle::new",
        level = "debug",
        target = "trie::proof_task",
        skip_all
    )]
    pub fn new<Factory>(
        runtime: &Runtime,
        task_ctx: ProofTaskCtx<Factory>,
        halve_workers: bool,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Shared between storage and account workers; presumably maps hashed account
        // address -> computed storage root so account proofs can reuse storage results.
        // NOTE(review): confirm key/value semantics against `AccountProofWorker`.
        let cached_storage_roots = Arc::<DashMap<_, _>>::default();

        let divisor = if halve_workers { 2 } else { 1 };
        let storage_worker_count =
            runtime.proof_storage_worker_pool().current_num_threads() / divisor;
        let account_worker_count =
            runtime.proof_account_worker_pool().current_num_threads() / divisor;

        let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
        let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            halve_workers,
            "Spawning proof worker pools"
        );

        // broadcast blocks until all workers exit (channel close), so run on
        // tokio's blocking pool.
        let storage_rt = runtime.clone();
        let storage_task_ctx = task_ctx.clone();
        let storage_avail = storage_availability.clone();
        let storage_roots = cached_storage_roots.clone();
        let storage_parent_span = tracing::Span::current();
        runtime.spawn_blocking_named("storage-workers", move || {
            // Each broadcast closure runs once per pool thread; the atomic hands out
            // a unique sequential id to each worker.
            let worker_id = AtomicUsize::new(0);
            storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
                let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
                let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
                let _guard = span.enter();

                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let worker = StorageProofWorker::new(
                    storage_task_ctx.clone(),
                    storage_work_rx.clone(),
                    worker_id,
                    storage_avail.clone(),
                    storage_roots.clone(),
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        });

        let account_rt = runtime.clone();
        // Account workers receive a clone of the *storage* work sender — presumably so
        // account jobs can enqueue storage proofs; confirm against `AccountProofWorker::new`.
        let account_tx = storage_work_tx.clone();
        let account_avail = account_availability.clone();
        let account_parent_span = tracing::Span::current();
        runtime.spawn_blocking_named("account-workers", move || {
            let worker_id = AtomicUsize::new(0);
            account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
                let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
                let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
                let _guard = span.enter();

                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let worker = AccountProofWorker::new(
                    task_ctx.clone(),
                    account_work_rx.clone(),
                    worker_id,
                    account_tx.clone(),
                    account_avail.clone(),
                    cached_storage_roots.clone(),
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        });

        Self {
            storage_work_tx,
            account_work_tx,
            storage_availability,
            account_availability,
            storage_worker_count,
            account_worker_count,
        }
    }

    /// Returns `true` if more than one storage worker is currently idle.
    pub fn has_multiple_idle_storage_workers(&self) -> bool {
        self.storage_availability.has_multiple_idle()
    }

    /// Returns `true` if more than one account worker is currently idle.
    pub fn has_multiple_idle_account_workers(&self) -> bool {
        self.account_availability.has_multiple_idle()
    }

    /// Returns the number of pending storage tasks in the queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of pending account tasks in the queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of storage workers in the pool.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of account workers in the pool.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Dispatch a storage proof computation to storage worker pool
    ///
    /// The result will be sent via the `proof_result_sender` channel.
    ///
    /// A send error means the channel is disconnected (all storage workers exited); in
    /// that case the failure is also surfaced on `proof_result_sender` so the waiting
    /// consumer is unblocked before the provider error is returned.
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    ) -> Result<(), ProviderError> {
        let hashed_address = input.hashed_address;
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                // Recover the sender from the failed send so the consumer is notified.
                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let _ = proof_result_sender.send(StorageProofResultMessage {
                        hashed_address,
                        result: Err(DatabaseError::Other(
                            "storage workers unavailable".to_string(),
                        )
                        .into()),
                    });
                }

                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })
    }

    /// Dispatch an account multiproof computation
    ///
    /// The result will be sent via the `result_sender` channel included in the input.
    ///
    /// On a failed send (all account workers exited), the error is delivered through the
    /// input's embedded result channel before being returned to the caller.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                // Recover the boxed input from the failed send to reach its result channel.
                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let ProofResultContext { sender: result_tx, state, start_time: start } =
                        input.into_proof_result_sender();

                    let _ = result_tx.send(ProofResultMessage {
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatch blinded storage node request to storage worker pool
    ///
    /// Returns a receiver on which the worker delivers the looked-up node.
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Dispatch blinded account node request to account worker pool
    ///
    /// Returns a receiver on which the worker delivers the looked-up node.
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}
414
/// Data used for initializing cursor factories that is shared across all proof worker instances.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers.
    factory: Factory,
    /// Maximum random jitter to apply before each proof computation (trie-debug only).
    /// `None` disables jitter entirely.
    #[cfg(feature = "trie-debug")]
    proof_jitter: Option<Duration>,
}
424
impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new [`ProofTaskCtx`] with the given factory.
    ///
    /// In trie-debug builds, proof jitter starts disabled; enable it via `with_proof_jitter`.
    pub const fn new(factory: Factory) -> Self {
        Self {
            factory,
            #[cfg(feature = "trie-debug")]
            proof_jitter: None,
        }
    }

    /// Sets the maximum proof jitter duration (trie-debug only).
    ///
    /// Workers sleep a random duration in `[0, jitter]` before each proof to shake out
    /// ordering-dependent behavior.
    #[cfg(feature = "trie-debug")]
    pub const fn with_proof_jitter(mut self, jitter: Option<Duration>) -> Self {
        self.proof_jitter = jitter;
        self
    }
}
442
/// This contains all information shared between account proof worker instances.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    /// Owns the worker's dedicated read-only database view.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}
452
453impl<Provider> ProofTaskTx<Provider> {
454    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
455    const fn new(provider: Provider, id: usize) -> Self {
456        Self { provider, id }
457    }
458}
459
460impl<Provider> ProofTaskTx<Provider>
461where
462    Provider: TrieCursorFactory + HashedCursorFactory,
463{
464    fn compute_v2_storage_proof<TC, HC>(
465        &self,
466        input: StorageProofInput,
467        calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
468    ) -> Result<StorageProofResult, StateProofError>
469    where
470        TC: TrieStorageCursor,
471        HC: HashedStorageCursor<Value = U256>,
472    {
473        let StorageProofInput { hashed_address, mut targets } = input;
474
475        let span = debug_span!(
476            target: "trie::proof_task",
477            "V2 Storage proof calculation",
478            n = %targets.len(),
479        );
480        let _span_guard = span.enter();
481
482        let proof_start = Instant::now();
483
484        // If targets is empty it means the caller only wants the root node.
485        let proof = if targets.is_empty() {
486            let root_node = calculator.storage_root_node(hashed_address)?;
487            vec![root_node]
488        } else {
489            calculator.storage_proof(hashed_address, &mut targets)?
490        };
491
492        let root = calculator.compute_root_hash(&proof)?;
493
494        trace!(
495            target: "trie::proof_task",
496            hashed_address = ?hashed_address,
497            proof_time_us = proof_start.elapsed().as_micros(),
498            ?root,
499            worker_id = self.id,
500            "Completed V2 storage proof calculation"
501        );
502
503        Ok(StorageProofResult { proof, root })
504    }
505
506    /// Process a blinded storage node request.
507    ///
508    /// Used by storage workers to retrieve blinded storage trie nodes for proof construction.
509    fn process_blinded_storage_node(
510        &self,
511        account: B256,
512        path: &Nibbles,
513    ) -> TrieNodeProviderResult {
514        let storage_node_provider =
515            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
516        storage_node_provider.trie_node(path)
517    }
518}
519impl TrieNodeProviderFactory for ProofWorkerHandle {
520    type AccountNodeProvider = ProofTaskTrieNodeProvider;
521    type StorageNodeProvider = ProofTaskTrieNodeProvider;
522
523    fn account_node_provider(&self) -> Self::AccountNodeProvider {
524        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
525    }
526
527    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
528        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
529    }
530}
531
/// Trie node provider for retrieving trie nodes by path.
///
/// Each variant holds a [`ProofWorkerHandle`] and forwards lookups to the matching
/// worker pool.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node provider.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node provider.
    StorageNode {
        /// Target account.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}
548
549impl TrieNodeProvider for ProofTaskTrieNodeProvider {
550    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
551        match self {
552            Self::AccountNode { handle } => {
553                let rx = handle
554                    .dispatch_blinded_account_node(*path)
555                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
556                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
557            }
558            Self::StorageNode { handle, account } => {
559                let rx = handle
560                    .dispatch_blinded_storage_node(*account, *path)
561                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
562                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
563            }
564        }
565    }
566}
567
/// Channel used by worker threads to deliver [`ProofResultMessage`] items directly back to
/// `SparseTrieCacheTask`, bypassing any intermediate routing.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
573
/// Message containing a completed proof result with metadata for direct delivery to
/// `SparseTrieCacheTask`.
///
/// This type enables workers to send proof results directly to the `SparseTrieCacheTask` event
/// loop.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// The proof calculation result
    pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}
588
/// Context for sending proof calculation results back to `SparseTrieCacheTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration (taken at dispatch)
    pub start_time: Instant,
}
602
impl ProofResultContext {
    /// Creates a new proof result context.
    ///
    /// `start_time` should be the dispatch timestamp; workers report elapsed time
    /// relative to it.
    pub const fn new(
        sender: ProofResultSender,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, state, start_time }
    }
}
613
/// The results of a storage proof calculation.
#[derive(Debug)]
pub(crate) struct StorageProofResult {
    /// The calculated V2 proof nodes
    pub proof: Vec<ProofTrieNodeV2>,
    /// The storage root calculated by the V2 proof, when one could be derived
    pub root: Option<B256>,
}
622
impl StorageProofResult {
    /// Returns the calculated root of the trie, if one can be calculated from the proof.
    const fn root(&self) -> Option<B256> {
        self.root
    }
}
629
/// Message containing a completed storage proof result with metadata.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// The hashed address this storage proof belongs to
    #[allow(dead_code)]
    pub(crate) hashed_address: B256,
    /// The storage proof calculation result
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
639
/// Internal message for storage workers.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Blinded storage node retrieval request
    BlindedStorageNode {
        /// Target account
        account: B256,
        /// Path to the storage node
        path: Nibbles,
        /// Channel to send result back to original caller (std mpsc; one-shot use)
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
660
/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests and blinded node lookups.
struct StorageProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work; shared with all other storage workers
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Per-worker availability flags
    availability: Arc<AvailabilitySheet>,
    /// Cached storage roots, shared with account workers.
    /// NOTE(review): presumed keyed by hashed account address — confirm at the write site.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
683
684impl<Factory> StorageProofWorker<Factory>
685where
686    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
687{
688    /// Creates a new storage proof worker.
    /// Creates a new storage proof worker from its shared context, work channel, id,
    /// availability sheet, and shared storage-root cache.
    ///
    /// The metrics parameters exist only when the `metrics` feature is enabled.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        availability: Arc<AvailabilitySheet>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            availability,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }
710
711    /// Runs the worker loop, processing jobs until the channel closes.
712    ///
713    /// # Lifecycle
714    ///
715    /// 1. Initializes database provider and transaction
716    /// 2. Advertises availability
717    /// 3. Processes jobs in a loop:
718    ///    - Receives job from channel
719    ///    - Marks worker as busy
720    ///    - Processes the job
721    ///    - Marks worker as available
722    /// 4. Shuts down when channel closes
723    ///
724    /// # Panic Safety
725    ///
726    /// If this function panics, the worker thread terminates but other workers
727    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        // Create provider from factory
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        let mut storage_nodes_processed = 0u64;
        // NOTE(review): `ProofTaskCursorMetricsCache` is imported at the top of this file
        // only under `cfg(feature = "metrics")`, but used unconditionally here — confirm it
        // is also in scope for non-metrics builds.
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
        // Cursors are created once with a placeholder address (B256::ZERO) and reused
        // across jobs; the calculator is handed the actual account per request.
        let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
        let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
        let instrumented_trie_cursor =
            InstrumentedTrieCursor::new(trie_cursor, &mut cursor_metrics_cache.storage_trie_cursor);
        let instrumented_hashed_cursor = InstrumentedHashedCursor::new(
            hashed_cursor,
            &mut cursor_metrics_cache.storage_hashed_cursor,
        );
        let mut v2_calculator = proof_v2::StorageProofCalculator::new_storage(
            instrumented_trie_cursor,
            instrumented_hashed_cursor,
        );

        // Initially mark this worker as available.
        self.availability.mark_idle(self.worker_id);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();

        // recv() returns Err once every sender (i.e. every `ProofWorkerHandle`) is
        // dropped, which is the graceful-shutdown signal.
        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            // Mark worker as busy.
            self.availability.mark_busy(self.worker_id);

            // Optional randomized delay before each job to surface ordering-dependent
            // behavior in debug builds.
            #[cfg(feature = "trie-debug")]
            if let Some(max_jitter) = self.task_ctx.proof_jitter {
                let jitter =
                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    jitter_us = jitter.as_micros(),
                    "Storage worker applying proof jitter"
                );
                std::thread::sleep(jitter);
            }

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    self.process_storage_proof(
                        &proof_tx,
                        &mut v2_calculator,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                    );
                }

                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &proof_tx,
                        account,
                        path,
                        result_sender,
                        &mut storage_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.availability.mark_idle(self.worker_id);

            idle_start = Instant::now();
        }

        // Drop calculator to release mutable borrows on cursor_metrics_cache.
        drop(v2_calculator);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            storage_proofs_processed,
            storage_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_storage_nodes(storage_nodes_processed as usize);
            self.metrics.record_storage_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }
830
831    /// Processes a storage proof request.
832    fn process_storage_proof<Provider, TC, HC>(
833        &self,
834        proof_tx: &ProofTaskTx<Provider>,
835        v2_calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
836        input: StorageProofInput,
837        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
838        storage_proofs_processed: &mut u64,
839    ) where
840        Provider: TrieCursorFactory + HashedCursorFactory,
841        TC: TrieStorageCursor,
842        HC: HashedStorageCursor<Value = U256>,
843    {
844        let hashed_address = input.hashed_address;
845        let proof_start = Instant::now();
846
847        trace!(
848            target: "trie::proof_task",
849            worker_id = self.worker_id,
850            hashed_address = ?hashed_address,
851            targets_len = input.targets.len(),
852            "Processing V2 storage proof"
853        );
854
855        let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);
856
857        let proof_elapsed = proof_start.elapsed();
858        *storage_proofs_processed += 1;
859
860        let root = result.as_ref().ok().and_then(|result| result.root());
861
862        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
863            trace!(
864                target: "trie::proof_task",
865                worker_id = self.worker_id,
866                hashed_address = ?hashed_address,
867                storage_proofs_processed,
868                "Proof result receiver dropped, discarding result"
869            );
870        }
871
872        if let Some(root) = root {
873            self.cached_storage_roots.insert(hashed_address, root);
874        }
875
876        trace!(
877            target: "trie::proof_task",
878            worker_id = self.worker_id,
879            hashed_address = ?hashed_address,
880            proof_time_us = proof_elapsed.as_micros(),
881            total_processed = storage_proofs_processed,
882            ?root,
883            "Storage proof completed"
884        );
885    }
886
887    /// Processes a blinded storage node lookup request.
888    fn process_blinded_node<Provider>(
889        worker_id: usize,
890        proof_tx: &ProofTaskTx<Provider>,
891        account: B256,
892        path: Nibbles,
893        result_sender: Sender<TrieNodeProviderResult>,
894        storage_nodes_processed: &mut u64,
895    ) where
896        Provider: TrieCursorFactory + HashedCursorFactory,
897    {
898        trace!(
899            target: "trie::proof_task",
900            worker_id,
901            ?account,
902            ?path,
903            "Processing blinded storage node"
904        );
905
906        let start = Instant::now();
907        let result = proof_tx.process_blinded_storage_node(account, &path);
908        let elapsed = start.elapsed();
909
910        *storage_nodes_processed += 1;
911
912        if result_sender.send(result).is_err() {
913            trace!(
914                target: "trie::proof_task",
915                worker_id,
916                ?account,
917                ?path,
918                storage_nodes_processed,
919                "Blinded storage node receiver dropped, discarding result"
920            );
921        }
922
923        trace!(
924            target: "trie::proof_task",
925            worker_id,
926            ?account,
927            ?path,
928            elapsed_us = elapsed.as_micros(),
929            total_processed = storage_nodes_processed,
930            "Blinded storage node completed"
931        );
932    }
933}
934
/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests and blinded node lookups.
struct AccountProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets.
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work; the worker loop exits once all senders are dropped.
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing and availability flags).
    worker_id: usize,
    /// Channel for dispatching storage proof work (for pre-dispatched target proofs),
    /// so storage proofs can run in parallel with the account trie walk.
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Per-worker availability flags; this worker toggles its own idle/busy bit.
    availability: Arc<AvailabilitySheet>,
    /// Storage roots cached by storage workers, shared across the pool and consumed
    /// by the value encoder during account multiproof computation.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker.
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker.
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
959
960impl<Factory> AccountProofWorker<Factory>
961where
962    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
963{
964    /// Creates a new account proof worker.
965    #[expect(clippy::too_many_arguments)]
966    const fn new(
967        task_ctx: ProofTaskCtx<Factory>,
968        work_rx: CrossbeamReceiver<AccountWorkerJob>,
969        worker_id: usize,
970        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
971        availability: Arc<AvailabilitySheet>,
972        cached_storage_roots: Arc<DashMap<B256, B256>>,
973        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
974        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
975    ) -> Self {
976        Self {
977            task_ctx,
978            work_rx,
979            worker_id,
980            storage_work_tx,
981            availability,
982            cached_storage_roots,
983            #[cfg(feature = "metrics")]
984            metrics,
985            #[cfg(feature = "metrics")]
986            cursor_metrics,
987        }
988    }
989
    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Errors
    ///
    /// Returns an error if the database provider or any trie/hashed cursor fails to
    /// initialize; in that case the worker exits without ever advertising availability.
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        // Dedicated read-only provider (and underlying transaction) for this worker's lifetime.
        let provider = self.task_ctx.factory.database_provider_ro()?;

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Create both account and storage calculators for V2 proofs.
        // The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
        let account_trie_cursor = provider.account_trie_cursor()?;
        let account_hashed_cursor = provider.hashed_account_cursor()?;

        // NOTE(review): the storage cursors are created once with a placeholder address
        // (B256::ZERO); presumably the calculator re-positions them per account — confirm.
        let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
        let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

        // Instrumented wrappers borrow `cursor_metrics_cache` mutably; the calculators
        // built from them must be dropped before the cache is read at shutdown.
        let instrumented_account_trie_cursor = InstrumentedTrieCursor::new(
            account_trie_cursor,
            &mut cursor_metrics_cache.account_trie_cursor,
        );
        let instrumented_account_hashed_cursor = InstrumentedHashedCursor::new(
            account_hashed_cursor,
            &mut cursor_metrics_cache.account_hashed_cursor,
        );
        let instrumented_storage_trie_cursor = InstrumentedTrieCursor::new(
            storage_trie_cursor,
            &mut cursor_metrics_cache.storage_trie_cursor,
        );
        let instrumented_storage_hashed_cursor = InstrumentedHashedCursor::new(
            storage_hashed_cursor,
            &mut cursor_metrics_cache.storage_hashed_cursor,
        );

        // The turbofish spells out the `AsyncAccountValueEncoder` type parameter explicitly;
        // the first two parameters are inferred from the constructor arguments.
        let mut v2_account_calculator =
            proof_v2::ProofCalculator::<
                _,
                _,
                AsyncAccountValueEncoder<
                    InstrumentedTrieCursor<
                        '_,
                        <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                    >,
                    InstrumentedHashedCursor<
                        '_,
                        <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
                    >,
                >,
            >::new(instrumented_account_trie_cursor, instrumented_account_hashed_cursor);
        let v2_storage_calculator =
            Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
                instrumented_storage_trie_cursor,
                instrumented_storage_hashed_cursor,
            )));

        // Count this worker as available only after successful initialization.
        self.availability.mark_idle(self.worker_id);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();
        let mut value_encoder_stats_cache = ValueEncoderStats::default();

        // Loop until every sender for `work_rx` is dropped, which closes the channel.
        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            // Mark worker as busy.
            self.availability.mark_busy(self.worker_id);

            // Debugging aid: optionally sleep a random amount before each job to shake
            // out ordering assumptions in consumers.
            #[cfg(feature = "trie-debug")]
            if let Some(max_jitter) = self.task_ctx.proof_jitter {
                let jitter =
                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    jitter_us = jitter.as_micros(),
                    "Account worker applying proof jitter"
                );
                std::thread::sleep(jitter);
            }

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
                        &mut v2_account_calculator,
                        v2_storage_calculator.clone(),
                        *input,
                        &mut account_proofs_processed,
                    );
                    // Time the value encoder spent waiting on storage workers is
                    // accounted as idle time, not busy time.
                    total_idle_time += value_encoder_stats.storage_wait_time;
                    value_encoder_stats_cache.extend(&value_encoder_stats);
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &provider,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.availability.mark_idle(self.worker_id);

            idle_start = Instant::now();
        }

        // Drop calculators to release mutable borrows on cursor_metrics_cache.
        drop(v2_account_calculator);
        drop(v2_storage_calculator);

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.metrics.record_account_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
        }

        Ok(())
    }
1144
1145    fn compute_v2_account_multiproof<'a, Provider>(
1146        &self,
1147        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1148        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1149        targets: MultiProofTargetsV2,
1150    ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
1151    where
1152        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1153    {
1154        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
1155
1156        let span = debug_span!(
1157            target: "trie::proof_task",
1158            "Account V2 multiproof calculation",
1159            account_targets = account_targets.len(),
1160            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
1161        );
1162        let _span_guard = span.enter();
1163
1164        trace!(target: "trie::proof_task", "Processing V2 account multiproof");
1165
1166        let storage_proof_receivers =
1167            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
1168
1169        let mut value_encoder = AsyncAccountValueEncoder::new(
1170            storage_proof_receivers,
1171            self.cached_storage_roots.clone(),
1172            v2_storage_calculator,
1173        );
1174
1175        let account_proofs =
1176            v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
1177
1178        let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
1179
1180        let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
1181
1182        Ok((proof, value_encoder_stats))
1183    }
1184
    /// Processes an account multiproof request.
    ///
    /// Computes the multiproof and sends a [`ProofResultMessage`] (success or error)
    /// back through the channel carried in the input's [`ProofResultContext`].
    ///
    /// Returns stats from the value encoder used during proof computation.
    fn process_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
    ) -> ValueEncoderStats
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        let proof_start = Instant::now();

        let AccountMultiproofInput { targets, proof_result_sender } = input;
        // On error there are no meaningful encoder stats, so report defaults.
        let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
            v2_account_calculator,
            v2_storage_calculator,
            targets,
        ) {
            Ok((proof, stats)) => (Ok(proof), stats),
            Err(e) => (Err(e), ValueEncoderStats::default()),
        };

        let ProofResultContext { sender: result_tx, state, start_time: start } =
            proof_result_sender;

        // `proof_elapsed` covers only the computation above; `total_elapsed` is measured
        // from the context's `start_time`, so it presumably also includes queueing delay
        // before this worker picked the job up — confirm against the dispatch site.
        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        *account_proofs_processed += 1;

        // Send result to SparseTrieCacheTask
        if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id=self.worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            "Account multiproof completed"
        );

        value_encoder_stats
    }
1237
1238    /// Processes a blinded account node lookup request.
1239    fn process_blinded_node<Provider>(
1240        worker_id: usize,
1241        provider: &Provider,
1242        path: Nibbles,
1243        result_sender: Sender<TrieNodeProviderResult>,
1244        account_nodes_processed: &mut u64,
1245    ) where
1246        Provider: TrieCursorFactory + HashedCursorFactory,
1247    {
1248        let span = debug_span!(
1249            target: "trie::proof_task",
1250            "Blinded account node calculation",
1251            ?path,
1252        );
1253        let _span_guard = span.enter();
1254
1255        trace!(
1256            target: "trie::proof_task",
1257            "Processing blinded account node"
1258        );
1259
1260        let start = Instant::now();
1261        let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
1262        let result = account_node_provider.trie_node(&path);
1263        let elapsed = start.elapsed();
1264
1265        *account_nodes_processed += 1;
1266
1267        if result_sender.send(result).is_err() {
1268            trace!(
1269                target: "trie::proof_task",
1270                worker_id,
1271                ?path,
1272                account_nodes_processed,
1273                "Blinded account node receiver dropped, discarding result"
1274            );
1275        }
1276
1277        trace!(
1278            target: "trie::proof_task",
1279            node_time_us = elapsed.as_micros(),
1280            total_processed = account_nodes_processed,
1281            "Blinded account node completed"
1282        );
1283    }
1284}
1285
1286/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
1287///
1288/// This function queues all storage proof tasks to the worker pool but returns immediately
1289/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1290/// computation. This enables interleaved parallelism for better performance.
1291///
1292/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
1293fn dispatch_v2_storage_proofs(
1294    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1295    account_targets: &[ProofV2Target],
1296    mut storage_targets: B256Map<Vec<ProofV2Target>>,
1297) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1298    let mut storage_proof_receivers =
1299        B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1300
1301    // Collect hashed addresses from account targets that need their storage roots computed
1302    let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1303
1304    // For storage targets with associated account proofs, ensure the first target has
1305    // min_len(0) so the root node is returned for storage root computation
1306    for (hashed_address, targets) in &mut storage_targets {
1307        if account_target_addresses.contains(hashed_address) &&
1308            let Some(first) = targets.first_mut()
1309        {
1310            *first = first.with_min_len(0);
1311        }
1312    }
1313
1314    // Sort storage targets by address for optimal dispatch order.
1315    // Since trie walk processes accounts in lexicographical order, dispatching in the same order
1316    // reduces head-of-line blocking when consuming results.
1317    let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1318    sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1319
1320    // Dispatch all proofs for targeted storage slots
1321    for (hashed_address, targets) in sorted_storage_targets {
1322        // Create channel for receiving StorageProofResultMessage
1323        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1324        let input = StorageProofInput::new(hashed_address, targets);
1325
1326        storage_work_tx
1327            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1328            .map_err(|_| {
1329                ParallelStateRootError::Other(format!(
1330                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1331                ))
1332            })?;
1333
1334        storage_proof_receivers.insert(hashed_address, result_rx);
1335    }
1336
1337    Ok(storage_proof_receivers)
1338}
1339
/// Input parameters for storage proof computation.
#[derive(Debug)]
pub struct StorageProofInput {
    /// The hashed address for which the proof is calculated.
    pub hashed_address: B256,
    /// The set of proof targets (storage slot targets for this account).
    pub targets: Vec<ProofV2Target>,
}
1348
impl StorageProofInput {
    /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
    ///
    /// No validation is performed; the targets are stored as provided.
    pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
        Self { hashed_address, targets }
    }
}
1355
/// Input parameters for account multiproof computation.
#[derive(Debug)]
pub struct AccountMultiproofInput {
    /// The targets for which to compute the multiproof.
    pub targets: MultiProofTargetsV2,
    /// Context for sending the proof result back to the requester.
    pub proof_result_sender: ProofResultContext,
}
1364
impl AccountMultiproofInput {
    /// Returns the [`ProofResultContext`] for this input, consuming the input.
    ///
    /// The `targets` field is dropped in the process.
    fn into_proof_result_sender(self) -> ProofResultContext {
        self.proof_result_sender
    }
}
1371
/// Internal message for account workers.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Account multiproof computation request
    AccountMultiproof {
        /// Account multiproof input parameters, boxed so the larger payload does not
        /// inflate the size of every `AccountWorkerJob` value on the channel
        input: Box<AccountMultiproofInput>,
    },
    /// Blinded account node retrieval request
    BlindedAccountNode {
        /// Path to the account node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
1388
#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Builds a [`ProofTaskCtx`] around the given factory for use in tests.
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let provider_factory = create_test_provider_factory();
        let changeset_cache = reth_trie_db::ChangesetCache::new();
        let factory = reth_provider::providers::OverlayStateProviderFactory::new(
            provider_factory,
            changeset_cache,
        );
        let ctx = test_ctx(factory);

        let runtime = reth_tasks::Runtime::test();
        let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);

        // Verify handle can be cloned
        let _cloned_handle = proof_handle.clone();

        // Workers shut down automatically when handle is dropped
        drop(proof_handle);
    }
}