reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use alloy_evm::block::StateChangeSource;
4use alloy_primitives::{
5    keccak256,
6    map::{B256Set, HashSet},
7    B256,
8};
9use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
10use dashmap::DashMap;
11use derive_more::derive::Deref;
12use metrics::{Gauge, Histogram};
13use reth_metrics::Metrics;
14use reth_revm::state::EvmState;
15use reth_trie::{
16    added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut,
17    updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState, HashedPostStateSorted,
18    HashedStorage, MultiProofTargets, TrieInput,
19};
20use reth_trie_parallel::{
21    proof::ParallelProof,
22    proof_task::{
23        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
24        StorageProofInput,
25    },
26};
27use std::{collections::BTreeMap, ops::DerefMut, sync::Arc, time::Instant};
28use tracing::{debug, error, instrument, trace};
29
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
///
/// Pairs a [`HashedPostState`] with the [`DecodedMultiProof`] that was calculated for it.
#[derive(Default, Debug)]
pub struct SparseTrieUpdate {
    /// The state update that was used to calculate the proof.
    pub(crate) state: HashedPostState,
    /// The calculated multiproof.
    pub(crate) multiproof: DecodedMultiProof,
}
39
40impl SparseTrieUpdate {
41    /// Returns true if the update is empty.
42    pub(super) fn is_empty(&self) -> bool {
43        self.state.is_empty() && self.multiproof.is_empty()
44    }
45
46    /// Construct update from multiproof.
47    #[cfg(test)]
48    pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
49        Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
50    }
51
52    /// Extend update with contents of the other.
53    pub(super) fn extend(&mut self, other: Self) {
54        self.state.extend(other.state);
55        self.multiproof.extend(other.multiproof);
56    }
57}
58
/// Common configuration for multi proof tasks.
///
/// Every part is wrapped in an [`Arc`], so cloning the config shares the underlying data
/// instead of copying it.
#[derive(Debug, Clone, Default)]
pub(crate) struct MultiProofConfig {
    /// The sorted collection of cached in-memory intermediate trie nodes that
    /// can be reused for computation.
    pub nodes_sorted: Arc<TrieUpdatesSorted>,
    /// The sorted in-memory overlay hashed state.
    pub state_sorted: Arc<HashedPostStateSorted>,
    /// The collection of prefix sets for the computation. Since the prefix sets _always_
    /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here,
    /// if we have cached nodes for them.
    pub prefix_sets: Arc<TriePrefixSetsMut>,
}
72
73impl MultiProofConfig {
74    /// Creates a new state root config from the trie input.
75    ///
76    /// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the
77    /// [`TrieInput`].
78    pub(crate) fn from_input(mut input: TrieInput) -> (TrieInput, Self) {
79        let config = Self {
80            nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
81            state_sorted: Arc::new(input.state.drain_into_sorted()),
82            prefix_sets: Arc::new(input.prefix_sets.clone()),
83        };
84        (input.cleared(), config)
85    }
86}
87
/// Messages used internally by the multi proof task.
#[derive(Debug)]
pub(super) enum MultiProofMessage {
    /// Request to prefetch proofs for the given targets.
    PrefetchProofs(MultiProofTargets),
    /// New state update from transaction execution with its source.
    StateUpdate(StateChangeSource, EvmState),
    /// State update that can be applied to the sparse trie without any new proofs.
    ///
    /// It can be the case when all accounts and storage slots from the state update were already
    /// fetched and revealed.
    EmptyProof {
        /// The index of this proof in the sequence of state updates.
        sequence_number: u64,
        /// The state update that was used to calculate the proof.
        state: HashedPostState,
    },
    /// Signals state update stream end.
    ///
    /// This is triggered by block execution, indicating that no additional state updates are
    /// expected.
    FinishedStateUpdates,
}
111
112/// Handle to track proof calculation ordering.
113#[derive(Debug, Default)]
114struct ProofSequencer {
115    /// The next proof sequence number to be produced.
116    next_sequence: u64,
117    /// The next sequence number expected to be delivered.
118    next_to_deliver: u64,
119    /// Buffer for out-of-order proofs and corresponding state updates
120    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
121}
122
123impl ProofSequencer {
124    /// Gets the next sequence number and increments the counter
125    const fn next_sequence(&mut self) -> u64 {
126        let seq = self.next_sequence;
127        self.next_sequence += 1;
128        seq
129    }
130
131    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
132    /// updates if we have a continuous sequence
133    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
134        if sequence >= self.next_to_deliver {
135            self.pending_proofs.insert(sequence, update);
136        }
137
138        // return early if we don't have the next expected proof
139        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
140            return Vec::new()
141        }
142
143        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
144        let mut current_sequence = self.next_to_deliver;
145
146        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
147        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
148            consecutive_proofs.push(pending);
149            current_sequence += 1;
150
151            // if we don't have the next number, stop collecting
152            if !self.pending_proofs.contains_key(&current_sequence) {
153                break;
154            }
155        }
156
157        self.next_to_deliver += consecutive_proofs.len() as u64;
158
159        consecutive_proofs
160    }
161
162    /// Returns true if we still have pending proofs
163    pub(crate) fn has_pending(&self) -> bool {
164        !self.pending_proofs.is_empty()
165    }
166}
167
/// A wrapper for the sender that signals completion when dropped.
///
/// This type is intended to be used in combination with the evm executor statehook.
/// This should trigger once the block has been executed (after) the last state update has been
/// sent. This triggers the exit condition of the multi proof task.
///
/// Dereferences to the wrapped [`CrossbeamSender`], so messages can be sent through it directly.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);

impl StateHookSender {
    /// Wraps `inner` so that dropping the wrapper emits
    /// [`MultiProofMessage::FinishedStateUpdates`] on it.
    pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
        Self(inner)
    }
}

impl Drop for StateHookSender {
    fn drop(&mut self) {
        // Send completion signal when the sender is dropped. The send result is discarded, so a
        // failed send is silently ignored.
        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
    }
}
188
189pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
190    let mut hashed_state = HashedPostState::with_capacity(update.len());
191
192    for (address, account) in update {
193        if account.is_touched() {
194            let hashed_address = keccak256(address);
195            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
196
197            let destroyed = account.is_selfdestructed();
198            let info = if destroyed { None } else { Some(account.info.into()) };
199            hashed_state.accounts.insert(hashed_address, info);
200
201            let mut changed_storage_iter = account
202                .storage
203                .into_iter()
204                .filter(|(_slot, value)| value.is_changed())
205                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
206                .peekable();
207
208            if destroyed {
209                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
210            } else if changed_storage_iter.peek().is_some() {
211                hashed_state
212                    .storages
213                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
214            }
215        }
216    }
217
218    hashed_state
219}
220
221/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
222#[derive(Debug)]
223enum PendingMultiproofTask {
224    /// A storage multiproof task input.
225    Storage(StorageMultiproofInput),
226    /// A regular multiproof task input.
227    Regular(MultiproofInput),
228}
229
230impl PendingMultiproofTask {
231    /// Returns the proof sequence number of the task.
232    const fn proof_sequence_number(&self) -> u64 {
233        match self {
234            Self::Storage(input) => input.proof_sequence_number,
235            Self::Regular(input) => input.proof_sequence_number,
236        }
237    }
238
239    /// Returns whether or not the proof targets are empty.
240    fn proof_targets_is_empty(&self) -> bool {
241        match self {
242            Self::Storage(input) => input.proof_targets.is_empty(),
243            Self::Regular(input) => input.proof_targets.is_empty(),
244        }
245    }
246
247    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
248    fn send_empty_proof(self) {
249        match self {
250            Self::Storage(input) => input.send_empty_proof(),
251            Self::Regular(input) => input.send_empty_proof(),
252        }
253    }
254}
255
/// Wraps a storage multiproof input in [`PendingMultiproofTask::Storage`].
impl From<StorageMultiproofInput> for PendingMultiproofTask {
    fn from(input: StorageMultiproofInput) -> Self {
        Self::Storage(input)
    }
}

