// reth_trie_parallel/proof_task.rs

1//! Parallel proof computation using worker pools with dedicated database transactions.
//!
4//! # Architecture
5//!
6//! - **Worker Pools**: Pre-spawned workers with dedicated database transactions
7//!   - Storage pool: Handles storage proofs
8//!   - Account pool: Handles account multiproofs
9//! - **Direct Channel Access**: `ProofWorkerHandle` provides type-safe queue methods with direct
10//!   access to worker channels, eliminating routing overhead
11//! - **Automatic Shutdown**: Workers terminate gracefully when all handles are dropped
12//!
13//! # Message Flow
14//!
15//! 1. The `SparseTrieCacheTask` prepares a storage or account job and hands it to
16//!    `ProofWorkerHandle`. The job carries a `ProofResultContext` so the worker knows how to send
17//!    the result back.
18//! 2. A worker receives the job, runs the proof, and sends a `ProofResultMessage` through the
19//!    provided `ProofResultSender`.
20//! 3. The `SparseTrieCacheTask` receives the message and proceeds with its state-root logic.
21//!
22//! Each job gets its own direct channel so results go straight back to the `SparseTrieCacheTask`.
23//! That keeps ordering decisions in one place and lets workers run independently.
24//!
25//! ```text
26//! SparseTrieCacheTask -> ProofWorkerHandle -> Storage/Account Worker
27//!        ^                       |
28//!        |                       v
29//! ProofResultMessage <-- ProofResultSender
30//! ```
31
32use crate::{
33    root::ParallelStateRootError,
34    value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
35};
36use alloy_primitives::{
37    map::{B256Map, B256Set},
38    B256, U256,
39};
40use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
41use reth_execution_errors::StateProofError;
42use reth_primitives_traits::{dashmap::DashMap, FastInstant as Instant};
43use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
44use reth_storage_errors::db::DatabaseError;
45use reth_tasks::Runtime;
46use reth_trie::{
47    hashed_cursor::{HashedCursorFactory, HashedStorageCursor, InstrumentedHashedCursor},
48    proof_v2,
49    trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieStorageCursor},
50    DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, ProofTrieNodeV2, ProofV2Target,
51};
52use std::{
53    cell::RefCell,
54    rc::Rc,
55    sync::{
56        atomic::{AtomicBool, AtomicUsize, Ordering},
57        Arc,
58    },
59    time::Duration,
60};
61use tracing::{debug, debug_span, error, instrument, trace};
62
63#[cfg(feature = "metrics")]
64use crate::proof_task_metrics::{
65    ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
66};
67
/// Type alias for the V2 account proof calculator with instrumented cursors.
///
/// The third parameter is an [`AsyncAccountValueEncoder`] built over the provider's
/// instrumented storage cursors, so storage work done while encoding account values
/// is captured by the same cursor metrics.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
    InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::AccountTrieCursor<'a>>,
    InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::AccountCursor<'a>>,
    AsyncAccountValueEncoder<
        InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
        InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
    >,
>;
77
/// Type alias for the V2 storage proof calculator with instrumented cursors.
///
/// Binds the provider's storage trie/hashed cursors wrapped in instrumented adapters
/// that record cursor operation metrics.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
    InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
    InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>;
