// reth_trie_parallel/proof_task.rs
//! Parallel proof computation using worker pools with dedicated database transactions.
//!
//! # Architecture
//!
//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
//!   - Storage pool: Handles storage proofs and blinded storage node requests
//!   - Account pool: Handles account multiproofs and blinded account node requests
//! - **Direct Channel Access**: `ProofWorkerHandle` provides type-safe queue methods with direct
//!   access to worker channels, eliminating routing overhead
//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
//!
//! # Message Flow
//!
//! 1. The multiproof task prepares a storage or account job and hands it to `ProofWorkerHandle`.
//!    The job carries a `ProofResultContext` so the worker knows how to send the result back.
//! 2. A worker receives the job, runs the proof, and sends a `ProofResultMessage` through the
//!    provided `ProofResultSender`.
//! 3. The multiproof task receives the message and proceeds with its state-root logic.
//!
//! Each job gets its own direct channel so results go straight back to the multiproof task. That
//! keeps ordering decisions in one place and lets workers run independently.
//!
//! ```text
//! SparseTrieCacheTask -> ProofWorkerHandle -> Storage/Account Worker
//!        ^                       |
//!        |                       v
//! ProofResultMessage <-- ProofResultSender
//! ```
31use crate::{
32    root::ParallelStateRootError,
33    value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
34};
35use alloy_primitives::{
36    map::{B256Map, B256Set},
37    B256,
38};
39use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
40use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
41use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
42use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
43use reth_storage_errors::db::DatabaseError;
44use reth_tasks::Runtime;
45use reth_trie::{
46    hashed_cursor::HashedCursorFactory,
47    proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider},
48    proof_v2,
49    trie_cursor::TrieCursorFactory,
50    DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, Nibbles, ProofTrieNodeV2,
51    ProofV2Target,
52};
53use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
54use std::{
55    cell::RefCell,
56    rc::Rc,
57    sync::{
58        atomic::{AtomicUsize, Ordering},
59        mpsc::{channel, Receiver, Sender},
60        Arc,
61    },
62    time::Duration,
63};
64use tracing::{debug, debug_span, error, instrument, trace};
65
66#[cfg(feature = "metrics")]
67use crate::proof_task_metrics::{
68    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
69};
70
/// Result of a blinded trie-node lookup: the revealed node, if one exists at the path.
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// Type alias for the V2 account proof calculator.
///
/// Pairs the provider's account trie/hashed cursors with an [`AsyncAccountValueEncoder`]
/// parameterized over the provider's storage cursors.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    <Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
    <Provider as HashedCursorFactory>::AccountCursor<'a>,
    AsyncAccountValueEncoder<
        <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
        <Provider as HashedCursorFactory>::StorageCursor<'a>,
    >,
>;

/// Type alias for the V2 storage proof calculator.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    <Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
    <Provider as HashedCursorFactory>::StorageCursor<'a>,