/// Wraps a regular multiproof input in [`PendingMultiproofTask::Regular`].
impl From<MultiproofInput> for PendingMultiproofTask {
    fn from(input: MultiproofInput) -> Self {
        Self::Regular(input)
    }
}
267
268/// Input parameters for dispatching a dedicated storage multiproof calculation.
269#[derive(Debug)]
270struct StorageMultiproofInput {
271    hashed_state_update: HashedPostState,
272    hashed_address: B256,
273    proof_targets: B256Set,
274    proof_sequence_number: u64,
275    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
276    multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
277}
278
279impl StorageMultiproofInput {
280    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
281    fn send_empty_proof(self) {
282        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
283            sequence_number: self.proof_sequence_number,
284            state: self.hashed_state_update,
285        });
286    }
287}
288
289/// Input parameters for dispatching a multiproof calculation.
290#[derive(Debug)]
291struct MultiproofInput {
292    source: Option<StateChangeSource>,
293    hashed_state_update: HashedPostState,
294    proof_targets: MultiProofTargets,
295    proof_sequence_number: u64,
296    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
297    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
298}
299
300impl MultiproofInput {
301    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
302    fn send_empty_proof(self) {
303        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
304            sequence_number: self.proof_sequence_number,
305            state: self.hashed_state_update,
306        });
307    }
308}
309
/// Coordinates multiproof dispatch between `MultiProofTask` and the parallel trie workers.
///
/// # Flow
/// 1. `MultiProofTask` asks the manager to dispatch either storage or account proof work.
/// 2. The manager builds the request, clones `proof_result_tx`, and hands everything to
///    [`ProofWorkerHandle`].
/// 3. A worker finishes the proof and sends a [`ProofResultMessage`] through the channel included
///    in the job.
/// 4. `MultiProofTask` consumes the message from the same channel and sequences it with
///    `ProofSequencer`.
#[derive(Debug)]
pub struct MultiproofManager {
    /// Handle to the proof worker pools (storage and account).
    proof_worker_handle: ProofWorkerHandle,
    /// Cached storage proof roots for missed leaves; this maps
    /// hashed (missed) addresses to their storage proof roots.
    ///
    /// It is important to cache these. Otherwise, a common account
    /// (popular ERC-20, etc.) having missed leaves in its path would
    /// repeatedly calculate these proofs per interacting transaction
    /// (same account different slots).
    ///
    /// This also works well with chunking multiproofs, which may break
    /// a big account change into different chunks, which may repeatedly
    /// revisit missed leaves.
    missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
    /// Channel sender cloned into each dispatched job so workers can send back the
    /// `ProofResultMessage`.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Metrics handle used to record the worker-pool gauges and histograms.
    metrics: MultiProofTaskMetrics,
}
342
343impl MultiproofManager {
344    /// Creates a new [`MultiproofManager`].
345    fn new(
346        metrics: MultiProofTaskMetrics,
347        proof_worker_handle: ProofWorkerHandle,
348        proof_result_tx: CrossbeamSender<ProofResultMessage>,
349    ) -> Self {
350        // Initialize the max worker gauges with the worker pool sizes
351        metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
352        metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
353
354        Self {
355            metrics,
356            proof_worker_handle,
357            missed_leaves_storage_roots: Default::default(),
358            proof_result_tx,
359        }
360    }
361
362    /// Dispatches a new multiproof calculation to worker pools.
363    fn dispatch(&self, input: PendingMultiproofTask) {
364        // If there are no proof targets, we can just send an empty multiproof back immediately
365        if input.proof_targets_is_empty() {
366            debug!(
367                sequence_number = input.proof_sequence_number(),
368                "No proof targets, sending empty multiproof back immediately"
369            );
370            input.send_empty_proof();
371            return
372        }
373
374        match input {
375            PendingMultiproofTask::Storage(storage_input) => {
376                self.dispatch_storage_proof(storage_input);
377            }
378            PendingMultiproofTask::Regular(multiproof_input) => {
379                self.dispatch_multiproof(multiproof_input);
380            }
381        }
382    }
383
384    /// Dispatches a single storage proof calculation to worker pool.
385    fn dispatch_storage_proof(&self, storage_multiproof_input: StorageMultiproofInput) {
386        let StorageMultiproofInput {
387            hashed_state_update,
388            hashed_address,
389            proof_targets,
390            proof_sequence_number,
391            multi_added_removed_keys,
392            state_root_message_sender: _,
393        } = storage_multiproof_input;
394
395        let storage_targets = proof_targets.len();
396
397        trace!(
398            target: "engine::tree::payload_processor::multiproof",
399            proof_sequence_number,
400            ?proof_targets,
401            storage_targets,
402            "Dispatching storage proof to workers"
403        );
404
405        let start = Instant::now();
406
407        // Create prefix set from targets
408        let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
409            proof_targets.iter().map(reth_trie::Nibbles::unpack),
410        );
411        let prefix_set = prefix_set.freeze();
412
413        // Build computation input (data only)
414        let input = StorageProofInput::new(
415            hashed_address,
416            prefix_set,
417            proof_targets,
418            true, // with_branch_node_masks
419            Some(multi_added_removed_keys),
420        );
421
422        // Dispatch to storage worker
423        if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
424            input,
425            ProofResultContext::new(
426                self.proof_result_tx.clone(),
427                proof_sequence_number,
428                hashed_state_update,
429                start,
430            ),
431        ) {
432            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
433            return;
434        }
435
436        self.metrics
437            .active_storage_workers_histogram
438            .record(self.proof_worker_handle.active_storage_workers() as f64);
439        self.metrics
440            .active_account_workers_histogram
441            .record(self.proof_worker_handle.active_account_workers() as f64);
442        self.metrics
443            .pending_storage_multiproofs_histogram
444            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
445        self.metrics
446            .pending_account_multiproofs_histogram
447            .record(self.proof_worker_handle.pending_account_tasks() as f64);
448    }
449
450    /// Signals that a multiproof calculation has finished.
451    fn on_calculation_complete(&self) {
452        self.metrics
453            .active_storage_workers_histogram
454            .record(self.proof_worker_handle.active_storage_workers() as f64);
455        self.metrics
456            .active_account_workers_histogram
457            .record(self.proof_worker_handle.active_account_workers() as f64);
458        self.metrics
459            .pending_storage_multiproofs_histogram
460            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
461        self.metrics
462            .pending_account_multiproofs_histogram
463            .record(self.proof_worker_handle.pending_account_tasks() as f64);
464    }
465
466    /// Dispatches a single multiproof calculation to worker pool.
467    fn dispatch_multiproof(&self, multiproof_input: MultiproofInput) {
468        let MultiproofInput {
469            source,
470            hashed_state_update,
471            proof_targets,
472            proof_sequence_number,
473            state_root_message_sender: _,
474            multi_added_removed_keys,
475        } = multiproof_input;
476
477        let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
478        let account_targets = proof_targets.len();
479        let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
480
481        trace!(
482            target: "engine::tree::payload_processor::multiproof",
483            proof_sequence_number,
484            ?proof_targets,
485            account_targets,
486            storage_targets,
487            ?source,
488            "Dispatching multiproof to workers"
489        );
490
491        let start = Instant::now();
492
493        // Extend prefix sets with targets
494        let frozen_prefix_sets =
495            ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);
496
497        // Dispatch account multiproof to worker pool with result sender
498        let input = AccountMultiproofInput {
499            targets: proof_targets,
500            prefix_sets: frozen_prefix_sets,
501            collect_branch_node_masks: true,
502            multi_added_removed_keys,
503            missed_leaves_storage_roots,
504            // Workers will send ProofResultMessage directly to proof_result_rx
505            proof_result_sender: ProofResultContext::new(
506                self.proof_result_tx.clone(),
507                proof_sequence_number,
508                hashed_state_update,
509                start,
510            ),
511        };
512
513        if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
514            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
515            return;
516        }
517
518        self.metrics
519            .active_storage_workers_histogram
520            .record(self.proof_worker_handle.active_storage_workers() as f64);
521        self.metrics
522            .active_account_workers_histogram
523            .record(self.proof_worker_handle.active_account_workers() as f64);
524        self.metrics
525            .pending_storage_multiproofs_histogram
526            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
527        self.metrics
528            .pending_account_multiproofs_histogram
529            .record(self.proof_worker_handle.pending_account_tasks() as f64);
530    }
531}
532
/// Metrics for the multiproof task and its [`MultiproofManager`], declared under the
/// `tree.root` scope.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of active storage workers processing proofs.
    pub active_storage_workers_histogram: Histogram,
    /// Histogram of active account workers processing proofs.
    pub active_account_workers_histogram: Histogram,
    /// Gauge for the maximum number of storage workers in the pool.
    pub max_storage_workers: Gauge,
    /// Gauge for the maximum number of account workers in the pool.
    pub max_account_workers: Gauge,
    /// Histogram of pending storage multiproofs in the queue.
    pub pending_storage_multiproofs_histogram: Histogram,
    /// Histogram of pending account multiproofs in the queue.
    pub pending_account_multiproofs_histogram: Histogram,

    /// Histogram of the number of prefetch proof target accounts.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of prefetch proof target storages.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of prefetch proof target chunks.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of state update proof target accounts.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of state update proof target storages.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of state update proof target chunks.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations.
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of state updates received.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of proofs processed.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of total time spent in the multiproof task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Total time spent waiting for the first state update or prefetch request.
    pub first_update_wait_time_histogram: Histogram,
    /// Total time spent waiting for the last proof result.
    pub last_proof_wait_time_histogram: Histogram,
}
584
585/// Standalone task that receives a transaction state stream and updates relevant
586/// data structures to calculate state root.
587///
588/// ## Architecture: Dual-Channel Multiproof System
589///
590/// This task orchestrates parallel proof computation using a dual-channel architecture that
591/// separates control messages from proof computation results:
592///
593/// ```text
594/// ┌─────────────────────────────────────────────────────────────────┐
595/// │                        MultiProofTask                            │
596/// │                  Event Loop (crossbeam::select!)                 │
597/// └──┬──────────────────────────────────────────────────────────▲───┘
598///    │                                                           │
599///    │ (1) Send proof request                                   │
600///    │     via tx (control channel)                             │
601///    │                                                           │
602///    ▼                                                           │
603/// ┌──────────────────────────────────────────────────────────────┐ │
604/// │             MultiproofManager                                │ │
/// │  - Short-circuits empty target sets (EmptyProof)             │ │
606/// │  - Routes to appropriate worker pool                         │ │
607/// └──┬───────────────────────────────────────────────────────────┘ │
608///    │                                                             │
609///    │ (2) Dispatch to workers                                    │
610///    │     OR send EmptyProof (fast path)                         │
611///    ▼                                                             │
612/// ┌──────────────────────────────────────────────────────────────┐ │
613/// │              ProofWorkerHandle                                │ │
614/// │  ┌─────────────────────┐   ┌────────────────────────┐        │ │
615/// │  │ Storage Worker Pool │   │ Account Worker Pool     │        │ │
616/// │  │ (spawn_blocking)    │   │ (spawn_blocking)        │        │ │
617/// │  └─────────────────────┘   └────────────────────────┘        │ │
618/// └──┬───────────────────────────────────────────────────────────┘ │
619///    │                                                             │
620///    │ (3) Compute proofs in parallel                             │
621///    │     Send results back                                      │
622///    │                                                             │
623///    ▼                                                             │
624/// ┌──────────────────────────────────────────────────────────────┐ │
625/// │  proof_result_tx (crossbeam unbounded channel)                │ │
626/// │    → ProofResultMessage { multiproof, sequence_number, ... }  │ │
627/// └──────────────────────────────────────────────────────────────┘ │
628///                                                                   │
629///   (4) Receive via crossbeam::select! on two channels: ───────────┘
630///       - rx: Control messages (PrefetchProofs, StateUpdate,
631///             EmptyProof, FinishedStateUpdates)
632///       - proof_result_rx: Computed proof results from workers
633/// ```
634///
635/// ## Component Responsibilities
636///
637/// - **[`MultiProofTask`]**: Event loop coordinator
638///   - Receives state updates from transaction execution
639///   - Deduplicates proof targets against already-fetched proofs
640///   - Sequences proofs to maintain transaction ordering
641///   - Feeds sequenced updates to sparse trie task
642///
643/// - **[`MultiproofManager`]**: Calculation orchestrator
644///   - Decides between fast path ([`EmptyProof`]) and worker dispatch
645///   - Routes storage-only vs full multiproofs to appropriate workers
646///   - Records metrics for monitoring
647///
648/// - **[`ProofWorkerHandle`]**: Worker pool manager
649///   - Maintains separate pools for storage and account proofs
650///   - Dispatches work to blocking threads (CPU-intensive)
651///   - Sends results directly via `proof_result_tx` (bypasses control channel)
652///
653/// [`EmptyProof`]: MultiProofMessage::EmptyProof
654/// [`ProofWorkerHandle`]: reth_trie_parallel::proof_task::ProofWorkerHandle
655///
656/// ## Dual-Channel Design Rationale
657///
658/// The system uses two separate crossbeam channels:
659///
660/// 1. **Control Channel (`tx`/`rx`)**: For orchestration messages
661///    - `PrefetchProofs`: Pre-fetch proofs before execution
662///    - `StateUpdate`: New transaction execution results
663///    - `EmptyProof`: Fast path when all targets already fetched
664///    - `FinishedStateUpdates`: Signal to drain pending work
665///
666/// 2. **Proof Result Channel (`proof_result_tx`/`proof_result_rx`)**: For worker results
667///    - `ProofResultMessage`: Computed multiproofs from worker pools
668///    - Direct path from workers to event loop (no intermediate hops)
669///    - Keeps control messages separate from high-throughput proof data
670///
671/// This separation enables:
672/// - **Non-blocking control**: Control messages never wait behind large proof data
673/// - **Backpressure management**: Each channel can apply different policies
674/// - **Clear ownership**: Workers only need proof result sender, not control channel
675///
676/// ## Initialization and Lifecycle
677///
678/// The task initializes a blinded sparse trie and subscribes to transaction state streams.
679/// As it receives transaction execution results, it fetches proofs for relevant accounts
680/// from the database and reveals them to the tree, then updates relevant leaves according
681/// to transaction results. This feeds updates to the sparse trie task.
682///
683/// See the `run()` method documentation for detailed lifecycle flow.
#[derive(Debug)]
pub(super) struct MultiProofTask {
    /// The size of proof targets chunk to spawn in one calculation.
    /// If None, chunking is disabled and all targets are processed in a single proof.
    chunk_size: Option<usize>,
    /// Receiver for state root related messages (prefetch, state updates, finish signal).
    rx: CrossbeamReceiver<MultiProofMessage>,
    /// Sender for state root related messages; clones of it are handed out by
    /// `state_root_message_sender`.
    tx: CrossbeamSender<MultiProofMessage>,
    /// Receiver for proof results directly from workers.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Sender for state updates emitted by this type.
    to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
    /// Proof targets that have been already fetched.
    fetched_proof_targets: MultiProofTargets,
    /// Tracks keys which have been added and removed throughout the entire block.
    multi_added_removed_keys: MultiAddedRemovedKeys,
    /// Proof sequencing handler.
    proof_sequencer: ProofSequencer,
    /// Manages calculation of multiproofs.
    multiproof_manager: MultiproofManager,
    /// Multiproof task metrics; the manager holds a clone made from the same value.
    metrics: MultiProofTaskMetrics,
}
708
709impl MultiProofTask {
710    /// Creates a new multi proof task with the unified message channel
711    pub(super) fn new(
712        proof_worker_handle: ProofWorkerHandle,
713        to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
714        chunk_size: Option<usize>,
715    ) -> Self {
716        let (tx, rx) = unbounded();
717        let (proof_result_tx, proof_result_rx) = unbounded();
718        let metrics = MultiProofTaskMetrics::default();
719
720        Self {
721            chunk_size,
722            rx,
723            tx,
724            proof_result_rx,
725            to_sparse_trie,
726            fetched_proof_targets: Default::default(),
727            multi_added_removed_keys: MultiAddedRemovedKeys::new(),
728            proof_sequencer: ProofSequencer::default(),
729            multiproof_manager: MultiproofManager::new(
730                metrics.clone(),
731                proof_worker_handle,
732                proof_result_tx,
733            ),
734            metrics,
735        }
736    }
737
738    /// Returns a sender that can be used to send arbitrary [`MultiProofMessage`]s to this task.
739    pub(super) fn state_root_message_sender(&self) -> CrossbeamSender<MultiProofMessage> {
740        self.tx.clone()
741    }
742
743    /// Handles request for proof prefetch.
744    ///
745    /// Returns a number of proofs that were spawned.
746    #[instrument(
747        level = "debug",
748        target = "engine::tree::payload_processor::multiproof",
749        skip_all,
750        fields(accounts = targets.len(), chunks = 0)
751    )]
752    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
753        let proof_targets = self.get_prefetch_proof_targets(targets);
754        self.fetched_proof_targets.extend_ref(&proof_targets);
755
756        // Make sure all target accounts have an `AddedRemovedKeySet` in the
757        // [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
758        // we still want to optimistically fetch extension children for the leaf addition case.
759        self.multi_added_removed_keys.touch_accounts(proof_targets.keys().copied());
760
761        // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
762        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
763
764        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
765        self.metrics
766            .prefetch_proof_targets_storages_histogram
767            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
768
769        // Process proof targets in chunks.
770        let mut chunks = 0;
771
772        // Only chunk if multiple account or storage workers are available to take advantage of
773        // parallelism.
774        let should_chunk = self.multiproof_manager.proof_worker_handle.available_account_workers() >
775            1 ||
776            self.multiproof_manager.proof_worker_handle.available_storage_workers() > 1;
777
778        let mut dispatch = |proof_targets| {
779            self.multiproof_manager.dispatch(
780                MultiproofInput {
781                    source: None,
782                    hashed_state_update: Default::default(),
783                    proof_targets,
784                    proof_sequence_number: self.proof_sequencer.next_sequence(),
785                    state_root_message_sender: self.tx.clone(),
786                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
787                }
788                .into(),
789            );
790            chunks += 1;
791        };
792
793        if should_chunk &&
794            let Some(chunk_size) = self.chunk_size &&
795            proof_targets.chunking_length() > chunk_size
796        {
797            let mut chunks = 0usize;
798            for proof_targets_chunk in proof_targets.chunks(chunk_size) {
799                dispatch(proof_targets_chunk);
800                chunks += 1;
801            }
802            tracing::Span::current().record("chunks", chunks);
803        } else {
804            dispatch(proof_targets);
805        }
806
807        self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
808
809        chunks
810    }
811
812    // Returns true if all state updates finished and all proofs processed.
813    fn is_done(
814        &self,
815        proofs_processed: u64,
816        state_update_proofs_requested: u64,
817        prefetch_proofs_requested: u64,
818        updates_finished: bool,
819    ) -> bool {
820        let all_proofs_processed =
821            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
822        let no_pending = !self.proof_sequencer.has_pending();
823        trace!(
824            target: "engine::tree::payload_processor::multiproof",
825            proofs_processed,
826            state_update_proofs_requested,
827            prefetch_proofs_requested,
828            no_pending,
829            updates_finished,
830            "Checking end condition"
831        );
832        all_proofs_processed && no_pending && updates_finished
833    }
834
835    /// Calls `get_proof_targets` with existing proof targets for prefetching.
836    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
837        // Here we want to filter out any targets that are already fetched
838        //
839        // This means we need to remove any storage slots that have already been fetched
840        let mut duplicates = 0;
841
842        // First remove all storage targets that are subsets of already fetched storage slots
843        targets.retain(|hashed_address, target_storage| {
844            let keep = self
845                .fetched_proof_targets
846                .get(hashed_address)
847                // do NOT remove if None, because that means the account has not been fetched yet
848                .is_none_or(|fetched_storage| {
849                    // remove if a subset
850                    !target_storage.is_subset(fetched_storage)
851                });
852
853            if !keep {
854                duplicates += target_storage.len();
855            }
856
857            keep
858        });
859
860        // For all non-subset remaining targets, we have to calculate the difference
861        for (hashed_address, target_storage) in targets.deref_mut() {
862            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
863                // this means the account has not been fetched yet, so we must fetch everything
864                // associated with this account
865                continue
866            };
867
868            let prev_target_storage_len = target_storage.len();
869
870            // keep only the storage slots that have not been fetched yet
871            //
872            // we already removed subsets, so this should only remove duplicates
873            target_storage.retain(|slot| !fetched_storage.contains(slot));
874
875            duplicates += prev_target_storage_len - target_storage.len();
876        }
877
878        if duplicates > 0 {
879            trace!(target: "engine::tree::payload_processor::multiproof", duplicates, "Removed duplicate prefetch proof targets");
880        }
881
882        targets
883    }
884
885    /// Handles state updates.
886    ///
887    /// Returns a number of proofs that were spawned.
888    #[instrument(
889        level = "debug",
890        target = "engine::tree::payload_processor::multiproof",
891        skip(self, update),
892        fields(accounts = update.len(), chunks = 0)
893    )]
894    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
895        let hashed_state_update = evm_state_to_hashed_post_state(update);
896
897        // Update removed keys based on the state update.
898        self.multi_added_removed_keys.update_with_state(&hashed_state_update);
899
900        // Split the state update into already fetched and not fetched according to the proof
901        // targets.
902        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
903            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
904
905        let mut state_updates = 0;
906        // If there are any accounts or storage slots that we already fetched the proofs for,
907        // send them immediately, as they don't require dispatching any additional multiproofs.
908        if !fetched_state_update.is_empty() {
909            let _ = self.tx.send(MultiProofMessage::EmptyProof {
910                sequence_number: self.proof_sequencer.next_sequence(),
911                state: fetched_state_update,
912            });
913            state_updates += 1;
914        }
915
916        // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
917        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
918
919        // Process state updates in chunks.
920        let mut chunks = 0;
921
922        let mut spawned_proof_targets = MultiProofTargets::default();
923
924        // Only chunk if multiple account or storage workers are available to take advantage of
925        // parallelism.
926        let should_chunk = self.multiproof_manager.proof_worker_handle.available_account_workers() >
927            1 ||
928            self.multiproof_manager.proof_worker_handle.available_storage_workers() > 1;
929
930        let mut dispatch = |hashed_state_update| {
931            let proof_targets = get_proof_targets(
932                &hashed_state_update,
933                &self.fetched_proof_targets,
934                &multi_added_removed_keys,
935            );
936            spawned_proof_targets.extend_ref(&proof_targets);
937
938            self.multiproof_manager.dispatch(
939                MultiproofInput {
940                    source: Some(source),
941                    hashed_state_update,
942                    proof_targets,
943                    proof_sequence_number: self.proof_sequencer.next_sequence(),
944                    state_root_message_sender: self.tx.clone(),
945                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
946                }
947                .into(),
948            );
949
950            chunks += 1;
951        };
952
953        if should_chunk &&
954            let Some(chunk_size) = self.chunk_size &&
955            not_fetched_state_update.chunking_length() > chunk_size
956        {
957            let mut chunks = 0usize;
958            for chunk in not_fetched_state_update.chunks(chunk_size) {
959                dispatch(chunk);
960                chunks += 1;
961            }
962            tracing::Span::current().record("chunks", chunks);
963        } else {
964            dispatch(not_fetched_state_update);
965        }
966
967        self.metrics
968            .state_update_proof_targets_accounts_histogram
969            .record(spawned_proof_targets.len() as f64);
970        self.metrics
971            .state_update_proof_targets_storages_histogram
972            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
973        self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
974
975        self.fetched_proof_targets.extend(spawned_proof_targets);
976
977        state_updates + chunks
978    }
979
980    /// Handler for new proof calculated, aggregates all the existing sequential proofs.
981    fn on_proof(
982        &mut self,
983        sequence_number: u64,
984        update: SparseTrieUpdate,
985    ) -> Option<SparseTrieUpdate> {
986        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
987
988        ready_proofs
989            .into_iter()
990            // Merge all ready proofs and state updates
991            .reduce(|mut acc_update, update| {
992                acc_update.extend(update);
993                acc_update
994            })
995            // Return None if the resulting proof is empty
996            .filter(|proof| !proof.is_empty())
997    }
998
999    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
1000    /// sparse trie, updates the sparse trie, and eventually returns the state root.
1001    ///
1002    /// The lifecycle is the following:
1003    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
1004    ///    received from the engine.
1005    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
1006    ///      [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are
1007    ///      extracted with [`get_proof_targets`].
1008    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
1009    ///      so that the proofs for accounts and storage slots that were already fetched are not
1010    ///      requested again.
1011    /// 2. Using the proof targets, a new multiproof is calculated using
1012    ///    [`MultiproofManager::dispatch`].
1013    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
1014    ///      sent back to this task along with the original state update.
1015    ///    * Otherwise, the multiproof is dispatched to worker pools and results are sent directly
1016    ///      to this task via the `proof_result_rx` channel as [`ProofResultMessage`].
1017    /// 3. Either [`MultiProofMessage::EmptyProof`] (via control channel) or [`ProofResultMessage`]
1018    ///    (via proof result channel) is received.
1019    ///    * The multiproof is added to the [`ProofSequencer`].
1020    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
1021    ///      state updates arrived (i.e. transaction order), such sequence is returned.
1022    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
1023    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
1024    ///    trie task.
1025    /// 5. Steps above are repeated until this task receives a
1026    ///    [`MultiProofMessage::FinishedStateUpdates`].
1027    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
1028    ///      [`ProofResultMessage`], we check if all proofs have been processed and if there are any
1029    ///      pending proofs in the proof sequencer left to be revealed.
1030    /// 6. This task exits after all pending proofs are processed.
1031    #[instrument(
1032        level = "debug",
1033        name = "MultiProofTask::run",
1034        target = "engine::tree::payload_processor::multiproof",
1035        skip_all
1036    )]
1037    pub(crate) fn run(mut self) {
1038        // TODO convert those into fields
1039        let mut prefetch_proofs_requested = 0;
1040        let mut state_update_proofs_requested = 0;
1041        let mut proofs_processed = 0;
1042
1043        let mut updates_finished = false;
1044
1045        // Timestamp before the first state update or prefetch was received
1046        let start = Instant::now();
1047
1048        // Timestamp when the first state update or prefetch was received
1049        let mut first_update_time = None;
1050        // Timestamp when state updates have finished
1051        let mut updates_finished_time = None;
1052
1053        loop {
1054            trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");
1055
1056            crossbeam_channel::select_biased! {
1057                recv(self.proof_result_rx) -> proof_msg => {
1058                    match proof_msg {
1059                        Ok(proof_result) => {
1060                            proofs_processed += 1;
1061
1062                            self.metrics
1063                                .proof_calculation_duration_histogram
1064                                .record(proof_result.elapsed);
1065
1066                            self.multiproof_manager.on_calculation_complete();
1067
1068                            // Convert ProofResultMessage to SparseTrieUpdate
1069                            match proof_result.result {
1070                                Ok(proof_result_data) => {
1071                                    debug!(
1072                                        target: "engine::tree::payload_processor::multiproof",
1073                                        sequence = proof_result.sequence_number,
1074                                        total_proofs = proofs_processed,
1075                                        "Processing calculated proof from worker"
1076                                    );
1077
1078                                    let update = SparseTrieUpdate {
1079                                        state: proof_result.state,
1080                                        multiproof: proof_result_data.into_multiproof(),
1081                                    };
1082
1083                                    if let Some(combined_update) =
1084                                        self.on_proof(proof_result.sequence_number, update)
1085                                    {
1086                                        let _ = self.to_sparse_trie.send(combined_update);
1087                                    }
1088                                }
1089                                Err(error) => {
1090                                    error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
1091                                    return
1092                                }
1093                            }
1094
1095                            if self.is_done(
1096                                proofs_processed,
1097                                state_update_proofs_requested,
1098                                prefetch_proofs_requested,
1099                                updates_finished,
1100                            ) {
1101                                debug!(
1102                                    target: "engine::tree::payload_processor::multiproof",
1103                                    "State updates finished and all proofs processed, ending calculation"
1104                                );
1105                                break
1106                            }
1107                        }
1108                        Err(_) => {
1109                            error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
1110                            return
1111                        }
1112                    }
1113                },
1114                recv(self.rx) -> message => {
1115                    match message {
1116                        Ok(msg) => match msg {
1117                            MultiProofMessage::PrefetchProofs(targets) => {
1118                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::PrefetchProofs");
1119
1120                                if first_update_time.is_none() {
1121                                    // record the wait time
1122                                    self.metrics
1123                                        .first_update_wait_time_histogram
1124                                        .record(start.elapsed().as_secs_f64());
1125                                    first_update_time = Some(Instant::now());
1126                                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
1127                                }
1128
1129                                let account_targets = targets.len();
1130                                let storage_targets =
1131                                    targets.values().map(|slots| slots.len()).sum::<usize>();
1132                                prefetch_proofs_requested += self.on_prefetch_proof(targets);
1133                                debug!(
1134                                    target: "engine::tree::payload_processor::multiproof",
1135                                    account_targets,
1136                                    storage_targets,
1137                                    prefetch_proofs_requested,
1138                                    "Prefetching proofs"
1139                                );
1140                            }
1141                            MultiProofMessage::StateUpdate(source, update) => {
1142                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
1143
1144                                if first_update_time.is_none() {
1145                                    // record the wait time
1146                                    self.metrics
1147                                        .first_update_wait_time_histogram
1148                                        .record(start.elapsed().as_secs_f64());
1149                                    first_update_time = Some(Instant::now());
1150                                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
1151                                }
1152
1153                                let len = update.len();
1154                                state_update_proofs_requested += self.on_state_update(source, update);
1155                                debug!(
1156                                    target: "engine::tree::payload_processor::multiproof",
1157                                    ?source,
1158                                    len,
1159                                    ?state_update_proofs_requested,
1160                                    "Received new state update"
1161                                );
1162                            }
1163                            MultiProofMessage::FinishedStateUpdates => {
1164                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates");
1165
1166                                updates_finished = true;
1167                                updates_finished_time = Some(Instant::now());
1168
1169                                if self.is_done(
1170                                    proofs_processed,
1171                                    state_update_proofs_requested,
1172                                    prefetch_proofs_requested,
1173                                    updates_finished,
1174                                ) {
1175                                    debug!(
1176                                        target: "engine::tree::payload_processor::multiproof",
1177                                        "State updates finished and all proofs processed, ending calculation"
1178                                    );
1179                                    break
1180                                }
1181                            }
1182                            MultiProofMessage::EmptyProof { sequence_number, state } => {
1183                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::EmptyProof");
1184
1185                                proofs_processed += 1;
1186
1187                                if let Some(combined_update) = self.on_proof(
1188                                    sequence_number,
1189                                    SparseTrieUpdate { state, multiproof: Default::default() },
1190                                ) {
1191                                    let _ = self.to_sparse_trie.send(combined_update);
1192                                }
1193
1194                                if self.is_done(
1195                                    proofs_processed,
1196                                    state_update_proofs_requested,
1197                                    prefetch_proofs_requested,
1198                                    updates_finished,
1199                                ) {
1200                                    debug!(
1201                                        target: "engine::tree::payload_processor::multiproof",
1202                                        "State updates finished and all proofs processed, ending calculation"
1203                                    );
1204                                    break
1205                                }
1206                            }
1207                        },
1208                        Err(_) => {
1209                            error!(target: "engine::tree::payload_processor::multiproof", "State root related message channel closed unexpectedly");
1210                            return
1211                        }
1212                    }
1213                }
1214            }
1215        }
1216
1217        debug!(
1218            target: "engine::tree::payload_processor::multiproof",
1219            total_updates = state_update_proofs_requested,
1220            total_proofs = proofs_processed,
1221            total_time = ?first_update_time.map(|t|t.elapsed()),
1222            time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1223            "All proofs processed, ending calculation"
1224        );
1225
1226        // update total metrics on finish
1227        self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1228        self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1229        if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1230            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1231        }
1232
1233        if let Some(updates_finished_time) = updates_finished_time {
1234            self.metrics
1235                .last_proof_wait_time_histogram
1236                .record(updates_finished_time.elapsed().as_secs_f64());
1237        }
1238    }
1239}
1240
1241/// Returns accounts only with those storages that were not already fetched, and
1242/// if there are no such storages and the account itself was already fetched, the
1243/// account shouldn't be included.
1244fn get_proof_targets(
1245    state_update: &HashedPostState,
1246    fetched_proof_targets: &MultiProofTargets,
1247    multi_added_removed_keys: &MultiAddedRemovedKeys,
1248) -> MultiProofTargets {
1249    let mut targets = MultiProofTargets::default();
1250
1251    // first collect all new accounts (not previously fetched)
1252    for &hashed_address in state_update.accounts.keys() {
1253        if !fetched_proof_targets.contains_key(&hashed_address) {
1254            targets.insert(hashed_address, HashSet::default());
1255        }
1256    }
1257
1258    // then process storage slots for all accounts in the state update
1259    for (hashed_address, storage) in &state_update.storages {
1260        let fetched = fetched_proof_targets.get(hashed_address);
1261        let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1262        let mut changed_slots = storage
1263            .storage
1264            .keys()
1265            .filter(|slot| {
1266                !fetched.is_some_and(|f| f.contains(*slot)) ||
1267                    storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1268            })
1269            .peekable();
1270
1271        // If the storage is wiped, we still need to fetch the account proof.
1272        if storage.wiped && fetched.is_none() {
1273            targets.entry(*hashed_address).or_default();
1274        }
1275
1276        if changed_slots.peek().is_some() {
1277            targets.entry(*hashed_address).or_default().extend(changed_slots);
1278        }
1279    }
1280
1281    targets
1282}
1283
1284#[cfg(test)]
1285mod tests {
1286    use super::*;
1287    use alloy_primitives::map::B256Set;
1288    use reth_provider::{
1289        providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
1290        BlockReader, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader,
1291        TrieReader,
1292    };
1293    use reth_trie::MultiProof;
1294    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
1295    use revm_primitives::{B256, U256};
1296    use std::sync::OnceLock;
1297    use tokio::runtime::{Handle, Runtime};
1298
1299    /// Get a handle to the test runtime, creating it if necessary
1300    fn get_test_runtime_handle() -> Handle {
1301        static TEST_RT: OnceLock<Runtime> = OnceLock::new();
1302        TEST_RT
1303            .get_or_init(|| {
1304                tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
1305            })
1306            .handle()
1307            .clone()
1308    }
1309
1310    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
1311    where
1312        F: DatabaseProviderFactory<
1313                Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
1314            > + Clone
1315            + Send
1316            + 'static,
1317    {
1318        let rt_handle = get_test_runtime_handle();
1319        let overlay_factory = OverlayStateProviderFactory::new(factory);
1320        let task_ctx = ProofTaskCtx::new(overlay_factory, Default::default());
1321        let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
1322        let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
1323
1324        MultiProofTask::new(proof_handle, to_sparse_trie, Some(1))
1325    }
1326
1327    #[test]
1328    fn test_add_proof_in_sequence() {
1329        let mut sequencer = ProofSequencer::default();
1330        let proof1 = MultiProof::default();
1331        let proof2 = MultiProof::default();
1332        sequencer.next_sequence = 2;
1333
1334        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1335        assert_eq!(ready.len(), 1);
1336        assert!(!sequencer.has_pending());
1337
1338        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1339        assert_eq!(ready.len(), 1);
1340        assert!(!sequencer.has_pending());
1341    }
1342
1343    #[test]
1344    fn test_add_proof_out_of_order() {
1345        let mut sequencer = ProofSequencer::default();
1346        let proof1 = MultiProof::default();
1347        let proof2 = MultiProof::default();
1348        let proof3 = MultiProof::default();
1349        sequencer.next_sequence = 3;
1350
1351        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1352        assert_eq!(ready.len(), 0);
1353        assert!(sequencer.has_pending());
1354
1355        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1356        assert_eq!(ready.len(), 1);
1357        assert!(sequencer.has_pending());
1358
1359        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1360        assert_eq!(ready.len(), 2);
1361        assert!(!sequencer.has_pending());
1362    }
1363
1364    #[test]
1365    fn test_add_proof_with_gaps() {
1366        let mut sequencer = ProofSequencer::default();
1367        let proof1 = MultiProof::default();
1368        let proof3 = MultiProof::default();
1369        sequencer.next_sequence = 3;
1370
1371        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1372        assert_eq!(ready.len(), 1);
1373
1374        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1375        assert_eq!(ready.len(), 0);
1376        assert!(sequencer.has_pending());
1377    }
1378
1379    #[test]
1380    fn test_add_proof_duplicate_sequence() {
1381        let mut sequencer = ProofSequencer::default();
1382        let proof1 = MultiProof::default();
1383        let proof2 = MultiProof::default();
1384
1385        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1386        assert_eq!(ready.len(), 1);
1387
1388        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1389        assert_eq!(ready.len(), 0);
1390        assert!(!sequencer.has_pending());
1391    }
1392
1393    #[test]
1394    fn test_add_proof_batch_processing() {
1395        let mut sequencer = ProofSequencer::default();
1396        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1397        sequencer.next_sequence = 5;
1398
1399        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1400        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1401        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1402        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1403
1404        let ready =
1405            sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1406        assert_eq!(ready.len(), 5);
1407        assert!(!sequencer.has_pending());
1408    }
1409
1410    fn create_get_proof_targets_state() -> HashedPostState {
1411        let mut state = HashedPostState::default();
1412
1413        let addr1 = B256::random();
1414        let addr2 = B256::random();
1415        state.accounts.insert(addr1, Some(Default::default()));
1416        state.accounts.insert(addr2, Some(Default::default()));
1417
1418        let mut storage = HashedStorage::default();
1419        let slot1 = B256::random();
1420        let slot2 = B256::random();
1421        storage.storage.insert(slot1, U256::ZERO);
1422        storage.storage.insert(slot2, U256::from(1));
1423        state.storages.insert(addr1, storage);
1424
1425        state
1426    }
1427
1428    #[test]
1429    fn test_get_proof_targets_new_account_targets() {
1430        let state = create_get_proof_targets_state();
1431        let fetched = MultiProofTargets::default();
1432
1433        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1434
1435        // should return all accounts as targets since nothing was fetched before
1436        assert_eq!(targets.len(), state.accounts.len());
1437        for addr in state.accounts.keys() {
1438            assert!(targets.contains_key(addr));
1439        }
1440    }
1441
1442    #[test]
1443    fn test_get_proof_targets_new_storage_targets() {
1444        let state = create_get_proof_targets_state();
1445        let fetched = MultiProofTargets::default();
1446
1447        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1448
1449        // verify storage slots are included for accounts with storage
1450        for (addr, storage) in &state.storages {
1451            assert!(targets.contains_key(addr));
1452            let target_slots = &targets[addr];
1453            assert_eq!(target_slots.len(), storage.storage.len());
1454            for slot in storage.storage.keys() {
1455                assert!(target_slots.contains(slot));
1456            }
1457        }
1458    }
1459
1460    #[test]
1461    fn test_get_proof_targets_filter_already_fetched_accounts() {
1462        let state = create_get_proof_targets_state();
1463        let mut fetched = MultiProofTargets::default();
1464
1465        // select an account that has no storage updates
1466        let fetched_addr = state
1467            .accounts
1468            .keys()
1469            .find(|&&addr| !state.storages.contains_key(&addr))
1470            .expect("Should have an account without storage");
1471
1472        // mark the account as already fetched
1473        fetched.insert(*fetched_addr, HashSet::default());
1474
1475        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1476
1477        // should not include the already fetched account since it has no storage updates
1478        assert!(!targets.contains_key(fetched_addr));
1479        // other accounts should still be included
1480        assert_eq!(targets.len(), state.accounts.len() - 1);
1481    }
1482
1483    #[test]
1484    fn test_get_proof_targets_filter_already_fetched_storage() {
1485        let state = create_get_proof_targets_state();
1486        let mut fetched = MultiProofTargets::default();
1487
1488        // mark one storage slot as already fetched
1489        let (addr, storage) = state.storages.iter().next().unwrap();
1490        let mut fetched_slots = HashSet::default();
1491        let fetched_slot = *storage.storage.keys().next().unwrap();
1492        fetched_slots.insert(fetched_slot);
1493        fetched.insert(*addr, fetched_slots);
1494
1495        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1496
1497        // should not include the already fetched storage slot
1498        let target_slots = &targets[addr];
1499        assert!(!target_slots.contains(&fetched_slot));
1500        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1501    }
1502
1503    #[test]
1504    fn test_get_proof_targets_empty_state() {
1505        let state = HashedPostState::default();
1506        let fetched = MultiProofTargets::default();
1507
1508        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1509
1510        assert!(targets.is_empty());
1511    }
1512
1513    #[test]
1514    fn test_get_proof_targets_mixed_fetched_state() {
1515        let mut state = HashedPostState::default();
1516        let mut fetched = MultiProofTargets::default();
1517
1518        let addr1 = B256::random();
1519        let addr2 = B256::random();
1520        let slot1 = B256::random();
1521        let slot2 = B256::random();
1522
1523        state.accounts.insert(addr1, Some(Default::default()));
1524        state.accounts.insert(addr2, Some(Default::default()));
1525
1526        let mut storage = HashedStorage::default();
1527        storage.storage.insert(slot1, U256::ZERO);
1528        storage.storage.insert(slot2, U256::from(1));
1529        state.storages.insert(addr1, storage);
1530
1531        let mut fetched_slots = HashSet::default();
1532        fetched_slots.insert(slot1);
1533        fetched.insert(addr1, fetched_slots);
1534
1535        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1536
1537        assert!(targets.contains_key(&addr2));
1538        assert!(!targets[&addr1].contains(&slot1));
1539        assert!(targets[&addr1].contains(&slot2));
1540    }
1541
1542    #[test]
1543    fn test_get_proof_targets_unmodified_account_with_storage() {
1544        let mut state = HashedPostState::default();
1545        let fetched = MultiProofTargets::default();
1546
1547        let addr = B256::random();
1548        let slot1 = B256::random();
1549        let slot2 = B256::random();
1550
1551        // don't add the account to state.accounts (simulating unmodified account)
1552        // but add storage updates for this account
1553        let mut storage = HashedStorage::default();
1554        storage.storage.insert(slot1, U256::from(1));
1555        storage.storage.insert(slot2, U256::from(2));
1556        state.storages.insert(addr, storage);
1557
1558        assert!(!state.accounts.contains_key(&addr));
1559        assert!(!fetched.contains_key(&addr));
1560
1561        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1562
1563        // verify that we still get the storage slots for the unmodified account
1564        assert!(targets.contains_key(&addr));
1565
1566        let target_slots = &targets[&addr];
1567        assert_eq!(target_slots.len(), 2);
1568        assert!(target_slots.contains(&slot1));
1569        assert!(target_slots.contains(&slot2));
1570    }
1571
1572    #[test]
1573    fn test_get_prefetch_proof_targets_no_duplicates() {
1574        let test_provider_factory = create_test_provider_factory();
1575        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1576
1577        // populate some targets
1578        let mut targets = MultiProofTargets::default();
1579        let addr1 = B256::random();
1580        let addr2 = B256::random();
1581        let slot1 = B256::random();
1582        let slot2 = B256::random();
1583        targets.insert(addr1, std::iter::once(slot1).collect());
1584        targets.insert(addr2, std::iter::once(slot2).collect());
1585
1586        let prefetch_proof_targets =
1587            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1588
1589        // check that the prefetch proof targets are the same because there are no fetched proof
1590        // targets yet
1591        assert_eq!(prefetch_proof_targets, targets);
1592
1593        // add a different addr and slot to fetched proof targets
1594        let addr3 = B256::random();
1595        let slot3 = B256::random();
1596        test_state_root_task.fetched_proof_targets.insert(addr3, std::iter::once(slot3).collect());
1597
1598        let prefetch_proof_targets =
1599            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1600
1601        // check that the prefetch proof targets are the same because the fetched proof targets
1602        // don't overlap with the prefetch targets
1603        assert_eq!(prefetch_proof_targets, targets);
1604    }
1605
1606    #[test]
1607    fn test_get_prefetch_proof_targets_remove_subset() {
1608        let test_provider_factory = create_test_provider_factory();
1609        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1610
1611        // populate some targe
1612        let mut targets = MultiProofTargets::default();
1613        let addr1 = B256::random();
1614        let addr2 = B256::random();
1615        let slot1 = B256::random();
1616        let slot2 = B256::random();
1617        targets.insert(addr1, std::iter::once(slot1).collect());
1618        targets.insert(addr2, std::iter::once(slot2).collect());
1619
1620        // add a subset of the first target to fetched proof targets
1621        test_state_root_task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());
1622
1623        let prefetch_proof_targets =
1624            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1625
1626        // check that the prefetch proof targets do not include the subset
1627        assert_eq!(prefetch_proof_targets.len(), 1);
1628        assert!(!prefetch_proof_targets.contains_key(&addr1));
1629        assert!(prefetch_proof_targets.contains_key(&addr2));
1630
1631        // now add one more slot to the prefetch targets
1632        let slot3 = B256::random();
1633        targets.get_mut(&addr1).unwrap().insert(slot3);
1634
1635        let prefetch_proof_targets =
1636            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1637
1638        // check that the prefetch proof targets do not include the subset
1639        // but include the new slot
1640        assert_eq!(prefetch_proof_targets.len(), 2);
1641        assert!(prefetch_proof_targets.contains_key(&addr1));
1642        assert_eq!(
1643            *prefetch_proof_targets.get(&addr1).unwrap(),
1644            std::iter::once(slot3).collect::<B256Set>()
1645        );
1646        assert!(prefetch_proof_targets.contains_key(&addr2));
1647        assert_eq!(
1648            *prefetch_proof_targets.get(&addr2).unwrap(),
1649            std::iter::once(slot2).collect::<B256Set>()
1650        );
1651    }
1652
1653    #[test]
1654    fn test_get_proof_targets_with_removed_storage_keys() {
1655        let mut state = HashedPostState::default();
1656        let mut fetched = MultiProofTargets::default();
1657        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1658
1659        let addr = B256::random();
1660        let slot1 = B256::random();
1661        let slot2 = B256::random();
1662
1663        // add account to state
1664        state.accounts.insert(addr, Some(Default::default()));
1665
1666        // add storage updates
1667        let mut storage = HashedStorage::default();
1668        storage.storage.insert(slot1, U256::from(100));
1669        storage.storage.insert(slot2, U256::from(200));
1670        state.storages.insert(addr, storage);
1671
1672        // mark slot1 as already fetched
1673        let mut fetched_slots = HashSet::default();
1674        fetched_slots.insert(slot1);
1675        fetched.insert(addr, fetched_slots);
1676
1677        // update multi_added_removed_keys to mark slot1 as removed
1678        let mut removed_state = HashedPostState::default();
1679        let mut removed_storage = HashedStorage::default();
1680        removed_storage.storage.insert(slot1, U256::ZERO); // U256::ZERO marks as removed
1681        removed_state.storages.insert(addr, removed_storage);
1682        multi_added_removed_keys.update_with_state(&removed_state);
1683
1684        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1685
1686        // slot1 should be included despite being fetched, because it's marked as removed
1687        assert!(targets.contains_key(&addr));
1688        let target_slots = &targets[&addr];
1689        assert_eq!(target_slots.len(), 2);
1690        assert!(target_slots.contains(&slot1)); // included because it's removed
1691        assert!(target_slots.contains(&slot2)); // included because it's not fetched
1692    }
1693
1694    #[test]
1695    fn test_get_proof_targets_with_wiped_storage() {
1696        let mut state = HashedPostState::default();
1697        let fetched = MultiProofTargets::default();
1698        let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1699
1700        let addr = B256::random();
1701        let slot1 = B256::random();
1702
1703        // add account to state
1704        state.accounts.insert(addr, Some(Default::default()));
1705
1706        // add wiped storage
1707        let mut storage = HashedStorage::new(true);
1708        storage.storage.insert(slot1, U256::from(100));
1709        state.storages.insert(addr, storage);
1710
1711        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1712
1713        // account should be included because storage is wiped and account wasn't fetched
1714        assert!(targets.contains_key(&addr));
1715        let target_slots = &targets[&addr];
1716        assert_eq!(target_slots.len(), 1);
1717        assert!(target_slots.contains(&slot1));
1718    }
1719
1720    #[test]
1721    fn test_get_proof_targets_removed_keys_not_in_state_update() {
1722        let mut state = HashedPostState::default();
1723        let mut fetched = MultiProofTargets::default();
1724        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1725
1726        let addr = B256::random();
1727        let slot1 = B256::random();
1728        let slot2 = B256::random();
1729        let slot3 = B256::random();
1730
1731        // add account to state
1732        state.accounts.insert(addr, Some(Default::default()));
1733
1734        // add storage updates for slot1 and slot2 only
1735        let mut storage = HashedStorage::default();
1736        storage.storage.insert(slot1, U256::from(100));
1737        storage.storage.insert(slot2, U256::from(200));
1738        state.storages.insert(addr, storage);
1739
1740        // mark all slots as already fetched
1741        let mut fetched_slots = HashSet::default();
1742        fetched_slots.insert(slot1);
1743        fetched_slots.insert(slot2);
1744        fetched_slots.insert(slot3); // slot3 is fetched but not in state update
1745        fetched.insert(addr, fetched_slots);
1746
1747        // mark slot3 as removed (even though it's not in the state update)
1748        let mut removed_state = HashedPostState::default();
1749        let mut removed_storage = HashedStorage::default();
1750        removed_storage.storage.insert(slot3, U256::ZERO);
1751        removed_state.storages.insert(addr, removed_storage);
1752        multi_added_removed_keys.update_with_state(&removed_state);
1753
1754        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1755
1756        // only slots in the state update can be included, so slot3 should not appear
1757        assert!(!targets.contains_key(&addr));
1758    }
1759}