83
/// Tracks worker availability counts.
///
/// It uses cacheline-aligned flags to avoid core-to-core chatter.
#[derive(Debug)]
struct AvailabilitySheet {
    /// One flag per worker, each on its own cacheline. Workers store `true` when idle,
    /// `false` when busy. Only the owning worker writes; the dispatcher only reads.
    /// All flags start out `false` (busy); each worker flips its own flag to idle once
    /// it has finished initialization and is ready to receive work.
    flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
}
93
94impl AvailabilitySheet {
95    /// Creates a new sheet with `count` workers, all initially marked as busy.
96    fn new(count: usize) -> Self {
97        let flags =
98            (0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect();
99        Self { flags }
100    }
101
102    /// Returns `true` if more than one worker is currently idle.
103    ///
104    /// Note, that this is somewhat racy since a flag that was just saying `idle` and we counted it
105    /// as such might turn into `busy` right away.
106    fn has_multiple_idle(&self) -> bool {
107        let mut idle = 0u32;
108        for flag in &self.flags {
109            if flag.load(Ordering::Relaxed) {
110                idle += 1;
111                if idle > 1 {
112                    return true;
113                }
114            }
115        }
116        false
117    }
118
119    /// Marks the given worker as idle.
120    fn mark_idle(&self, worker_id: usize) {
121        self.flags[worker_id].store(true, Ordering::Relaxed);
122    }
123
124    /// Marks the given worker as busy.
125    fn mark_busy(&self, worker_id: usize) {
126        self.flags[worker_id].store(false, Ordering::Relaxed);
127    }
128}
129
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
/// eliminating the need for a routing thread. All handles share reference-counted
/// channels, and workers shut down gracefully when all handles are dropped.
///
/// Cloning is cheap: clones share the same channels and availability sheets.
#[derive(Debug, Clone)]
pub struct ProofWorkerHandle {
    /// Direct sender to storage worker pool
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Direct sender to account worker pool
    account_work_tx: CrossbeamSender<AccountWorkerJob>,
    /// Per-worker availability flags for storage workers. Used to determine whether to chunk
    /// multiproofs.
    storage_availability: Arc<AvailabilitySheet>,
    /// Per-worker availability flags for account workers. Used to determine whether to chunk
    /// multiproofs.
    account_availability: Arc<AvailabilitySheet>,
    /// Total number of storage workers spawned (fixed at construction)
    storage_worker_count: usize,
    /// Total number of account workers spawned (fixed at construction)
    account_worker_count: usize,
}
152
153impl ProofWorkerHandle {
154    /// Spawns storage and account worker pools with dedicated database transactions.
155    ///
156    /// Returns a handle for submitting proof tasks to the worker pools.
157    /// Workers run until the last handle is dropped.
158    ///
159    /// # Parameters
160    /// - `runtime`: The centralized runtime used to spawn blocking worker tasks
161    /// - `task_ctx`: Shared context with database view and prefix sets
162    /// - `halve_workers`: Whether to halve the worker pool size (for small blocks)
163    #[instrument(
164        name = "ProofWorkerHandle::new",
165        level = "debug",
166        target = "trie::proof_task",
167        skip_all
168    )]
169    pub fn new<Factory>(
170        runtime: &Runtime,
171        task_ctx: ProofTaskCtx<Factory>,
172        halve_workers: bool,
173    ) -> Self
174    where
175        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
176            + Clone
177            + Send
178            + Sync
179            + 'static,
180    {
181        let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
182        let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
183
184        let cached_storage_roots = Arc::<DashMap<_, _>>::default();
185
186        let divisor = if halve_workers { 2 } else { 1 };
187        let storage_worker_count =
188            runtime.proof_storage_worker_pool().current_num_threads() / divisor;
189        let account_worker_count =
190            runtime.proof_account_worker_pool().current_num_threads() / divisor;
191
192        let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
193        let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));
194
195        debug!(
196            target: "trie::proof_task",
197            storage_worker_count,
198            account_worker_count,
199            halve_workers,
200            "Spawning proof worker pools"
201        );
202
203        // broadcast blocks until all workers exit (channel close), so run on
204        // tokio's blocking pool.
205        let storage_rt = runtime.clone();
206        let storage_task_ctx = task_ctx.clone();
207        let storage_avail = storage_availability.clone();
208        let storage_roots = cached_storage_roots.clone();
209        let storage_parent_span = tracing::Span::current();
210        runtime.spawn_blocking_named("storage-workers", move || {
211            let worker_id = AtomicUsize::new(0);
212            storage_rt.proof_storage_worker_pool().broadcast(storage_worker_count, |_| {
213                let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
214                let span = debug_span!(target: "trie::proof_task", parent: storage_parent_span.clone(), "storage_worker", ?worker_id);
215                let _guard = span.enter();
216
217                #[cfg(feature = "metrics")]
218                let metrics = ProofTaskTrieMetrics::default();
219                #[cfg(feature = "metrics")]
220                let cursor_metrics = ProofTaskCursorMetrics::new();
221
222                let worker = StorageProofWorker::new(
223                    storage_task_ctx.clone(),
224                    storage_work_rx.clone(),
225                    worker_id,
226                    storage_avail.clone(),
227                    storage_roots.clone(),
228                    #[cfg(feature = "metrics")]
229                    metrics,
230                    #[cfg(feature = "metrics")]
231                    cursor_metrics,
232                );
233                if let Err(error) = worker.run() {
234                    error!(
235                        target: "trie::proof_task",
236                        worker_id,
237                        ?error,
238                        "Storage worker failed"
239                    );
240                }
241            });
242        });
243
244        let account_rt = runtime.clone();
245        let account_tx = storage_work_tx.clone();
246        let account_avail = account_availability.clone();
247        let account_parent_span = tracing::Span::current();
248        runtime.spawn_blocking_named("account-workers", move || {
249            let worker_id = AtomicUsize::new(0);
250            account_rt.proof_account_worker_pool().broadcast(account_worker_count, |_| {
251                let worker_id = worker_id.fetch_add(1, Ordering::Relaxed);
252                let span = debug_span!(target: "trie::proof_task", parent: account_parent_span.clone(), "account_worker", ?worker_id);
253                let _guard = span.enter();
254
255                #[cfg(feature = "metrics")]
256                let metrics = ProofTaskTrieMetrics::default();
257                #[cfg(feature = "metrics")]
258                let cursor_metrics = ProofTaskCursorMetrics::new();
259
260                let worker = AccountProofWorker::new(
261                    task_ctx.clone(),
262                    account_work_rx.clone(),
263                    worker_id,
264                    account_tx.clone(),
265                    account_avail.clone(),
266                    cached_storage_roots.clone(),
267                    #[cfg(feature = "metrics")]
268                    metrics,
269                    #[cfg(feature = "metrics")]
270                    cursor_metrics,
271                );
272                if let Err(error) = worker.run() {
273                    error!(
274                        target: "trie::proof_task",
275                        worker_id,
276                        ?error,
277                        "Account worker failed"
278                    );
279                }
280            });
281        });
282
283        Self {
284            storage_work_tx,
285            account_work_tx,
286            storage_availability,
287            account_availability,
288            storage_worker_count,
289            account_worker_count,
290        }
291    }
292
293    /// Returns `true` if more than one storage worker is currently idle.
294    pub fn has_multiple_idle_storage_workers(&self) -> bool {
295        self.storage_availability.has_multiple_idle()
296    }
297
298    /// Returns `true` if more than one account worker is currently idle.
299    pub fn has_multiple_idle_account_workers(&self) -> bool {
300        self.account_availability.has_multiple_idle()
301    }
302
303    /// Returns the number of pending storage tasks in the queue.
304    pub fn pending_storage_tasks(&self) -> usize {
305        self.storage_work_tx.len()
306    }
307
308    /// Returns the number of pending account tasks in the queue.
309    pub fn pending_account_tasks(&self) -> usize {
310        self.account_work_tx.len()
311    }
312
313    /// Returns the total number of storage workers in the pool.
314    pub const fn total_storage_workers(&self) -> usize {
315        self.storage_worker_count
316    }
317
318    /// Returns the total number of account workers in the pool.
319    pub const fn total_account_workers(&self) -> usize {
320        self.account_worker_count
321    }
322
323    /// Dispatch a storage proof computation to storage worker pool
324    ///
325    /// The result will be sent via the `proof_result_sender` channel.
326    pub fn dispatch_storage_proof(
327        &self,
328        input: StorageProofInput,
329        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
330    ) -> Result<(), ProviderError> {
331        let hashed_address = input.hashed_address;
332        self.storage_work_tx
333            .send(StorageWorkerJob::StorageProof { input, proof_result_sender })
334            .map_err(|err| {
335                let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0;
336                let _ = proof_result_sender.send(StorageProofResultMessage {
337                    hashed_address,
338                    result: Err(
339                        DatabaseError::Other("storage workers unavailable".to_string()).into()
340                    ),
341                });
342
343                ProviderError::other(std::io::Error::other("storage workers unavailable"))
344            })
345    }
346
347    /// Dispatch an account multiproof computation
348    ///
349    /// The result will be sent via the `result_sender` channel included in the input.
350    pub fn dispatch_account_multiproof(
351        &self,
352        input: AccountMultiproofInput,
353    ) -> Result<(), ProviderError> {
354        self.account_work_tx
355            .send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
356            .map_err(|err| {
357                let error =
358                    ProviderError::other(std::io::Error::other("account workers unavailable"));
359
360                let AccountWorkerJob::AccountMultiproof { input } = err.0;
361                let ProofResultContext { sender: result_tx, state, start_time: start } =
362                    input.into_proof_result_sender();
363
364                let _ = result_tx.send(ProofResultMessage {
365                    result: Err(ParallelStateRootError::Provider(error.clone())),
366                    elapsed: start.elapsed(),
367                    state,
368                });
369
370                error
371            })
372    }
373}
374
/// Data used for initializing cursor factories that is shared across all proof worker instances.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
    /// The factory for creating state providers. Each worker calls
    /// `database_provider_ro()` on it to obtain its own read-only provider.
    factory: Factory,
    /// Maximum random jitter to apply before each proof computation (trie-debug only).
    /// `None` disables jitter.
    #[cfg(feature = "trie-debug")]
    proof_jitter: Option<Duration>,
}
384
impl<Factory> ProofTaskCtx<Factory> {
    /// Creates a new [`ProofTaskCtx`] with the given factory.
    ///
    /// Proof jitter (trie-debug only) defaults to disabled.
    pub const fn new(factory: Factory) -> Self {
        Self {
            factory,
            #[cfg(feature = "trie-debug")]
            proof_jitter: None,
        }
    }