>;
88
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Counter tracking available storage workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    storage_available_workers: Arc<AtomicUsize>,
    /// Counter tracking available account workers. Workers decrement when starting work,
    /// increment when finishing. Used to determine whether to chunk multiproofs.
    account_available_workers: Arc<AtomicUsize>,
    /// Total number of storage workers spawned
    storage_worker_count: usize,
    /// Total number of account workers spawned
    account_worker_count: usize,
}
111
impl ProofWorkerHandle {
    /// Spawns storage and account worker pools with dedicated database transactions.
    ///
    /// Returns a handle for submitting proof tasks to the worker pools.
    /// Workers run until the last handle is dropped.
    ///
    /// # Parameters
    /// - `runtime`: The centralized runtime used to spawn blocking worker tasks
    /// - `task_ctx`: Shared context with database view and prefix sets
    /// - `halve_workers`: Whether to halve the worker pool size (for small blocks)
    #[instrument(
        name = "ProofWorkerHandle::new",
        level = "debug",
        target = "trie::proof_task",
        skip_all
    )]
    pub fn new<Factory>(
        runtime: &Runtime,
        task_ctx: ProofTaskCtx<Factory>,
        halve_workers: bool,
    ) -> Self
    where
        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
            + Clone
            + Send
            + Sync
            + 'static,
    {
        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

        // Both counters start at zero; each worker increments once it is initialized
        // and ready to accept work.
        let storage_available_workers = Arc::<AtomicUsize>::default();
        let account_available_workers = Arc::<AtomicUsize>::default();

        // Storage roots inserted by storage workers; also handed to account workers.
        let cached_storage_roots = Arc::<DashMap<_, _>>::default();

        let divisor = if halve_workers { 2 } else { 1 };
        let storage_worker_count =
            runtime.proof_storage_worker_pool().current_num_threads() / divisor;
        let account_worker_count =
            runtime.proof_account_worker_pool().current_num_threads() / divisor;

        debug!(
            target: "trie::proof_task",
            storage_worker_count,
            account_worker_count,
            halve_workers,
            "Spawning proof worker pools"
        );

        // broadcast blocks until all workers exit (channel close), so run on
        // tokio's blocking pool.
        let storage_rt = runtime.clone();
        let storage_task_ctx = task_ctx.clone();
        let storage_avail = storage_available_workers.clone();
        let storage_roots = cached_storage_roots.clone();
        let storage_parent_span = tracing::Span::current();
        runtime.spawn_blocking_named("storage-workers", move || {
            // Hand each broadcast invocation a unique id, used only for tracing.
            let worker_id = AtomicUsize::new(0);
            storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
                let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
                let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
                let _guard = span.enter();

                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let worker = StorageProofWorker::new(
                    storage_task_ctx.clone(),
                    storage_work_rx.clone(),
                    worker_id,
                    storage_avail.clone(),
                    storage_roots.clone(),
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Storage worker failed"
                    );
                }
            });
        });

        let account_rt = runtime.clone();
        // Account workers dispatch storage-proof jobs through this sender.
        let account_tx = storage_work_tx.clone();
        let account_avail = account_available_workers.clone();
        let account_parent_span = tracing::Span::current();
        runtime.spawn_blocking_named("account-workers", move || {
            let worker_id = AtomicUsize::new(0);
            account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
                let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
                let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
                let _guard = span.enter();

                #[cfg(feature = "metrics")]
                let metrics = ProofTaskTrieMetrics::default();
                #[cfg(feature = "metrics")]
                let cursor_metrics = ProofTaskCursorMetrics::new();

                let worker = AccountProofWorker::new(
                    task_ctx.clone(),
                    account_work_rx.clone(),
                    worker_id,
                    account_tx.clone(),
                    account_avail.clone(),
                    cached_storage_roots.clone(),
                    #[cfg(feature = "metrics")]
                    metrics,
                    #[cfg(feature = "metrics")]
                    cursor_metrics,
                );
                if let Err(error) = worker.run() {
                    error!(
                        target: "trie::proof_task",
                        worker_id,
                        ?error,
                        "Account worker failed"
                    );
                }
            });
        });

        Self {
            storage_work_tx,
            account_work_tx,
            storage_available_workers,
            account_available_workers,
            storage_worker_count,
            account_worker_count,
        }
    }

    /// Returns how many storage workers are currently available/idle.
    pub fn available_storage_workers(&self) -> usize {
        self.storage_available_workers.load(Ordering::Relaxed)
    }

    /// Returns how many account workers are currently available/idle.
    pub fn available_account_workers(&self) -> usize {
        self.account_available_workers.load(Ordering::Relaxed)
    }

    /// Returns the number of pending storage tasks in the queue.
    pub fn pending_storage_tasks(&self) -> usize {
        self.storage_work_tx.len()
    }

    /// Returns the number of pending account tasks in the queue.
    pub fn pending_account_tasks(&self) -> usize {
        self.account_work_tx.len()
    }

    /// Returns the total number of storage workers in the pool.
    pub const fn total_storage_workers(&self) -> usize {
        self.storage_worker_count
    }

    /// Returns the total number of account workers in the pool.
    pub const fn total_account_workers(&self) -> usize {
        self.account_worker_count
    }

    /// Returns the number of storage workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_storage_workers(&self) -> usize {
        self.storage_worker_count.saturating_sub(self.available_storage_workers())
    }

    /// Returns the number of account workers currently processing tasks.
    ///
    /// This is calculated as total workers minus available workers.
    pub fn active_account_workers(&self) -> usize {
        self.account_worker_count.saturating_sub(self.available_account_workers())
    }

    /// Dispatch a storage proof computation to storage worker pool
    ///
    /// The result will be sent via the `proof_result_sender` channel.
    pub fn dispatch_storage_proof(
        &self,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    ) -> Result<(), ProviderError> {
        let hashed_address = input.hashed_address;
        self.storage_work_tx
            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
            .map_err(|err| {
                // Send fails only when all workers have shut down. Recover the job
                // from the send error and notify the result channel so the caller's
                // receiver does not hang.
                if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
                    let _ = proof_result_sender.send(StorageProofResultMessage {
                        hashed_address,
                        result: Err(DatabaseError::Other(
                            "storage workers unavailable".to_string(),
                        )
                        .into()),
                    });
                }

                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })
    }

    /// Dispatch an account multiproof computation
    ///
    /// The result will be sent via the `result_sender` channel included in the input.
    pub fn dispatch_account_multiproof(
        &self,
        input: AccountMultiproofInput,
    ) -> Result<(), ProviderError> {
        self.account_work_tx
            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
            .map_err(|err| {
                let error =
                    ProviderError::other(std::io::Error::other("account workers unavailable"));

                // Send fails only when all workers have shut down. Notify the result
                // channel embedded in the input so the multiproof task is not left
                // waiting forever.
                if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
                    let ProofResultContext { sender: result_tx, state, start_time: start } =
                        input.into_proof_result_sender();

                    let _ = result_tx.send(ProofResultMessage {
                        result: Err(ParallelStateRootError::Provider(error.clone())),
                        elapsed: start.elapsed(),
                        state,
                    });
                }

                error
            })
    }

    /// Dispatch blinded storage node request to storage worker pool
    pub(crate) fn dispatch_blinded_storage_node(
        &self,
        account: B256,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.storage_work_tx
            .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("storage workers unavailable"))
            })?;

        Ok(rx)
    }

    /// Dispatch blinded account node request to account worker pool
    pub(crate) fn dispatch_blinded_account_node(
        &self,
        path: Nibbles,
    ) -> Result<Receiver<TrieNodeProviderResult>, ProviderError> {
        let (tx, rx) = channel();
        self.account_work_tx
            .send(AccountWorkerJob::BlindedAccountNode { path, result_sender: tx })
            .map_err(|_| {
                ProviderError::other(std::io::Error::other("account workers unavailable"))
            })?;

        Ok(rx)
    }
}
381
/// Data used for initializing cursor factories that is shared across all proof worker instances.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers.
    factory: Factory,
    /// Maximum random jitter to apply before each proof computation (trie-debug only).
    #[cfg(feature = "trie-debug")]
    proof_jitter: Option<Duration>,
}
391
392impl<Factory> ProofTaskCtx<Factory> {
393    /// Creates a new [`ProofTaskCtx`] with the given factory.
394    pub const fn new(factory: Factory) -> Self {
395        Self {
396            factory,
397            #[cfg(feature = "trie-debug")]
398            proof_jitter: None,
399        }
400    }
401
402    /// Sets the maximum proof jitter duration (trie-debug only).
403    #[cfg(feature = "trie-debug")]
404    pub const fn with_proof_jitter(mut self, jitter: Option<Duration>) -> Self {
405        self.proof_jitter = jitter;
406        self
407    }
408}
409
/// This contains all information shared between account proof worker instances.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}
419
420impl<Provider> ProofTaskTx<Provider> {
421    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
422    const fn new(provider: Provider, id: usize) -> Self {
423        Self { provider, id }
424    }
425}
426
427impl<Provider> ProofTaskTx<Provider>
428where
429    Provider: TrieCursorFactory + HashedCursorFactory,
430{
431    fn compute_v2_storage_proof(
432        &self,
433        input: StorageProofInput,
434        calculator: &mut proof_v2::StorageProofCalculator<
435            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
436            <Provider as HashedCursorFactory>::StorageCursor<'_>,
437        >,
438    ) -> Result<StorageProofResult, StateProofError> {
439        let StorageProofInput { hashed_address, mut targets } = input;
440
441        let span = debug_span!(
442            target: "trie::proof_task",
443            "V2 Storage proof calculation",
444            n = %targets.len(),
445        );
446        let _span_guard = span.enter();
447
448        let proof_start = Instant::now();
449
450        // If targets is empty it means the caller only wants the root node.
451        let proof = if targets.is_empty() {
452            let root_node = calculator.storage_root_node(hashed_address)?;
453            vec![root_node]
454        } else {
455            calculator.storage_proof(hashed_address, &mut targets)?
456        };
457
458        let root = calculator.compute_root_hash(&proof)?;
459
460        trace!(
461            target: "trie::proof_task",
462            hashed_address = ?hashed_address,
463            proof_time_us = proof_start.elapsed().as_micros(),
464            ?root,
465            worker_id = self.id,
466            "Completed V2 storage proof calculation"
467        );
468
469        Ok(StorageProofResult { proof, root })
470    }
471
472    /// Process a blinded storage node request.
473    ///
474    /// Used by storage workers to retrieve blinded storage trie nodes for proof construction.
475    fn process_blinded_storage_node(
476        &self,
477        account: B256,
478        path: &Nibbles,
479    ) -> TrieNodeProviderResult {
480        let storage_node_provider =
481            ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
482        storage_node_provider.trie_node(path)
483    }
484}
485impl TrieNodeProviderFactory for ProofWorkerHandle {
486    type AccountNodeProvider = ProofTaskTrieNodeProvider;
487    type StorageNodeProvider = ProofTaskTrieNodeProvider;
488
489    fn account_node_provider(&self) -> Self::AccountNodeProvider {
490        ProofTaskTrieNodeProvider::AccountNode { handle: self.clone() }
491    }
492
493    fn storage_node_provider(&self, account: B256) -> Self::StorageNodeProvider {
494        ProofTaskTrieNodeProvider::StorageNode { account, handle: self.clone() }
495    }
496}
497
/// Trie node provider for retrieving trie nodes by path.
///
/// Both variants route requests through a [`ProofWorkerHandle`] to the worker pools.
#[derive(Debug)]
pub enum ProofTaskTrieNodeProvider {
    /// Blinded account trie node provider.
    AccountNode {
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
    /// Blinded storage trie node provider.
    StorageNode {
        /// Target account.
        account: B256,
        /// Handle to the proof worker pools.
        handle: ProofWorkerHandle,
    },
}
514
515impl TrieNodeProvider for ProofTaskTrieNodeProvider {
516    fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
517        match self {
518            Self::AccountNode { handle } => {
519                let rx = handle
520                    .dispatch_blinded_account_node(*path)
521                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
522                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
523            }
524            Self::StorageNode { handle, account } => {
525                let rx = handle
526                    .dispatch_blinded_storage_node(*account, *path)
527                    .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
528                rx.recv().map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?
529            }
530        }
531    }
532}
533
/// Channel used by worker threads to deliver `ProofResultMessage` items back to
/// `MultiProofTask`.
///
/// Workers use this sender to deliver proof results directly to `MultiProofTask`.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
539
/// Message containing a completed proof result with metadata for direct delivery to
/// `MultiProofTask`.
///
/// This type enables workers to send proof results directly to the `MultiProofTask` event loop.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// The proof calculation result
    pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}
553
/// Context for sending proof calculation results back to `MultiProofTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration
    pub start_time: Instant,
}
567
impl ProofResultContext {
    /// Creates a new proof result context bundling the result channel with the
    /// originating state update and the dispatch timestamp.
    pub const fn new(
        sender: ProofResultSender,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, state, start_time }
    }
}
578
/// The results of a storage proof calculation.
#[derive(Debug)]
pub(crate) struct StorageProofResult {
    /// The calculated V2 proof nodes
    pub proof: Vec<ProofTrieNodeV2>,
    /// The storage root calculated by the V2 proof
    pub root: Option<B256>,
}
587
impl StorageProofResult {
    /// Returns the calculated root of the trie, if one can be calculated from the proof.
    const fn root(&self) -> Option<B256> {
        self.root
    }
}
594
/// Message containing a completed storage proof result with metadata.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// The hashed address this storage proof belongs to
    #[allow(dead_code)]
    pub(crate) hashed_address: B256,
    /// The storage proof calculation result
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
604
/// Internal message for storage workers.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
    /// Blinded storage node retrieval request
    BlindedStorageNode {
        /// Target account
        account: B256,
        /// Path to the storage node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
625
/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests and blinded node lookups.
struct StorageProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Counter tracking worker availability
    available_workers: Arc<AtomicUsize>,
    /// Cached storage roots, keyed by hashed address; populated after each proof
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
648
649impl<Factory> StorageProofWorker<Factory>
650where
651    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
652{
653    /// Creates a new storage proof worker.
654    const fn new(
655        task_ctx: ProofTaskCtx<Factory>,
656        work_rx: CrossbeamReceiver<StorageWorkerJob>,
657        worker_id: usize,
658        available_workers: Arc<AtomicUsize>,
659        cached_storage_roots: Arc<DashMap<B256, B256>>,
660        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
661        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
662    ) -> Self {
663        Self {
664            task_ctx,
665            work_rx,
666            worker_id,
667            available_workers,
668            cached_storage_roots,
669            #[cfg(feature = "metrics")]
670            metrics,
671            #[cfg(feature = "metrics")]
672            cursor_metrics,
673        }
674    }
675
676    /// Runs the worker loop, processing jobs until the channel closes.
677    ///
678    /// # Lifecycle
679    ///
680    /// 1. Initializes database provider and transaction
681    /// 2. Advertises availability
682    /// 3. Processes jobs in a loop:
683    ///    - Receives job from channel
684    ///    - Marks worker as busy
685    ///    - Processes the job
686    ///    - Marks worker as available
687    /// 4. Shuts down when channel closes
688    ///
689    /// # Panic Safety
690    ///
691    /// If this function panics, the worker thread terminates but other workers
692    /// continue operating and the system degrades gracefully.
693    fn run(mut self) -> ProviderResult<()> {
694        // Create provider from factory
695        let provider = self.task_ctx.factory.database_provider_ro()?;
696        let proof_tx = ProofTaskTx::new(provider, self.worker_id);
697
698        trace!(
699            target: "trie::proof_task",
700            worker_id = self.worker_id,
701            "Storage worker started"
702        );
703
704        let mut storage_proofs_processed = 0u64;
705        let mut storage_nodes_processed = 0u64;
706        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
707        let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
708        let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
709        let mut v2_calculator =
710            proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor);
711
712        // Initially mark this worker as available.
713        self.available_workers.fetch_add(1, Ordering::Relaxed);
714
715        let mut total_idle_time = Duration::ZERO;
716        let mut idle_start = Instant::now();
717
718        while let Ok(job) = self.work_rx.recv() {
719            total_idle_time += idle_start.elapsed();
720
721            // Mark worker as busy.
722            self.available_workers.fetch_sub(1, Ordering::Relaxed);
723
724            #[cfg(feature = "trie-debug")]
725            if let Some(max_jitter) = self.task_ctx.proof_jitter {
726                let jitter =
727                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
728                trace!(
729                    target: "trie::proof_task",
730                    worker_id = self.worker_id,
731                    jitter_us = jitter.as_micros(),
732                    "Storage worker applying proof jitter"
733                );
734                std::thread::sleep(jitter);
735            }
736
737            match job {
738                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
739                    self.process_storage_proof(
740                        &proof_tx,
741                        &mut v2_calculator,
742                        input,
743                        proof_result_sender,
744                        &mut storage_proofs_processed,
745                    );
746                }
747
748                StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
749                    Self::process_blinded_node(
750                        self.worker_id,
751                        &proof_tx,
752                        account,
753                        path,
754                        result_sender,
755                        &mut storage_nodes_processed,
756                    );
757                }
758            }
759
760            // Mark worker as available again.
761            self.available_workers.fetch_add(1, Ordering::Relaxed);
762
763            idle_start = Instant::now();
764        }
765
766        trace!(
767            target: "trie::proof_task",
768            worker_id = self.worker_id,
769            storage_proofs_processed,
770            storage_nodes_processed,
771            total_idle_time_us = total_idle_time.as_micros(),
772            "Storage worker shutting down"
773        );
774
775        #[cfg(feature = "metrics")]
776        {
777            self.metrics.record_storage_nodes(storage_nodes_processed as usize);
778            self.metrics.record_storage_worker_idle_time(total_idle_time);
779            self.cursor_metrics.record(&mut cursor_metrics_cache);
780        }
781
782        Ok(())
783    }
784
785    /// Processes a storage proof request.
786    fn process_storage_proof<Provider>(
787        &self,
788        proof_tx: &ProofTaskTx<Provider>,
789        v2_calculator: &mut proof_v2::StorageProofCalculator<
790            <Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
791            <Provider as HashedCursorFactory>::StorageCursor<'_>,
792        >,
793        input: StorageProofInput,
794        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
795        storage_proofs_processed: &mut u64,
796    ) where
797        Provider: TrieCursorFactory + HashedCursorFactory,
798    {
799        let hashed_address = input.hashed_address;
800        let proof_start = Instant::now();
801
802        trace!(
803            target: "trie::proof_task",
804            worker_id = self.worker_id,
805            hashed_address = ?hashed_address,
806            targets_len = input.targets.len(),
807            "Processing V2 storage proof"
808        );
809
810        let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);
811
812        let proof_elapsed = proof_start.elapsed();
813        *storage_proofs_processed += 1;
814
815        let root = result.as_ref().ok().and_then(|result| result.root());
816
817        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
818            trace!(
819                target: "trie::proof_task",
820                worker_id = self.worker_id,
821                hashed_address = ?hashed_address,
822                storage_proofs_processed,
823                "Proof result receiver dropped, discarding result"
824            );
825        }
826
827        if let Some(root) = root {
828            self.cached_storage_roots.insert(hashed_address, root);
829        }
830
831        trace!(
832            target: "trie::proof_task",
833            worker_id = self.worker_id,
834            hashed_address = ?hashed_address,
835            proof_time_us = proof_elapsed.as_micros(),
836            total_processed = storage_proofs_processed,
837            ?root,
838            "Storage proof completed"
839        );
840    }
841
842    /// Processes a blinded storage node lookup request.
843    fn process_blinded_node<Provider>(
844        worker_id: usize,
845        proof_tx: &ProofTaskTx<Provider>,
846        account: B256,
847        path: Nibbles,
848        result_sender: Sender<TrieNodeProviderResult>,
849        storage_nodes_processed: &mut u64,
850    ) where
851        Provider: TrieCursorFactory + HashedCursorFactory,
852    {
853        trace!(
854            target: "trie::proof_task",
855            worker_id,
856            ?account,
857            ?path,
858            "Processing blinded storage node"
859        );
860
861        let start = Instant::now();
862        let result = proof_tx.process_blinded_storage_node(account, &path);
863        let elapsed = start.elapsed();
864
865        *storage_nodes_processed += 1;
866
867        if result_sender.send(result).is_err() {
868            trace!(
869                target: "trie::proof_task",
870                worker_id,
871                ?account,
872                ?path,
873                storage_nodes_processed,
874                "Blinded storage node receiver dropped, discarding result"
875            );
876        }
877
878        trace!(
879            target: "trie::proof_task",
880            worker_id,
881            ?account,
882            ?path,
883            elapsed_us = elapsed.as_micros(),
884            total_processed = storage_nodes_processed,
885            "Blinded storage node completed"
886        );
887    }
888}
889
/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests and blinded node lookups. Workers are
/// constructed via [`Self::new`] and driven by [`Self::run`] until the
/// work channel closes.
struct AccountProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work; the worker loop exits when all senders drop
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Channel for dispatching storage proof work (for pre-dispatched target proofs)
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Counter tracking worker availability; incremented when idle, decremented while busy
    available_workers: Arc<AtomicUsize>,
    /// Storage roots cached by storage workers, shared so account proof
    /// computation can reuse previously computed roots via the value encoder
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
914
impl<Factory> AccountProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new account proof worker.
    ///
    /// The worker does no work until [`Self::run`] is invoked on its own thread.
    #[allow(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        available_workers: Arc<AtomicUsize>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            available_workers,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        let provider = self.task_ctx.factory.database_provider_ro()?;

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            "Account worker started"
        );

        let mut account_proofs_processed = 0u64;
        let mut account_nodes_processed = 0u64;
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();

        // Create both account and storage calculators for V2 proofs.
        // The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
        let account_trie_cursor = provider.account_trie_cursor()?;
        let account_hashed_cursor = provider.hashed_account_cursor()?;

        // NOTE(review): storage cursors are seeded with B256::ZERO as a placeholder
        // address; presumably the storage calculator re-positions them per account —
        // confirm against `StorageProofCalculator::new_storage`.
        let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
        let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

        let mut v2_account_calculator = proof_v2::ProofCalculator::<
            _,
            _,
            AsyncAccountValueEncoder<
                <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
                <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
            >,
        >::new(account_trie_cursor, account_hashed_cursor);
        let v2_storage_calculator =
            Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
                storage_trie_cursor,
                storage_hashed_cursor,
            )));

        // Count this worker as available only after successful initialization.
        self.available_workers.fetch_add(1, Ordering::Relaxed);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();
        let mut value_encoder_stats_cache = ValueEncoderStats::default();

        while let Ok(job) = self.work_rx.recv() {
            total_idle_time += idle_start.elapsed();

            // Mark worker as busy.
            self.available_workers.fetch_sub(1, Ordering::Relaxed);

            // Debug-only: sleep a random duration before processing, to shake out
            // ordering assumptions in consumers of proof results.
            #[cfg(feature = "trie-debug")]
            if let Some(max_jitter) = self.task_ctx.proof_jitter {
                let jitter =
                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    jitter_us = jitter.as_micros(),
                    "Account worker applying proof jitter"
                );
                std::thread::sleep(jitter);
            }

            match job {
                AccountWorkerJob::AccountMultiproof { input } => {
                    let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
                        &mut v2_account_calculator,
                        v2_storage_calculator.clone(),
                        *input,
                        &mut account_proofs_processed,
                        &mut cursor_metrics_cache,
                    );
                    // Time spent blocked waiting on storage workers is attributed to
                    // idle time rather than proof work.
                    total_idle_time += value_encoder_stats.storage_wait_time;
                    value_encoder_stats_cache.extend(&value_encoder_stats);
                }

                AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
                    Self::process_blinded_node(
                        self.worker_id,
                        &provider,
                        path,
                        result_sender,
                        &mut account_nodes_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.available_workers.fetch_add(1, Ordering::Relaxed);

            idle_start = Instant::now();
        }

        trace!(
            target: "trie::proof_task",
            worker_id=self.worker_id,
            account_proofs_processed,
            account_nodes_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Account worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_account_nodes(account_nodes_processed as usize);
            self.metrics.record_account_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
        }

        Ok(())
    }

    /// Computes a V2 account multiproof.
    ///
    /// Dispatches storage proofs for all storage targets to the storage worker
    /// pool first, then walks the account trie; storage results stream back
    /// through the value encoder so the two phases overlap.
    ///
    /// Returns the decoded multiproof together with the value encoder's stats
    /// (e.g. time spent waiting on storage proofs).
    fn compute_v2_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        targets: MultiProofTargetsV2,
    ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;

        let span = debug_span!(
            target: "trie::proof_task",
            "Account V2 multiproof calculation",
            account_targets = account_targets.len(),
            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
        );
        let _span_guard = span.enter();

        trace!(target: "trie::proof_task", "Processing V2 account multiproof");

        // Queue all storage proofs up front so storage workers run while the
        // account trie walk below proceeds.
        let storage_proof_receivers =
            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;

        let mut value_encoder = AsyncAccountValueEncoder::new(
            storage_proof_receivers,
            self.cached_storage_roots.clone(),
            v2_storage_calculator,
        );

        let account_proofs =
            v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;

        // Collect the remaining storage proofs and the encoder's wait/usage stats.
        let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;

        let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };

        Ok((proof, value_encoder_stats))
    }

    /// Processes an account multiproof request.
    ///
    /// Returns stats from the value encoder used during proof computation.
    fn process_account_multiproof<'a, Provider>(
        &self,
        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
        input: AccountMultiproofInput,
        account_proofs_processed: &mut u64,
        cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
    ) -> ValueEncoderStats
    where
        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
    {
        // NOTE(review): created empty and never written to directly in this method;
        // `record_spans()` below presumably populates it from tracing spans — confirm.
        let proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
        let proof_start = Instant::now();

        let AccountMultiproofInput { targets, proof_result_sender } = input;
        let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
            v2_account_calculator,
            v2_storage_calculator,
            targets,
        ) {
            Ok((proof, stats)) => (Ok(proof), stats),
            // On error, report default (empty) encoder stats alongside the failure.
            Err(e) => (Err(e), ValueEncoderStats::default()),
        };

        let ProofResultContext { sender: result_tx, state, start_time: start } =
            proof_result_sender;

        // `total_elapsed` is measured from the requester's start time and thus
        // includes queueing delay; `proof_elapsed` covers only this computation.
        let proof_elapsed = proof_start.elapsed();
        let total_elapsed = start.elapsed();
        *account_proofs_processed += 1;

        // Send result to MultiProofTask
        if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id=self.worker_id,
                account_proofs_processed,
                "Account multiproof receiver dropped, discarding result"
            );
        }

        proof_cursor_metrics.record_spans();

        trace!(
            target: "trie::proof_task",
            proof_time_us = proof_elapsed.as_micros(),
            total_elapsed_us = total_elapsed.as_micros(),
            total_processed = account_proofs_processed,
            account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
            account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
            storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
            storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
            account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
            account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
            storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
            storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
            "Account multiproof completed"
        );

        #[cfg(feature = "metrics")]
        // Accumulate per-proof metrics into the worker's cache
        cursor_metrics_cache.extend(&proof_cursor_metrics);

        value_encoder_stats
    }

    /// Processes a blinded account node lookup request.
    ///
    /// Associated function (no `&self`) so it can run with only the pieces of
    /// worker state it needs. A dropped receiver is tolerated: the result is
    /// discarded with a trace log.
    fn process_blinded_node<Provider>(
        worker_id: usize,
        provider: &Provider,
        path: Nibbles,
        result_sender: Sender<TrieNodeProviderResult>,
        account_nodes_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
    {
        let span = debug_span!(
            target: "trie::proof_task",
            "Blinded account node calculation",
            ?path,
        );
        let _span_guard = span.enter();

        trace!(
            target: "trie::proof_task",
            "Processing blinded account node"
        );

        let start = Instant::now();
        let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
        let result = account_node_provider.trie_node(&path);
        let elapsed = start.elapsed();

        *account_nodes_processed += 1;

        if result_sender.send(result).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id,
                ?path,
                account_nodes_processed,
                "Blinded account node receiver dropped, discarding result"
            );
        }

        trace!(
            target: "trie::proof_task",
            node_time_us = elapsed.as_micros(),
            total_processed = account_nodes_processed,
            "Blinded account node completed"
        );
    }
}
1229
1230/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
1231///
1232/// This function queues all storage proof tasks to the worker pool but returns immediately
1233/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1234/// computation. This enables interleaved parallelism for better performance.
1235///
1236/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
1237fn dispatch_v2_storage_proofs(
1238    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1239    account_targets: &[ProofV2Target],
1240    mut storage_targets: B256Map<Vec<ProofV2Target>>,
1241) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1242    let mut storage_proof_receivers =
1243        B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1244
1245    // Collect hashed addresses from account targets that need their storage roots computed
1246    let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1247
1248    // For storage targets with associated account proofs, ensure the first target has
1249    // min_len(0) so the root node is returned for storage root computation
1250    for (hashed_address, targets) in &mut storage_targets {
1251        if account_target_addresses.contains(hashed_address) &&
1252            let Some(first) = targets.first_mut()
1253        {
1254            *first = first.with_min_len(0);
1255        }
1256    }
1257
1258    // Sort storage targets by address for optimal dispatch order.
1259    // Since trie walk processes accounts in lexicographical order, dispatching in the same order
1260    // reduces head-of-line blocking when consuming results.
1261    let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1262    sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1263
1264    // Dispatch all proofs for targeted storage slots
1265    for (hashed_address, targets) in sorted_storage_targets {
1266        // Create channel for receiving StorageProofResultMessage
1267        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1268        let input = StorageProofInput::new(hashed_address, targets);
1269
1270        storage_work_tx
1271            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1272            .map_err(|_| {
1273                ParallelStateRootError::Other(format!(
1274                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1275                ))
1276            })?;
1277
1278        storage_proof_receivers.insert(hashed_address, result_rx);
1279    }
1280
1281    Ok(storage_proof_receivers)
1282}
1283
/// Input parameters for storage proof computation.
#[derive(Debug)]
pub struct StorageProofInput {
    /// The hashed address for which the proof is calculated.
    pub hashed_address: B256,
    /// The set of proof targets within this account's storage trie.
    pub targets: Vec<ProofV2Target>,
}
1292
1293impl StorageProofInput {
1294    /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
1295    pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
1296        Self { hashed_address, targets }
1297    }
1298}
1299
/// Input parameters for account multiproof computation.
#[derive(Debug)]
pub struct AccountMultiproofInput {
    /// The targets for which to compute the multiproof.
    pub targets: MultiProofTargetsV2,
    /// Context (result channel, requester state, and request start time) used to
    /// send the proof result back to the requester.
    pub proof_result_sender: ProofResultContext,
}
1308
1309impl AccountMultiproofInput {
1310    /// Returns the [`ProofResultContext`] for this input, consuming the input.
1311    fn into_proof_result_sender(self) -> ProofResultContext {
1312        self.proof_result_sender
1313    }
1314}
1315
/// Internal message for account workers.
///
/// Jobs are delivered to [`AccountProofWorker`] instances through a crossbeam
/// channel; each variant carries its own channel for returning the result.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Account multiproof computation request
    AccountMultiproof {
        /// Account multiproof input parameters (boxed to keep the enum small)
        input: Box<AccountMultiproofInput>,
    },
    /// Blinded account node retrieval request
    BlindedAccountNode {
        /// Path to the account node
        path: Nibbles,
        /// Channel to send result back to original caller
        result_sender: Sender<TrieNodeProviderResult>,
    },
}
1332
#[cfg(test)]
mod tests {
    use super::*;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Wraps a factory in a [`ProofTaskCtx`] for use in tests.
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let base_factory = create_test_provider_factory();
        let cache = reth_trie_db::ChangesetCache::new();
        let overlay_factory =
            reth_provider::providers::OverlayStateProviderFactory::new(base_factory, cache);
        let ctx = test_ctx(overlay_factory);

        let runtime = reth_tasks::Runtime::test();
        let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);

        // The handle must be cloneable so multiple tasks can share the worker pool.
        let _cloned_handle = proof_handle.clone();

        // Dropping the handles lets the workers shut down on their own.
        drop(proof_handle);
    }
}