    /// Sets the maximum proof jitter duration (trie-debug only).
    ///
    /// Workers sleep a random duration in `0..=jitter` before each proof computation,
    /// which helps surface ordering-dependent bugs during debugging.
    #[cfg(feature = "trie-debug")]
    pub const fn with_proof_jitter(mut self, jitter: Option<Duration>) -> Self {
        self.proof_jitter = jitter;
        self
    }
}
402
/// Per-worker bundle of a database provider and the worker's identifier.
///
/// Each worker builds its own instance around its own read-only provider
/// (see `StorageProofWorker::run`); instances are not shared between workers.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
    /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
    provider: Provider,

    /// Identifier for the worker within the worker pool, used only for tracing.
    id: usize,
}
412
impl<Provider> ProofTaskTx<Provider> {
    /// Initializes a [`ProofTaskTx`] with the given provider and ID.
    ///
    /// `id` is only used to tag trace output; it has no functional effect.
    const fn new(provider: Provider, id: usize) -> Self {
        Self { provider, id }
    }
}
419
420impl<Provider> ProofTaskTx<Provider>
421where
422    Provider: TrieCursorFactory + HashedCursorFactory,
423{
424    fn compute_v2_storage_proof<TC, HC>(
425        &self,
426        input: StorageProofInput,
427        calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
428    ) -> Result<StorageProofResult, StateProofError>
429    where
430        TC: TrieStorageCursor,
431        HC: HashedStorageCursor<Value = U256>,
432    {
433        let StorageProofInput { hashed_address, mut targets } = input;
434
435        let span = debug_span!(
436            target: "trie::proof_task",
437            "V2 Storage proof calculation",
438            n = %targets.len(),
439        );
440        let _span_guard = span.enter();
441
442        let proof_start = Instant::now();
443
444        // If targets is empty it means the caller only wants the root node.
445        let proof = if targets.is_empty() {
446            let root_node = calculator.storage_root_node(hashed_address)?;
447            vec![root_node]
448        } else {
449            calculator.storage_proof(hashed_address, &mut targets)?
450        };
451
452        let root = calculator.compute_root_hash(&proof)?;
453
454        trace!(
455            target: "trie::proof_task",
456            hashed_address = ?hashed_address,
457            proof_time_us = proof_start.elapsed().as_micros(),
458            ?root,
459            worker_id = self.id,
460            "Completed V2 storage proof calculation"
461        );
462
463        Ok(StorageProofResult { proof, root })
464    }
465}
466
/// Channel used by worker threads to deliver [`ProofResultMessage`] items directly
/// back to the `SparseTrieCacheTask` that requested the proof.
pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
472
/// Message containing a completed proof result with metadata for direct delivery to
/// `SparseTrieCacheTask`.
///
/// This type enables workers to send proof results directly to the `SparseTrieCacheTask` event
/// loop.
#[derive(Debug)]
pub struct ProofResultMessage {
    /// The proof calculation result
    pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
    /// Time taken for the entire proof calculation (from dispatch to completion,
    /// including any time spent queued)
    pub elapsed: Duration,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
}
487
/// Context for sending proof calculation results back to `SparseTrieCacheTask`.
///
/// This struct contains all context needed to send and track proof calculation results.
/// Workers use this to deliver completed proofs back to the main event loop.
#[derive(Debug, Clone)]
pub struct ProofResultContext {
    /// Channel sender for result delivery
    pub sender: ProofResultSender,
    /// Original state update that triggered this proof
    pub state: HashedPostState,
    /// Calculation start time for measuring elapsed duration
    /// (captured when the job is dispatched).
    pub start_time: Instant,
}
501
impl ProofResultContext {
    /// Creates a new proof result context.
    ///
    /// `start_time` should be captured at dispatch so that the eventual
    /// `ProofResultMessage::elapsed` covers queue wait plus computation.
    pub const fn new(
        sender: ProofResultSender,
        state: HashedPostState,
        start_time: Instant,
    ) -> Self {
        Self { sender, state, start_time }
    }
}
512
/// The results of a storage proof calculation.
#[derive(Debug)]
pub(crate) struct StorageProofResult {
    /// The calculated V2 proof nodes
    pub proof: Vec<ProofTrieNodeV2>,
    /// The storage root calculated by the V2 proof, when derivable from the nodes
    pub root: Option<B256>,
}

impl StorageProofResult {
    /// Returns the calculated root of the trie, if one can be calculated from the proof.
    const fn root(&self) -> Option<B256> {
        self.root
    }
}
528
/// Message containing a completed storage proof result with metadata.
#[derive(Debug)]
pub struct StorageProofResultMessage {
    /// The hashed address this storage proof belongs to
    // NOTE(review): allowed dead — apparently kept for future consumers; confirm it
    // is still needed before removing.
    #[allow(dead_code)]
    pub(crate) hashed_address: B256,
    /// The storage proof calculation result
    pub(crate) result: Result<StorageProofResult, StateProofError>,
}
538
/// Internal message for storage workers.
///
/// Single-variant by design: dispatchers rely on irrefutably destructuring a
/// rejected job to recover its result sender on send failure.
#[derive(Debug)]
pub(crate) enum StorageWorkerJob {
    /// Storage proof computation request
    StorageProof {
        /// Storage proof input parameters
        input: StorageProofInput,
        /// Context for sending the proof result.
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
    },
}
550
/// Worker for storage trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// storage proof requests.
struct StorageProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work; shared with all storage workers in the pool
    work_rx: CrossbeamReceiver<StorageWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Per-worker availability flags
    availability: Arc<AvailabilitySheet>,
    /// Cached storage roots, shared with account workers
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
573
impl<Factory> StorageProofWorker<Factory>
where
    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
{
    /// Creates a new storage proof worker.
    ///
    /// Purely moves the arguments into the struct; the database provider is not
    /// created until `run`.
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<StorageWorkerJob>,
        worker_id: usize,
        availability: Arc<AvailabilitySheet>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            availability,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }

    /// Runs the worker loop, processing jobs until the channel closes.
    ///
    /// # Lifecycle
    ///
    /// 1. Initializes database provider and transaction
    /// 2. Advertises availability
    /// 3. Processes jobs in a loop:
    ///    - Receives job from channel
    ///    - Marks worker as busy
    ///    - Processes the job
    ///    - Marks worker as available
    /// 4. Shuts down when channel closes
    ///
    /// # Panic Safety
    ///
    /// If this function panics, the worker thread terminates but other workers
    /// continue operating and the system degrades gracefully.
    fn run(mut self) -> ProviderResult<()> {
        // Create provider from factory
        let provider = self.task_ctx.factory.database_provider_ro()?;
        let proof_tx = ProofTaskTx::new(provider, self.worker_id);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            "Storage worker started"
        );

        let mut storage_proofs_processed = 0u64;
        // NOTE(review): `ProofTaskCursorMetricsCache` is imported under
        // `#[cfg(feature = "metrics")]` at the top of this file but used
        // unconditionally here — confirm this builds with `metrics` disabled.
        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
        // Cursors are created once with a placeholder account (`B256::ZERO`) and
        // reused for every proof; presumably the calculator re-seeks them per
        // request — TODO confirm.
        let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
        let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
        let instrumented_trie_cursor =
            InstrumentedTrieCursor::new(trie_cursor, &mut cursor_metrics_cache.storage_trie_cursor);
        let instrumented_hashed_cursor = InstrumentedHashedCursor::new(
            hashed_cursor,
            &mut cursor_metrics_cache.storage_hashed_cursor,
        );
        let mut v2_calculator = proof_v2::StorageProofCalculator::new_storage(
            instrumented_trie_cursor,
            instrumented_hashed_cursor,
        );

        // Initially mark this worker as available.
        self.availability.mark_idle(self.worker_id);

        let mut total_idle_time = Duration::ZERO;
        let mut idle_start = Instant::now();

        while let Ok(job) = self.work_rx.recv() {
            // Time spent blocked on `recv` counts as idle time.
            total_idle_time += idle_start.elapsed();

            // Mark worker as busy.
            self.availability.mark_busy(self.worker_id);

            // Debugging aid: random sleep before each proof to perturb timing.
            #[cfg(feature = "trie-debug")]
            if let Some(max_jitter) = self.task_ctx.proof_jitter {
                let jitter =
                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
                trace!(
                    target: "trie::proof_task",
                    worker_id = self.worker_id,
                    jitter_us = jitter.as_micros(),
                    "Storage worker applying proof jitter"
                );
                std::thread::sleep(jitter);
            }

            match job {
                StorageWorkerJob::StorageProof { input, proof_result_sender } => {
                    self.process_storage_proof(
                        &proof_tx,
                        &mut v2_calculator,
                        input,
                        proof_result_sender,
                        &mut storage_proofs_processed,
                    );
                }
            }

            // Mark worker as available again.
            self.availability.mark_idle(self.worker_id);

            idle_start = Instant::now();
        }

        // Drop calculator to release mutable borrows on cursor_metrics_cache.
        drop(v2_calculator);

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            storage_proofs_processed,
            total_idle_time_us = total_idle_time.as_micros(),
            "Storage worker shutting down"
        );

        #[cfg(feature = "metrics")]
        {
            self.metrics.record_storage_worker_idle_time(total_idle_time);
            self.cursor_metrics.record(&mut cursor_metrics_cache);
        }

        Ok(())
    }

    /// Processes a storage proof request.
    ///
    /// Computes the proof via `proof_tx`, delivers the outcome through
    /// `proof_result_sender`, caches the resulting storage root (when one was
    /// computed) in the shared map, and bumps `storage_proofs_processed`.
    fn process_storage_proof<Provider, TC, HC>(
        &self,
        proof_tx: &ProofTaskTx<Provider>,
        v2_calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
        input: StorageProofInput,
        proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
        storage_proofs_processed: &mut u64,
    ) where
        Provider: TrieCursorFactory + HashedCursorFactory,
        TC: TrieStorageCursor,
        HC: HashedStorageCursor<Value = U256>,
    {
        let hashed_address = input.hashed_address;
        let proof_start = Instant::now();

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            targets_len = input.targets.len(),
            "Processing V2 storage proof"
        );

        let result = proof_tx.compute_v2_storage_proof(input, v2_calculator);

        let proof_elapsed = proof_start.elapsed();
        *storage_proofs_processed += 1;

        // Copy the root out before `result` is moved into the message below.
        let root = result.as_ref().ok().and_then(|result| result.root());

        if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
            trace!(
                target: "trie::proof_task",
                worker_id = self.worker_id,
                hashed_address = ?hashed_address,
                storage_proofs_processed,
                "Proof result receiver dropped, discarding result"
            );
        }

        // Cache the root even when the receiver went away; later proofs can reuse it.
        if let Some(root) = root {
            self.cached_storage_roots.insert(hashed_address, root);
        }

        trace!(
            target: "trie::proof_task",
            worker_id = self.worker_id,
            hashed_address = ?hashed_address,
            proof_time_us = proof_elapsed.as_micros(),
            total_processed = storage_proofs_processed,
            ?root,
            "Storage proof completed"
        );
    }
}
763
/// Worker for account trie operations.
///
/// Each worker maintains a dedicated database transaction and processes
/// account multiproof requests.
struct AccountProofWorker<Factory> {
    /// Shared task context with database factory and prefix sets
    task_ctx: ProofTaskCtx<Factory>,
    /// Channel for receiving work; shared with all account workers in the pool
    work_rx: CrossbeamReceiver<AccountWorkerJob>,
    /// Unique identifier for this worker (used for tracing)
    worker_id: usize,
    /// Channel for dispatching storage proof work (for pre-dispatched target proofs)
    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
    /// Per-worker availability flags
    availability: Arc<AvailabilitySheet>,
    /// Cached storage roots, shared with storage workers
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Metrics collector for this worker
    #[cfg(feature = "metrics")]
    metrics: ProofTaskTrieMetrics,
    /// Cursor metrics for this worker
    #[cfg(feature = "metrics")]
    cursor_metrics: ProofTaskCursorMetrics,
}
788
789impl<Factory> AccountProofWorker<Factory>
790where
791    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>,
792{
    /// Creates a new account proof worker.
    ///
    /// Purely moves the arguments into the struct; the argument list mirrors the
    /// struct's fields one-to-one, hence the `too_many_arguments` expectation.
    #[expect(clippy::too_many_arguments)]
    const fn new(
        task_ctx: ProofTaskCtx<Factory>,
        work_rx: CrossbeamReceiver<AccountWorkerJob>,
        worker_id: usize,
        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
        availability: Arc<AvailabilitySheet>,
        cached_storage_roots: Arc<DashMap<B256, B256>>,
        #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
        #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
    ) -> Self {
        Self {
            task_ctx,
            work_rx,
            worker_id,
            storage_work_tx,
            availability,
            cached_storage_roots,
            #[cfg(feature = "metrics")]
            metrics,
            #[cfg(feature = "metrics")]
            cursor_metrics,
        }
    }
818
819    /// Runs the worker loop, processing jobs until the channel closes.
820    ///
821    /// # Lifecycle
822    ///
823    /// 1. Initializes database provider and transaction
824    /// 2. Advertises availability
825    /// 3. Processes jobs in a loop:
826    ///    - Receives job from channel
827    ///    - Marks worker as busy
828    ///    - Processes the job
829    ///    - Marks worker as available
830    /// 4. Shuts down when channel closes
831    ///
832    /// # Panic Safety
833    ///
834    /// If this function panics, the worker thread terminates but other workers
835    /// continue operating and the system degrades gracefully.
836    fn run(mut self) -> ProviderResult<()> {
837        let provider = self.task_ctx.factory.database_provider_ro()?;
838
839        trace!(
840            target: "trie::proof_task",
841            worker_id=self.worker_id,
842            "Account worker started"
843        );
844
845        let mut account_proofs_processed = 0u64;
846        let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
847
848        // Create both account and storage calculators for V2 proofs.
849        // The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
850        let account_trie_cursor = provider.account_trie_cursor()?;
851        let account_hashed_cursor = provider.hashed_account_cursor()?;
852
853        let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
854        let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;
855
856        let instrumented_account_trie_cursor = InstrumentedTrieCursor::new(
857            account_trie_cursor,
858            &mut cursor_metrics_cache.account_trie_cursor,
859        );
860        let instrumented_account_hashed_cursor = InstrumentedHashedCursor::new(
861            account_hashed_cursor,
862            &mut cursor_metrics_cache.account_hashed_cursor,
863        );
864        let instrumented_storage_trie_cursor = InstrumentedTrieCursor::new(
865            storage_trie_cursor,
866            &mut cursor_metrics_cache.storage_trie_cursor,
867        );
868        let instrumented_storage_hashed_cursor = InstrumentedHashedCursor::new(
869            storage_hashed_cursor,
870            &mut cursor_metrics_cache.storage_hashed_cursor,
871        );
872
873        let mut v2_account_calculator =
874            proof_v2::ProofCalculator::<
875                _,
876                _,
877                AsyncAccountValueEncoder<
878                    InstrumentedTrieCursor<
879                        '_,
880                        <Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
881                    >,
882                    InstrumentedHashedCursor<
883                        '_,
884                        <Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
885                    >,
886                >,
887            >::new(instrumented_account_trie_cursor, instrumented_account_hashed_cursor);
888        let v2_storage_calculator =
889            Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
890                instrumented_storage_trie_cursor,
891                instrumented_storage_hashed_cursor,
892            )));
893
894        // Count this worker as available only after successful initialization.
895        self.availability.mark_idle(self.worker_id);
896
897        let mut total_idle_time = Duration::ZERO;
898        let mut idle_start = Instant::now();
899        let mut value_encoder_stats_cache = ValueEncoderStats::default();
900
901        while let Ok(job) = self.work_rx.recv() {
902            total_idle_time += idle_start.elapsed();
903
904            // Mark worker as busy.
905            self.availability.mark_busy(self.worker_id);
906
907            #[cfg(feature = "trie-debug")]
908            if let Some(max_jitter) = self.task_ctx.proof_jitter {
909                let jitter =
910                    Duration::from_nanos(rand::random_range(0..=max_jitter.as_nanos() as u64));
911                trace!(
912                    target: "trie::proof_task",
913                    worker_id = self.worker_id,
914                    jitter_us = jitter.as_micros(),
915                    "Account worker applying proof jitter"
916                );
917                std::thread::sleep(jitter);
918            }
919
920            match job {
921                AccountWorkerJob::AccountMultiproof { input } => {
922                    let value_encoder_stats = self.process_account_multiproof::<Factory::Provider>(
923                        &mut v2_account_calculator,
924                        v2_storage_calculator.clone(),
925                        *input,
926                        &mut account_proofs_processed,
927                    );
928                    total_idle_time += value_encoder_stats.storage_wait_time;
929                    value_encoder_stats_cache.extend(&value_encoder_stats);
930                }
931            }
932
933            // Mark worker as available again.
934            self.availability.mark_idle(self.worker_id);
935
936            idle_start = Instant::now();
937        }
938
939        // Drop calculators to release mutable borrows on cursor_metrics_cache.
940        drop(v2_account_calculator);
941        drop(v2_storage_calculator);
942
943        trace!(
944            target: "trie::proof_task",
945            worker_id=self.worker_id,
946            account_proofs_processed,
947            total_idle_time_us = total_idle_time.as_micros(),
948            "Account worker shutting down"
949        );
950
951        #[cfg(feature = "metrics")]
952        {
953            self.metrics.record_account_worker_idle_time(total_idle_time);
954            self.cursor_metrics.record(&mut cursor_metrics_cache);
955            self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
956        }
957
958        Ok(())
959    }
960
961    fn compute_v2_account_multiproof<'a, Provider>(
962        &self,
963        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
964        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
965        targets: MultiProofTargetsV2,
966    ) -> Result<(DecodedMultiProofV2, ValueEncoderStats), ParallelStateRootError>
967    where
968        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
969    {
970        let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
971
972        let span = debug_span!(
973            target: "trie::proof_task",
974            "Account V2 multiproof calculation",
975            account_targets = account_targets.len(),
976            storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
977        );
978        let _span_guard = span.enter();
979
980        trace!(target: "trie::proof_task", "Processing V2 account multiproof");
981
982        let storage_proof_receivers =
983            dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
984
985        let mut value_encoder = AsyncAccountValueEncoder::new(
986            storage_proof_receivers,
987            self.cached_storage_roots.clone(),
988            v2_storage_calculator,
989        );
990
991        let account_proofs =
992            v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
993
994        let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
995
996        let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
997
998        Ok((proof, value_encoder_stats))
999    }
1000
1001    /// Processes an account multiproof request.
1002    ///
1003    /// Returns stats from the value encoder used during proof computation.
1004    fn process_account_multiproof<'a, Provider>(
1005        &self,
1006        v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
1007        v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
1008        input: AccountMultiproofInput,
1009        account_proofs_processed: &mut u64,
1010    ) -> ValueEncoderStats
1011    where
1012        Provider: TrieCursorFactory + HashedCursorFactory + 'a,
1013    {
1014        let proof_start = Instant::now();
1015
1016        let AccountMultiproofInput { targets, proof_result_sender } = input;
1017        let (result, value_encoder_stats) = match self.compute_v2_account_multiproof::<Provider>(
1018            v2_account_calculator,
1019            v2_storage_calculator,
1020            targets,
1021        ) {
1022            Ok((proof, stats)) => (Ok(proof), stats),
1023            Err(e) => (Err(e), ValueEncoderStats::default()),
1024        };
1025
1026        let ProofResultContext { sender: result_tx, state, start_time: start } =
1027            proof_result_sender;
1028
1029        let proof_elapsed = proof_start.elapsed();
1030        let total_elapsed = start.elapsed();
1031        *account_proofs_processed += 1;
1032
1033        // Send result to SparseTrieCacheTask
1034        if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
1035            trace!(
1036                target: "trie::proof_task",
1037                worker_id=self.worker_id,
1038                account_proofs_processed,
1039                "Account multiproof receiver dropped, discarding result"
1040            );
1041        }
1042
1043        trace!(
1044            target: "trie::proof_task",
1045            proof_time_us = proof_elapsed.as_micros(),
1046            total_elapsed_us = total_elapsed.as_micros(),
1047            total_processed = account_proofs_processed,
1048            "Account multiproof completed"
1049        );
1050
1051        value_encoder_stats
1052    }
1053}
1054
1055/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
1056///
1057/// This function queues all storage proof tasks to the worker pool but returns immediately
1058/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
1059/// computation. This enables interleaved parallelism for better performance.
1060///
1061/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
1062fn dispatch_v2_storage_proofs(
1063    storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
1064    account_targets: &[ProofV2Target],
1065    mut storage_targets: B256Map<Vec<ProofV2Target>>,
1066) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
1067    let mut storage_proof_receivers =
1068        B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
1069
1070    // Collect hashed addresses from account targets that need their storage roots computed
1071    let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
1072
1073    // For storage targets with associated account proofs, ensure the first target has
1074    // min_len(0) so the root node is returned for storage root computation
1075    for (hashed_address, targets) in &mut storage_targets {
1076        if account_target_addresses.contains(hashed_address) &&
1077            let Some(first) = targets.first_mut()
1078        {
1079            *first = first.with_min_len(0);
1080        }
1081    }
1082
1083    // Sort storage targets by address for optimal dispatch order.
1084    // Since trie walk processes accounts in lexicographical order, dispatching in the same order
1085    // reduces head-of-line blocking when consuming results.
1086    let mut sorted_storage_targets: Vec<_> = storage_targets.into_iter().collect();
1087    sorted_storage_targets.sort_unstable_by_key(|(addr, _)| *addr);
1088
1089    // Dispatch all proofs for targeted storage slots
1090    for (hashed_address, targets) in sorted_storage_targets {
1091        // Create channel for receiving StorageProofResultMessage
1092        let (result_tx, result_rx) = crossbeam_channel::unbounded();
1093        let input = StorageProofInput::new(hashed_address, targets);
1094
1095        storage_work_tx
1096            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
1097            .map_err(|_| {
1098                ParallelStateRootError::Other(format!(
1099                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
1100                ))
1101            })?;
1102
1103        storage_proof_receivers.insert(hashed_address, result_rx);
1104    }
1105
1106    Ok(storage_proof_receivers)
1107}
1108
/// Input parameters for storage proof computation.
///
/// Carried inside a [`StorageWorkerJob::StorageProof`] dispatched to storage workers.
#[derive(Debug)]
pub struct StorageProofInput {
    /// The hashed address for which the proof is calculated.
    pub hashed_address: B256,
    /// The set of proof targets within this account's storage trie.
    pub targets: Vec<ProofV2Target>,
}
1117
1118impl StorageProofInput {
1119    /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
1120    pub const fn new(hashed_address: B256, targets: Vec<ProofV2Target>) -> Self {
1121        Self { hashed_address, targets }
1122    }
1123}
1124
/// Input parameters for account multiproof computation.
#[derive(Debug)]
pub struct AccountMultiproofInput {
    /// The targets for which to compute the multiproof.
    pub targets: MultiProofTargetsV2,
    /// Context for sending the proof result back to the requester.
    pub proof_result_sender: ProofResultContext,
}
1133
1134impl AccountMultiproofInput {
1135    /// Returns the [`ProofResultContext`] for this input, consuming the input.
1136    fn into_proof_result_sender(self) -> ProofResultContext {
1137        self.proof_result_sender
1138    }
1139}
1140
/// Internal message for account workers.
#[derive(Debug)]
enum AccountWorkerJob {
    /// Account multiproof computation request
    AccountMultiproof {
        /// Account multiproof input parameters (boxed so the enum variant stays small)
        input: Box<AccountMultiproofInput>,
    },
}
1150
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::ChainSpec;
    use reth_provider::test_utils::create_test_provider_factory_with_chain_spec;
    use std::sync::Arc;

    /// Wraps a provider factory in a [`ProofTaskCtx`].
    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
        ProofTaskCtx::new(factory)
    }

    /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
    #[test]
    fn spawn_proof_workers_creates_handle() {
        let spec = Arc::new(ChainSpec::default());
        let genesis_hash = spec.genesis_hash();
        let base_factory = create_test_provider_factory_with_chain_spec(spec);
        let changesets = reth_trie_db::ChangesetCache::new();
        let overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
            base_factory,
            reth_provider::providers::OverlayBuilder::<reth_ethereum_primitives::EthPrimitives>::new(
                genesis_hash,
                changesets,
            ),
        );

        let runtime = reth_tasks::Runtime::test();
        let proof_handle = ProofWorkerHandle::new(&runtime, test_ctx(overlay_factory), false);

        // A handle must be cloneable; workers shut down once every clone is dropped.
        let cloned = proof_handle.clone();
        drop(cloned);
        drop(proof_handle);
    }
}