Skip to main content

reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
4use alloy_eip7928::BlockAccessList;
5use alloy_evm::block::StateChangeSource;
6use alloy_primitives::{keccak256, map::HashSet, B256};
7use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
8use derive_more::derive::Deref;
9use metrics::{Gauge, Histogram};
10use reth_metrics::Metrics;
11use reth_provider::AccountReader;
12use reth_revm::state::EvmState;
13use reth_trie::{
14    added_removed_keys::MultiAddedRemovedKeys, proof_v2, HashedPostState, HashedStorage,
15    MultiProofTargets,
16};
17#[cfg(test)]
18use reth_trie_parallel::stats::ParallelTrieTracker;
19use reth_trie_parallel::{
20    proof::ParallelProof,
21    proof_task::{
22        AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
23        ProofWorkerHandle,
24    },
25    targets_v2::MultiProofTargetsV2,
26};
27use revm_primitives::map::{hash_map, B256Map};
28use std::{collections::BTreeMap, sync::Arc, time::Instant};
29use tracing::{debug, error, instrument, trace};
30
/// Source of state changes, either from EVM execution or from a Block Access List.
///
/// `Clone` and `Copy` are derived; `Debug` is implemented manually for this type.
#[derive(Clone, Copy)]
pub enum Source {
    /// State changes from EVM execution, carrying the originating [`StateChangeSource`].
    Evm(StateChangeSource),
    /// State changes from Block Access List (EIP-7928).
    BlockAccessList,
}
39
40impl std::fmt::Debug for Source {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Self::Evm(source) => source.fmt(f),
44            Self::BlockAccessList => f.write_str("BlockAccessList"),
45        }
46    }
47}
48
49impl From<StateChangeSource> for Source {
50    fn from(source: StateChangeSource) -> Self {
51        Self::Evm(source)
52    }
53}
54
/// Maximum number of targets to batch together for prefetch batching.
///
/// Prefetches are just proof requests (no state merging), so we allow a higher cap than state
/// updates.
const PREFETCH_MAX_BATCH_TARGETS: usize = 512;

/// Maximum number of prefetch messages to batch together.
///
/// Prevents excessive batching even with small messages.
const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;

/// The default max targets, for limiting the number of account and storage proof targets to be
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
///
/// This is the initial value of `MultiProofTask::max_targets_for_chunking`.
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
67
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
///
/// Pairs a hashed state update with the proof result that was calculated for it.
#[derive(Debug)]
pub struct SparseTrieUpdate {
    /// The state update that was used to calculate the proof.
    pub(crate) state: HashedPostState,
    /// The calculated multiproof.
    pub(crate) multiproof: ProofResult,
}
77
78impl SparseTrieUpdate {
79    /// Returns true if the update is empty.
80    pub(super) fn is_empty(&self) -> bool {
81        self.state.is_empty() && self.multiproof.is_empty()
82    }
83
84    /// Construct update from multiproof.
85    #[cfg(test)]
86    pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
87        let stats = ParallelTrieTracker::default().finish();
88        Ok(Self {
89            state: HashedPostState::default(),
90            multiproof: ProofResult::Legacy(multiproof.try_into()?, stats),
91        })
92    }
93
94    /// Extend update with contents of the other.
95    pub(super) fn extend(&mut self, other: Self) {
96        self.state.extend(other.state);
97        self.multiproof.extend(other.multiproof);
98    }
99}
100
/// Messages used internally by the multi proof task.
///
/// These are the messages carried by the task's control channel (`tx`/`rx`).
#[derive(Debug)]
pub enum MultiProofMessage {
    /// Prefetch proof targets.
    PrefetchProofs(VersionedMultiProofTargets),
    /// New state update from transaction execution with its source.
    StateUpdate(Source, EvmState),
    /// State update that can be applied to the sparse trie without any new proofs.
    ///
    /// It can be the case when all accounts and storage slots from the state update were already
    /// fetched and revealed.
    EmptyProof {
        /// The index of this proof in the sequence of state updates.
        sequence_number: u64,
        /// The state update that was used to calculate the proof.
        state: HashedPostState,
    },
    /// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
    HashedStateUpdate(HashedPostState),
    /// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
    ///
    /// When received, the task generates a single state update from the BAL and processes it.
    /// No further messages are expected after receiving this variant.
    BlockAccessList(Arc<BlockAccessList>),
    /// Signals state update stream end.
    ///
    /// This is triggered by block execution, indicating that no additional state updates are
    /// expected.
    FinishedStateUpdates,
}
131
/// Handle to track proof calculation ordering.
///
/// Sequence numbers are handed out in increasing order, and finished proofs are released strictly
/// in that order; proofs that arrive early are buffered until the gap before them is filled.
#[derive(Debug, Default)]
struct ProofSequencer {
    /// The next proof sequence number to be produced.
    next_sequence: u64,
    /// The next sequence number expected to be delivered.
    next_to_deliver: u64,
    /// Buffer for out-of-order proofs and corresponding state updates, keyed by sequence number.
    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
}
142
143impl ProofSequencer {
144    /// Gets the next sequence number and increments the counter
145    const fn next_sequence(&mut self) -> u64 {
146        let seq = self.next_sequence;
147        self.next_sequence += 1;
148        seq
149    }
150
151    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
152    /// updates if we have a continuous sequence
153    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
154        // Optimization: fast path for in-order delivery to avoid BTreeMap overhead.
155        // If this is the expected sequence, return it immediately without buffering.
156        if sequence == self.next_to_deliver {
157            let mut consecutive_proofs = Vec::with_capacity(1);
158            consecutive_proofs.push(update);
159            self.next_to_deliver += 1;
160
161            // Check if we have subsequent proofs in the pending buffer
162            while let Some(pending) = self.pending_proofs.remove(&self.next_to_deliver) {
163                consecutive_proofs.push(pending);
164                self.next_to_deliver += 1;
165            }
166
167            return consecutive_proofs;
168        }
169
170        if sequence > self.next_to_deliver {
171            self.pending_proofs.insert(sequence, update);
172        }
173
174        Vec::new()
175    }
176
177    /// Returns true if we still have pending proofs
178    pub(crate) fn has_pending(&self) -> bool {
179        !self.pending_proofs.is_empty()
180    }
181}
182
/// A wrapper for the sender that signals completion when dropped.
///
/// This type is intended to be used in combination with the evm executor statehook.
/// Dropping it sends [`MultiProofMessage::FinishedStateUpdates`], which should happen once the
/// block has been executed, i.e. after the last state update has been sent. This triggers the
/// exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
190
impl StateHookSender {
    /// Wraps the given control-channel sender.
    pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
        Self(inner)
    }
}
196
197impl Drop for StateHookSender {
198    fn drop(&mut self) {
199        // Send completion signal when the sender is dropped
200        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
201    }
202}
203
204pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
205    let mut hashed_state = HashedPostState::with_capacity(update.len());
206
207    for (address, account) in update {
208        if account.is_touched() {
209            let hashed_address = keccak256(address);
210            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
211
212            let destroyed = account.is_selfdestructed();
213            let info = if destroyed { None } else { Some(account.info.into()) };
214            hashed_state.accounts.insert(hashed_address, info);
215
216            let mut changed_storage_iter = account
217                .storage
218                .into_iter()
219                .filter(|(_slot, value)| value.is_changed())
220                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
221                .peekable();
222
223            if destroyed {
224                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
225            } else if changed_storage_iter.peek().is_some() {
226                hashed_state
227                    .storages
228                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
229            }
230        }
231    }
232
233    hashed_state
234}
235
236/// Extends a `MultiProofTargets` with the contents of a `VersionedMultiProofTargets`,
237/// regardless of which variant the latter is.
238fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiProofTargets) {
239    match src {
240        VersionedMultiProofTargets::Legacy(targets) => {
241            dest.extend_ref(targets);
242        }
243        VersionedMultiProofTargets::V2(targets) => {
244            // Add all account targets
245            for target in &targets.account_targets {
246                dest.entry(target.key()).or_default();
247            }
248
249            // Add all storage targets
250            for (hashed_address, slots) in &targets.storage_targets {
251                let slot_set = dest.entry(*hashed_address).or_default();
252                for slot in slots {
253                    slot_set.insert(slot.key());
254                }
255            }
256        }
257    }
258}
259
/// A set of multiproof targets which can be either in the legacy or V2 representations.
#[derive(Debug)]
pub enum VersionedMultiProofTargets {
    /// Legacy targets, represented as [`MultiProofTargets`].
    Legacy(MultiProofTargets),
    /// V2 targets, represented as [`MultiProofTargetsV2`].
    V2(MultiProofTargetsV2),
}
268
269impl VersionedMultiProofTargets {
270    /// Returns true if there are no account or storage targets.
271    fn is_empty(&self) -> bool {
272        match self {
273            Self::Legacy(targets) => targets.is_empty(),
274            Self::V2(targets) => targets.is_empty(),
275        }
276    }
277
278    /// Returns the number of account targets in the multiproof target
279    fn account_targets_len(&self) -> usize {
280        match self {
281            Self::Legacy(targets) => targets.len(),
282            Self::V2(targets) => targets.account_targets.len(),
283        }
284    }
285
286    /// Returns the number of storage targets in the multiproof target
287    fn storage_targets_len(&self) -> usize {
288        match self {
289            Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum::<usize>(),
290            Self::V2(targets) => {
291                targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
292            }
293        }
294    }
295
296    /// Returns the number of accounts in the multiproof targets.
297    fn len(&self) -> usize {
298        match self {
299            Self::Legacy(targets) => targets.len(),
300            Self::V2(targets) => targets.account_targets.len(),
301        }
302    }
303
304    /// Returns the total storage slot count across all accounts.
305    fn storage_count(&self) -> usize {
306        match self {
307            Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum(),
308            Self::V2(targets) => targets.storage_targets.values().map(|slots| slots.len()).sum(),
309        }
310    }
311
312    /// Returns the number of items that will be considered during chunking.
313    fn chunking_length(&self) -> usize {
314        match self {
315            Self::Legacy(targets) => targets.chunking_length(),
316            Self::V2(targets) => targets.chunking_length(),
317        }
318    }
319
320    /// Retains the targets representing the difference with another `MultiProofTargets`.
321    /// Removes all targets that are already present in `other`.
322    fn retain_difference(&mut self, other: &MultiProofTargets) {
323        match self {
324            Self::Legacy(targets) => {
325                targets.retain_difference(other);
326            }
327            Self::V2(targets) => {
328                // Remove account targets that exist in other
329                targets.account_targets.retain(|target| !other.contains_key(&target.key()));
330
331                // For each account in storage_targets, remove slots that exist in other
332                targets.storage_targets.retain(|hashed_address, slots| {
333                    if let Some(other_slots) = other.get(hashed_address) {
334                        slots.retain(|slot| !other_slots.contains(&slot.key()));
335                        !slots.is_empty()
336                    } else {
337                        true
338                    }
339                });
340            }
341        }
342    }
343
344    /// Extends this `VersionedMultiProofTargets` with the contents of another.
345    ///
346    /// Panics if the variants do not match.
347    fn extend(&mut self, other: Self) {
348        match (self, other) {
349            (Self::Legacy(dest), Self::Legacy(src)) => {
350                dest.extend(src);
351            }
352            (Self::V2(dest), Self::V2(src)) => {
353                dest.account_targets.extend(src.account_targets);
354                for (addr, slots) in src.storage_targets {
355                    dest.storage_targets.entry(addr).or_default().extend(slots);
356                }
357            }
358            _ => panic!("Cannot extend VersionedMultiProofTargets with mismatched variants"),
359        }
360    }
361
362    /// Chunks this `VersionedMultiProofTargets` into smaller chunks of the given size.
363    fn chunks(self, chunk_size: usize) -> Box<dyn Iterator<Item = Self>> {
364        match self {
365            Self::Legacy(targets) => {
366                Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
367            }
368            Self::V2(targets) => Box::new(targets.chunks(chunk_size).map(Self::V2)),
369        }
370    }
371}
372
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
    /// Where the state update came from, if known.
    source: Option<Source>,
    /// The hashed state update the proof is calculated for.
    hashed_state_update: HashedPostState,
    /// The targets to calculate the multiproof for.
    proof_targets: VersionedMultiProofTargets,
    /// The sequence number assigned to this proof.
    proof_sequence_number: u64,
    /// Control-channel sender, used to report a [`MultiProofMessage::EmptyProof`].
    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
    /// Added/removed keys that are forwarded to legacy account multiproof calculations.
    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}
383
384impl MultiproofInput {
385    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
386    fn send_empty_proof(self) {
387        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
388            sequence_number: self.proof_sequence_number,
389            state: self.hashed_state_update,
390        });
391    }
392}
393
/// Coordinates multiproof dispatch between `MultiProofTask` and the parallel trie workers.
///
/// # Flow
/// 1. `MultiProofTask` asks the manager to dispatch a multiproof calculation.
/// 2. The manager builds the request, clones `proof_result_tx`, and hands everything to
///    [`ProofWorkerHandle`].
/// 3. A worker finishes the proof and sends a [`ProofResultMessage`] through the channel included
///    in the job.
/// 4. `MultiProofTask` consumes the message from the same channel and sequences it with
///    `ProofSequencer`.
///
/// Inputs without any proof targets never reach the workers: they are answered directly with a
/// [`MultiProofMessage::EmptyProof`].
#[derive(Debug)]
pub struct MultiproofManager {
    /// Handle to the proof worker pools (storage and account).
    proof_worker_handle: ProofWorkerHandle,
    /// Channel sender cloned into each dispatched job so workers can send back the
    /// `ProofResultMessage`.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Metrics recorded on dispatch and on completion of calculations.
    metrics: MultiProofTaskMetrics,
}
414
415impl MultiproofManager {
416    /// Creates a new [`MultiproofManager`].
417    fn new(
418        metrics: MultiProofTaskMetrics,
419        proof_worker_handle: ProofWorkerHandle,
420        proof_result_tx: CrossbeamSender<ProofResultMessage>,
421    ) -> Self {
422        // Initialize the max worker gauges with the worker pool sizes
423        metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
424        metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
425
426        Self { metrics, proof_worker_handle, proof_result_tx }
427    }
428
429    /// Dispatches a new multiproof calculation to worker pools.
430    fn dispatch(&self, input: MultiproofInput) {
431        // If there are no proof targets, we can just send an empty multiproof back immediately
432        if input.proof_targets.is_empty() {
433            trace!(
434                sequence_number = input.proof_sequence_number,
435                "No proof targets, sending empty multiproof back immediately"
436            );
437            input.send_empty_proof();
438            return;
439        }
440
441        self.dispatch_multiproof(input);
442    }
443
444    /// Signals that a multiproof calculation has finished.
445    fn on_calculation_complete(&self) {
446        self.metrics
447            .active_storage_workers_histogram
448            .record(self.proof_worker_handle.active_storage_workers() as f64);
449        self.metrics
450            .active_account_workers_histogram
451            .record(self.proof_worker_handle.active_account_workers() as f64);
452        self.metrics
453            .pending_storage_multiproofs_histogram
454            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
455        self.metrics
456            .pending_account_multiproofs_histogram
457            .record(self.proof_worker_handle.pending_account_tasks() as f64);
458    }
459
460    /// Dispatches a single multiproof calculation to worker pool.
461    fn dispatch_multiproof(&self, multiproof_input: MultiproofInput) {
462        let MultiproofInput {
463            source,
464            hashed_state_update,
465            proof_targets,
466            proof_sequence_number,
467            state_root_message_sender: _,
468            multi_added_removed_keys,
469        } = multiproof_input;
470
471        trace!(
472            target: "engine::tree::payload_processor::multiproof",
473            proof_sequence_number,
474            ?proof_targets,
475            account_targets = proof_targets.account_targets_len(),
476            storage_targets = proof_targets.storage_targets_len(),
477            ?source,
478            "Dispatching multiproof to workers"
479        );
480
481        let start = Instant::now();
482
483        // Workers will send ProofResultMessage directly to proof_result_rx
484        let proof_result_sender = ProofResultContext::new(
485            self.proof_result_tx.clone(),
486            proof_sequence_number,
487            hashed_state_update,
488            start,
489        );
490
491        let input = match proof_targets {
492            VersionedMultiProofTargets::Legacy(proof_targets) => {
493                // Extend prefix sets with targets
494                let frozen_prefix_sets = ParallelProof::extend_prefix_sets_with_targets(
495                    &Default::default(),
496                    &proof_targets,
497                );
498
499                AccountMultiproofInput::Legacy {
500                    targets: proof_targets,
501                    prefix_sets: frozen_prefix_sets,
502                    collect_branch_node_masks: true,
503                    multi_added_removed_keys,
504                    proof_result_sender,
505                }
506            }
507            VersionedMultiProofTargets::V2(proof_targets) => {
508                AccountMultiproofInput::V2 { targets: proof_targets, proof_result_sender }
509            }
510        };
511
512        // Dispatch account multiproof to worker pool with result sender
513        if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
514            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
515            return;
516        }
517
518        self.metrics
519            .active_storage_workers_histogram
520            .record(self.proof_worker_handle.active_storage_workers() as f64);
521        self.metrics
522            .active_account_workers_histogram
523            .record(self.proof_worker_handle.active_account_workers() as f64);
524        self.metrics
525            .pending_storage_multiproofs_histogram
526            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
527        self.metrics
528            .pending_account_multiproofs_histogram
529            .record(self.proof_worker_handle.pending_account_tasks() as f64);
530    }
531}
532
/// Metrics recorded by the multiproof task and its manager, under the `tree.root` scope.
// NOTE(review): the field doc comments below are presumably consumed by the `Metrics` derive as
// metric descriptions — confirm before rewording any of them.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of active storage workers processing proofs.
    pub active_storage_workers_histogram: Histogram,
    /// Histogram of active account workers processing proofs.
    pub active_account_workers_histogram: Histogram,
    /// Gauge for the maximum number of storage workers in the pool.
    pub max_storage_workers: Gauge,
    /// Gauge for the maximum number of account workers in the pool.
    pub max_account_workers: Gauge,
    /// Histogram of pending storage multiproofs in the queue.
    pub pending_storage_multiproofs_histogram: Histogram,
    /// Histogram of pending account multiproofs in the queue.
    pub pending_account_multiproofs_histogram: Histogram,

    /// Histogram of the number of prefetch proof target accounts.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of prefetch proof target storages.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of prefetch proof target chunks.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of state update proof target accounts.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of state update proof target storages.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of state update proof target chunks.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of prefetch proof batch sizes (number of messages merged).
    pub prefetch_batch_size_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations.
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of state updates received.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of proofs processed.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of total time spent in the multiproof task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Total time spent waiting for the first state update or prefetch request.
    pub first_update_wait_time_histogram: Histogram,
    /// Total time spent waiting for the last proof result.
    pub last_proof_wait_time_histogram: Histogram,
    /// Time spent preparing the sparse trie for reuse after state root computation.
    pub into_trie_for_reuse_duration_histogram: Histogram,
    /// Time spent waiting for preserved sparse trie cache to become available.
    pub sparse_trie_cache_wait_duration_histogram: Histogram,
}
591
/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
/// ## Architecture: Dual-Channel Multiproof System
///
/// This task orchestrates parallel proof computation using a dual-channel architecture that
/// separates control messages from proof computation results:
///
/// ```text
/// ┌─────────────────────────────────────────────────────────────────┐
/// │                        MultiProofTask                            │
/// │                  Event Loop (crossbeam::select!)                 │
/// └──┬──────────────────────────────────────────────────────────▲───┘
///    │                                                           │
///    │ (1) Send proof request                                   │
///    │     via tx (control channel)                             │
///    │                                                           │
///    ▼                                                           │
/// ┌──────────────────────────────────────────────────────────────┐ │
/// │             MultiproofManager                                │ │
/// │  - Sends EmptyProof if there are no targets                  │ │
/// │  - Routes to appropriate worker pool                         │ │
/// └──┬───────────────────────────────────────────────────────────┘ │
///    │                                                             │
///    │ (2) Dispatch to workers                                    │
///    │     OR send EmptyProof (fast path)                         │
///    ▼                                                             │
/// ┌──────────────────────────────────────────────────────────────┐ │
/// │              ProofWorkerHandle                                │ │
/// │  ┌─────────────────────┐   ┌────────────────────────┐        │ │
/// │  │ Storage Worker Pool │   │ Account Worker Pool     │        │ │
/// │  │ (spawn_blocking)    │   │ (spawn_blocking)        │        │ │
/// │  └─────────────────────┘   └────────────────────────┘        │ │
/// └──┬───────────────────────────────────────────────────────────┘ │
///    │                                                             │
///    │ (3) Compute proofs in parallel                             │
///    │     Send results back                                      │
///    │                                                             │
///    ▼                                                             │
/// ┌──────────────────────────────────────────────────────────────┐ │
/// │  proof_result_tx (crossbeam unbounded channel)                │ │
/// │    → ProofResultMessage { multiproof, sequence_number, ... }  │ │
/// └──────────────────────────────────────────────────────────────┘ │
///                                                                   │
///   (4) Receive via crossbeam::select! on two channels: ───────────┘
///       - rx: Control messages (PrefetchProofs, StateUpdate,
///             EmptyProof, HashedStateUpdate, BlockAccessList,
///             FinishedStateUpdates)
///       - proof_result_rx: Computed proof results from workers
/// ```
///
/// ## Component Responsibilities
///
/// - **[`MultiProofTask`]**: Event loop coordinator
///   - Receives state updates from transaction execution
///   - Deduplicates proof targets against already-fetched proofs
///   - Sequences proofs to maintain transaction ordering
///   - Feeds sequenced updates to sparse trie task
///
/// - **[`MultiproofManager`]**: Calculation orchestrator
///   - Decides between fast path ([`EmptyProof`]) and worker dispatch
///   - Dispatches account multiproofs (legacy or V2 targets) to the worker handle
///   - Records metrics for monitoring
///
/// - **[`ProofWorkerHandle`]**: Worker pool manager
///   - Maintains separate pools for storage and account proofs
///   - Dispatches work to blocking threads (CPU-intensive)
///   - Sends results directly via `proof_result_tx` (bypasses control channel)
///
/// [`EmptyProof`]: MultiProofMessage::EmptyProof
/// [`ProofWorkerHandle`]: reth_trie_parallel::proof_task::ProofWorkerHandle
///
/// ## Dual-Channel Design Rationale
///
/// The system uses two separate crossbeam channels:
///
/// 1. **Control Channel (`tx`/`rx`)**: For orchestration messages
///    - `PrefetchProofs`: Pre-fetch proofs before execution
///    - `StateUpdate`: New transaction execution results
///    - `EmptyProof`: Fast path when all targets already fetched
///    - `HashedStateUpdate`: Pre-hashed state update from BAL conversion
///    - `BlockAccessList`: Complete state changes for the block (EIP-7928)
///    - `FinishedStateUpdates`: Signal to drain pending work
///
/// 2. **Proof Result Channel (`proof_result_tx`/`proof_result_rx`)**: For worker results
///    - `ProofResultMessage`: Computed multiproofs from worker pools
///    - Direct path from workers to event loop (no intermediate hops)
///    - Keeps control messages separate from high-throughput proof data
///
/// This separation enables:
/// - **Non-blocking control**: Control messages never wait behind large proof data
/// - **Backpressure management**: Each channel can apply different policies
/// - **Clear ownership**: Workers only need proof result sender, not control channel
///
/// ## Initialization and Lifecycle
///
/// The task initializes a blinded sparse trie and subscribes to transaction state streams.
/// As it receives transaction execution results, it fetches proofs for relevant accounts
/// from the database and reveals them to the tree, then updates relevant leaves according
/// to transaction results. This feeds updates to the sparse trie task.
///
/// See the `run()` method documentation for detailed lifecycle flow.
#[derive(Debug)]
pub(super) struct MultiProofTask {
    /// The size of proof targets chunk to spawn in one calculation.
    /// If None, chunking is disabled and all targets are processed in a single proof.
    chunk_size: Option<usize>,
    /// Receiver for state root related messages (prefetch, state updates, finish signal).
    rx: CrossbeamReceiver<MultiProofMessage>,
    /// Sender for state root related messages.
    tx: CrossbeamSender<MultiProofMessage>,
    /// Receiver for proof results directly from workers.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Sender for state updates emitted by this type.
    to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
    /// Proof targets that have been already fetched.
    fetched_proof_targets: MultiProofTargets,
    /// Tracks keys which have been added and removed throughout the entire block.
    multi_added_removed_keys: MultiAddedRemovedKeys,
    /// Proof sequencing handler.
    proof_sequencer: ProofSequencer,
    /// Manages calculation of multiproofs.
    multiproof_manager: MultiproofManager,
    /// Multiproof task metrics.
    metrics: MultiProofTaskMetrics,
    /// If this number is exceeded and chunking is enabled, then this will override whether or not
    /// there are any active workers and force chunking across workers. This is to prevent tasks
    /// which are very long from hitting a single worker.
    ///
    /// Initialized to [`DEFAULT_MAX_TARGETS_FOR_CHUNKING`].
    max_targets_for_chunking: usize,
    /// Whether or not V2 proof calculation is enabled. If enabled then [`MultiProofTargetsV2`]
    /// will be produced by state updates. Disabled by default.
    v2_proofs_enabled: bool,
}
722
723impl MultiProofTask {
724    /// Creates a multiproof task with separate channels: control on `tx`/`rx`, proof results on
725    /// `proof_result_rx`.
726    pub(super) fn new(
727        proof_worker_handle: ProofWorkerHandle,
728        to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
729        chunk_size: Option<usize>,
730        tx: CrossbeamSender<MultiProofMessage>,
731        rx: CrossbeamReceiver<MultiProofMessage>,
732    ) -> Self {
733        let (proof_result_tx, proof_result_rx) = unbounded();
734        let metrics = MultiProofTaskMetrics::default();
735
736        Self {
737            chunk_size,
738            rx,
739            tx,
740            proof_result_rx,
741            to_sparse_trie,
742            fetched_proof_targets: Default::default(),
743            multi_added_removed_keys: MultiAddedRemovedKeys::new(),
744            proof_sequencer: ProofSequencer::default(),
745            multiproof_manager: MultiproofManager::new(
746                metrics.clone(),
747                proof_worker_handle,
748                proof_result_tx,
749            ),
750            metrics,
751            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
752            v2_proofs_enabled: false,
753        }
754    }
755
756    /// Enables V2 proof target generation on state updates.
757    pub(super) const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self {
758        self.v2_proofs_enabled = v2_proofs_enabled;
759        self
760    }
761
762    /// Handles request for proof prefetch.
763    ///
764    /// Returns how many multiproof tasks were dispatched for the prefetch request.
765    #[instrument(
766        level = "debug",
767        target = "engine::tree::payload_processor::multiproof",
768        skip_all,
769        fields(accounts = targets.account_targets_len(), chunks = 0)
770    )]
771    fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 {
772        // Remove already fetched proof targets to avoid redundant work.
773        targets.retain_difference(&self.fetched_proof_targets);
774
775        if targets.is_empty() {
776            return 0;
777        }
778
779        extend_multiproof_targets(&mut self.fetched_proof_targets, &targets);
780
781        // For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the
782        // [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
783        // we still want to optimistically fetch extension children for the leaf addition case.
784        // V2 multiproofs don't need this.
785        //
786        // Only clone the AddedRemovedKeys for accounts in the targets, not the entire accumulated
787        // set, to avoid O(n) cloning with many buffered blocks.
788        let multi_added_removed_keys =
789            if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
790                self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
791                Some(Arc::new(MultiAddedRemovedKeys {
792                    account: self.multi_added_removed_keys.account.clone(),
793                    storages: legacy_targets
794                        .keys()
795                        .filter_map(|k| {
796                            self.multi_added_removed_keys.storages.get(k).map(|v| (*k, v.clone()))
797                        })
798                        .collect(),
799                }))
800            } else {
801                None
802            };
803
804        self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
805        self.metrics
806            .prefetch_proof_targets_storages_histogram
807            .record(targets.storage_count() as f64);
808
809        let chunking_len = targets.chunking_length();
810        let available_account_workers =
811            self.multiproof_manager.proof_worker_handle.available_account_workers();
812        let available_storage_workers =
813            self.multiproof_manager.proof_worker_handle.available_storage_workers();
814        let num_chunks = dispatch_with_chunking(
815            targets,
816            chunking_len,
817            self.chunk_size,
818            self.max_targets_for_chunking,
819            available_account_workers,
820            available_storage_workers,
821            VersionedMultiProofTargets::chunks,
822            |proof_targets| {
823                self.multiproof_manager.dispatch(MultiproofInput {
824                    source: None,
825                    hashed_state_update: Default::default(),
826                    proof_targets,
827                    proof_sequence_number: self.proof_sequencer.next_sequence(),
828                    state_root_message_sender: self.tx.clone(),
829                    multi_added_removed_keys: multi_added_removed_keys.clone(),
830                });
831            },
832        );
833        self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
834
835        num_chunks as u64
836    }
837
838    /// Returns true if all state updates finished and all pending proofs processed.
839    fn is_done(&self, metrics: &MultiproofBatchMetrics, ctx: &MultiproofBatchCtx) -> bool {
840        let all_proofs_processed = metrics.all_proofs_processed();
841        let no_pending = !self.proof_sequencer.has_pending();
842        let updates_finished = ctx.updates_finished();
843        trace!(
844            target: "engine::tree::payload_processor::multiproof",
845            proofs_processed = metrics.proofs_processed,
846            state_update_proofs_requested = metrics.state_update_proofs_requested,
847            prefetch_proofs_requested = metrics.prefetch_proofs_requested,
848            no_pending,
849            updates_finished,
850            "Checking end condition"
851        );
852        all_proofs_processed && no_pending && updates_finished
853    }
854
855    /// Handles state updates.
856    ///
857    /// Returns how many proof dispatches were spawned (including an `EmptyProof` for already
858    /// fetched targets).
859    #[instrument(
860        level = "debug",
861        target = "engine::tree::payload_processor::multiproof",
862        skip(self, update),
863        fields(accounts = update.len(), chunks = 0)
864    )]
865    fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 {
866        let hashed_state_update = evm_state_to_hashed_post_state(update);
867        self.on_hashed_state_update(source, hashed_state_update)
868    }
869
870    /// Processes a hashed state update and dispatches multiproofs as needed.
871    ///
872    /// Returns the number of state updates dispatched (both `EmptyProof` and regular multiproofs).
873    fn on_hashed_state_update(
874        &mut self,
875        source: Source,
876        hashed_state_update: HashedPostState,
877    ) -> u64 {
878        // Update removed keys based on the state update.
879        self.multi_added_removed_keys.update_with_state(&hashed_state_update);
880
881        // Split the state update into already fetched and not fetched according to the proof
882        // targets.
883        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
884            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
885
886        let mut state_updates = 0;
887        // If there are any accounts or storage slots that we already fetched the proofs for,
888        // send them immediately, as they don't require dispatching any additional multiproofs.
889        if !fetched_state_update.is_empty() {
890            let _ = self.tx.send(MultiProofMessage::EmptyProof {
891                sequence_number: self.proof_sequencer.next_sequence(),
892                state: fetched_state_update,
893            });
894            state_updates += 1;
895        }
896
897        if not_fetched_state_update.is_empty() {
898            return state_updates;
899        }
900
901        // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
902        let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
903            account: self.multi_added_removed_keys.account.clone(),
904            storages: {
905                let mut storages = B256Map::with_capacity_and_hasher(
906                    not_fetched_state_update.storages.len(),
907                    Default::default(),
908                );
909
910                for account in not_fetched_state_update
911                    .storages
912                    .keys()
913                    .chain(not_fetched_state_update.accounts.keys())
914                {
915                    if let hash_map::Entry::Vacant(entry) = storages.entry(*account) {
916                        entry.insert(
917                            self.multi_added_removed_keys
918                                .storages
919                                .get(account)
920                                .cloned()
921                                .unwrap_or_default(),
922                        );
923                    }
924                }
925
926                storages
927            },
928        });
929
930        let chunking_len = not_fetched_state_update.chunking_length();
931        let mut spawned_proof_targets = MultiProofTargets::default();
932        let available_account_workers =
933            self.multiproof_manager.proof_worker_handle.available_account_workers();
934        let available_storage_workers =
935            self.multiproof_manager.proof_worker_handle.available_storage_workers();
936
937        let num_chunks = dispatch_with_chunking(
938            not_fetched_state_update,
939            chunking_len,
940            self.chunk_size,
941            self.max_targets_for_chunking,
942            available_account_workers,
943            available_storage_workers,
944            HashedPostState::chunks,
945            |hashed_state_update| {
946                let proof_targets = get_proof_targets(
947                    &hashed_state_update,
948                    &self.fetched_proof_targets,
949                    &multi_added_removed_keys,
950                    self.v2_proofs_enabled,
951                );
952                extend_multiproof_targets(&mut spawned_proof_targets, &proof_targets);
953
954                self.multiproof_manager.dispatch(MultiproofInput {
955                    source: Some(source),
956                    hashed_state_update,
957                    proof_targets,
958                    proof_sequence_number: self.proof_sequencer.next_sequence(),
959                    state_root_message_sender: self.tx.clone(),
960                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
961                });
962            },
963        );
964        self.metrics
965            .state_update_proof_targets_accounts_histogram
966            .record(spawned_proof_targets.len() as f64);
967        self.metrics
968            .state_update_proof_targets_storages_histogram
969            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
970        self.metrics.state_update_proof_chunks_histogram.record(num_chunks as f64);
971
972        self.fetched_proof_targets.extend(spawned_proof_targets);
973
974        state_updates + num_chunks as u64
975    }
976
977    /// Handler for new proof calculated, aggregates all the existing sequential proofs.
978    fn on_proof(
979        &mut self,
980        sequence_number: u64,
981        update: SparseTrieUpdate,
982    ) -> Option<SparseTrieUpdate> {
983        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
984
985        ready_proofs
986            .into_iter()
987            // Merge all ready proofs and state updates
988            .reduce(|mut acc_update, update| {
989                acc_update.extend(update);
990                acc_update
991            })
992            // Return None if the resulting proof is empty
993            .filter(|proof| !proof.is_empty())
994    }
995
996    /// Processes a multiproof message, batching consecutive prefetch messages.
997    ///
998    /// For prefetch messages, drains queued prefetch messages and merges them into one batch before
999    /// processing, storing one pending message (different type or over-cap) to handle on the next
1000    /// iteration. State updates are processed directly without batching.
1001    ///
1002    /// Returns `true` if done, `false` to continue.
1003    fn process_multiproof_message<P>(
1004        &mut self,
1005        msg: MultiProofMessage,
1006        ctx: &mut MultiproofBatchCtx,
1007        batch_metrics: &mut MultiproofBatchMetrics,
1008        provider: &P,
1009    ) -> bool
1010    where
1011        P: AccountReader,
1012    {
1013        match msg {
1014            // Prefetch proofs: batch consecutive prefetch requests up to target/message limits
1015            MultiProofMessage::PrefetchProofs(targets) => {
1016                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::PrefetchProofs");
1017
1018                if ctx.first_update_time.is_none() {
1019                    self.metrics
1020                        .first_update_wait_time_histogram
1021                        .record(ctx.start.elapsed().as_secs_f64());
1022                    ctx.first_update_time = Some(Instant::now());
1023                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
1024                }
1025
1026                let mut accumulated_count = targets.chunking_length();
1027                ctx.accumulated_prefetch_targets.clear();
1028                ctx.accumulated_prefetch_targets.push(targets);
1029
1030                // Batch consecutive prefetch messages up to limits.
1031                // EmptyProof messages are handled inline since they're very fast (~100ns)
1032                // and shouldn't interrupt batching.
1033                while accumulated_count < PREFETCH_MAX_BATCH_TARGETS &&
1034                    ctx.accumulated_prefetch_targets.len() < PREFETCH_MAX_BATCH_MESSAGES
1035                {
1036                    match self.rx.try_recv() {
1037                        Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
1038                            let next_count = next_targets.chunking_length();
1039                            if accumulated_count + next_count > PREFETCH_MAX_BATCH_TARGETS {
1040                                ctx.pending_msg =
1041                                    Some(MultiProofMessage::PrefetchProofs(next_targets));
1042                                break;
1043                            }
1044                            accumulated_count += next_count;
1045                            ctx.accumulated_prefetch_targets.push(next_targets);
1046                        }
1047                        Ok(MultiProofMessage::EmptyProof { sequence_number, state }) => {
1048                            // Handle inline - very fast, don't break batching
1049                            batch_metrics.proofs_processed += 1;
1050                            if let Some(combined_update) = self.on_proof(
1051                                sequence_number,
1052                                SparseTrieUpdate {
1053                                    state,
1054                                    multiproof: ProofResult::empty(self.v2_proofs_enabled),
1055                                },
1056                            ) {
1057                                let _ = self.to_sparse_trie.send(combined_update);
1058                            }
1059                        }
1060                        Ok(other_msg) => {
1061                            ctx.pending_msg = Some(other_msg);
1062                            break;
1063                        }
1064                        Err(_) => break,
1065                    }
1066                }
1067
1068                // Process all accumulated messages in a single batch
1069                let num_batched = ctx.accumulated_prefetch_targets.len();
1070                self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);
1071
1072                // Merge all accumulated prefetch targets into a single dispatch payload.
1073                // Use drain to preserve the buffer allocation.
1074                let mut accumulated_iter = ctx.accumulated_prefetch_targets.drain(..);
1075                let mut merged_targets =
1076                    accumulated_iter.next().expect("prefetch batch always has at least one entry");
1077                for next_targets in accumulated_iter {
1078                    merged_targets.extend(next_targets);
1079                }
1080
1081                let account_targets = merged_targets.len();
1082                let storage_targets = merged_targets.storage_count();
1083                batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
1084                trace!(
1085                    target: "engine::tree::payload_processor::multiproof",
1086                    account_targets,
1087                    storage_targets,
1088                    prefetch_proofs_requested = batch_metrics.prefetch_proofs_requested,
1089                    num_batched,
1090                    "Dispatched prefetch batch"
1091                );
1092
1093                false
1094            }
1095            MultiProofMessage::StateUpdate(source, update) => {
1096                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
1097
1098                if ctx.first_update_time.is_none() {
1099                    self.metrics
1100                        .first_update_wait_time_histogram
1101                        .record(ctx.start.elapsed().as_secs_f64());
1102                    ctx.first_update_time = Some(Instant::now());
1103                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
1104                }
1105
1106                let update_len = update.len();
1107                batch_metrics.state_update_proofs_requested += self.on_state_update(source, update);
1108                trace!(
1109                    target: "engine::tree::payload_processor::multiproof",
1110                    ?source,
1111                    len = update_len,
1112                    state_update_proofs_requested = ?batch_metrics.state_update_proofs_requested,
1113                    "Dispatched state update"
1114                );
1115
1116                false
1117            }
1118            // Process Block Access List (BAL) - complete state changes provided upfront
1119            MultiProofMessage::BlockAccessList(bal) => {
1120                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::BAL");
1121
1122                if ctx.first_update_time.is_none() {
1123                    self.metrics
1124                        .first_update_wait_time_histogram
1125                        .record(ctx.start.elapsed().as_secs_f64());
1126                    ctx.first_update_time = Some(Instant::now());
1127                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation from BAL");
1128                }
1129
1130                // Convert BAL to HashedPostState and process it
1131                match bal_to_hashed_post_state(&bal, provider) {
1132                    Ok(hashed_state) => {
1133                        debug!(
1134                            target: "engine::tree::payload_processor::multiproof",
1135                            accounts = hashed_state.accounts.len(),
1136                            storages = hashed_state.storages.len(),
1137                            "Processing BAL state update"
1138                        );
1139
1140                        // Use BlockAccessList as source for BAL-derived state updates
1141                        batch_metrics.state_update_proofs_requested +=
1142                            self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
1143                    }
1144                    Err(err) => {
1145                        error!(target: "engine::tree::payload_processor::multiproof", ?err, "Failed to convert BAL to hashed state");
1146                        return true;
1147                    }
1148                }
1149
1150                // Mark updates as finished since BAL provides complete state
1151                ctx.updates_finished_time = Some(Instant::now());
1152
1153                // Check if we're done (might need to wait for proofs to complete)
1154                if self.is_done(batch_metrics, ctx) {
1155                    debug!(
1156                        target: "engine::tree::payload_processor::multiproof",
1157                        "BAL processed and all proofs complete, ending calculation"
1158                    );
1159                    return true;
1160                }
1161                false
1162            }
1163            // Signal that no more state updates will arrive
1164            MultiProofMessage::FinishedStateUpdates => {
1165                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates");
1166
1167                ctx.updates_finished_time = Some(Instant::now());
1168
1169                if self.is_done(batch_metrics, ctx) {
1170                    debug!(
1171                        target: "engine::tree::payload_processor::multiproof",
1172                        "State updates finished and all proofs processed, ending calculation"
1173                    );
1174                    return true;
1175                }
1176                false
1177            }
1178            // Handle proof result with no trie nodes (state unchanged)
1179            MultiProofMessage::EmptyProof { sequence_number, state } => {
1180                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::EmptyProof");
1181
1182                batch_metrics.proofs_processed += 1;
1183
1184                if let Some(combined_update) = self.on_proof(
1185                    sequence_number,
1186                    SparseTrieUpdate {
1187                        state,
1188                        multiproof: ProofResult::empty(self.v2_proofs_enabled),
1189                    },
1190                ) {
1191                    let _ = self.to_sparse_trie.send(combined_update);
1192                }
1193
1194                if self.is_done(batch_metrics, ctx) {
1195                    debug!(
1196                        target: "engine::tree::payload_processor::multiproof",
1197                        "State updates finished and all proofs processed, ending calculation"
1198                    );
1199                    return true;
1200                }
1201                false
1202            }
1203            MultiProofMessage::HashedStateUpdate(hashed_state) => {
1204                batch_metrics.state_update_proofs_requested +=
1205                    self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
1206                false
1207            }
1208        }
1209    }
1210
1211    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
1212    /// sparse trie, updates the sparse trie, and eventually returns the state root.
1213    ///
1214    /// The lifecycle is the following:
1215    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
1216    ///    received from the engine.
1217    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
1218    ///      [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are
1219    ///      extracted with [`get_proof_targets`].
1220    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
1221    ///      so that the proofs for accounts and storage slots that were already fetched are not
1222    ///      requested again.
1223    /// 2. Using the proof targets, a new multiproof is calculated using
1224    ///    [`MultiproofManager::dispatch`].
1225    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
1226    ///      sent back to this task along with the original state update.
1227    ///    * Otherwise, the multiproof is dispatched to worker pools and results are sent directly
1228    ///      to this task via the `proof_result_rx` channel as [`ProofResultMessage`].
1229    /// 3. Either [`MultiProofMessage::EmptyProof`] (via control channel) or [`ProofResultMessage`]
1230    ///    (via proof result channel) is received.
1231    ///    * The multiproof is added to the [`ProofSequencer`].
1232    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
1233    ///      state updates arrived (i.e. transaction order), such sequence is returned.
1234    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
1235    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
1236    ///    trie task.
1237    /// 5. Steps above are repeated until this task receives a
1238    ///    [`MultiProofMessage::FinishedStateUpdates`].
1239    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
1240    ///      [`ProofResultMessage`], we check if all proofs have been processed and if there are any
1241    ///      pending proofs in the proof sequencer left to be revealed.
1242    /// 6. While running, consecutive [`MultiProofMessage::PrefetchProofs`] messages are batched to
1243    ///    reduce redundant work; if a different message type arrives mid-batch or a batch cap is
1244    ///    reached, it is held as `pending_msg` and processed on the next loop to preserve ordering.
1245    /// 7. This task exits after all pending proofs are processed.
1246    #[instrument(
1247        level = "debug",
1248        name = "MultiProofTask::run",
1249        target = "engine::tree::payload_processor::multiproof",
1250        skip_all
1251    )]
1252    pub(crate) fn run<P>(mut self, provider: P)
1253    where
1254        P: AccountReader,
1255    {
1256        let mut ctx = MultiproofBatchCtx::new(Instant::now());
1257        let mut batch_metrics = MultiproofBatchMetrics::default();
1258
1259        // Main event loop; select_biased! prioritizes proof results over control messages.
1260        // Labeled so inner match arms can `break 'main` once all work is complete.
1261        'main: loop {
1262            trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");
1263
1264            if let Some(msg) = ctx.pending_msg.take() {
1265                if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
1266                    break 'main;
1267                }
1268                continue;
1269            }
1270
1271            // Use select_biased! to prioritize proof results over new requests.
1272            // This prevents new work from starving completed proofs and keeps workers healthy.
1273            crossbeam_channel::select_biased! {
1274                recv(self.proof_result_rx) -> proof_msg => {
1275                    match proof_msg {
1276                        Ok(proof_result) => {
1277                            batch_metrics.proofs_processed += 1;
1278
1279                            self.metrics
1280                                .proof_calculation_duration_histogram
1281                                .record(proof_result.elapsed);
1282
1283                            self.multiproof_manager.on_calculation_complete();
1284
1285                            // Convert ProofResultMessage to SparseTrieUpdate
1286                            match proof_result.result {
1287                                Ok(proof_result_data) => {
1288                                    trace!(
1289                                        target: "engine::tree::payload_processor::multiproof",
1290                                        sequence = proof_result.sequence_number,
1291                                        total_proofs = batch_metrics.proofs_processed,
1292                                        "Processing calculated proof from worker"
1293                                    );
1294
1295                                    let update = SparseTrieUpdate {
1296                                        state: proof_result.state,
1297                                        multiproof: proof_result_data,
1298                                    };
1299
1300                                    if let Some(combined_update) =
1301                                        self.on_proof(proof_result.sequence_number, update)
1302                                    {
1303                                        let _ = self.to_sparse_trie.send(combined_update);
1304                                    }
1305                                }
1306                                Err(error) => {
1307                                    error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
1308                                    return
1309                                }
1310                            }
1311
1312                            if self.is_done(&batch_metrics, &ctx) {
1313                                debug!(
1314                                    target: "engine::tree::payload_processor::multiproof",
1315                                    "State updates finished and all proofs processed, ending calculation"
1316                                );
1317                                break 'main
1318                            }
1319                        }
1320                        Err(_) => {
1321                            error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
1322                            return
1323                        }
1324                    }
1325                },
1326                recv(self.rx) -> message => {
1327                    let msg = match message {
1328                        Ok(m) => m,
1329                        Err(_) => {
1330                            error!(target: "engine::tree::payload_processor::multiproof", "State root related message channel closed unexpectedly");
1331                            return
1332                        }
1333                    };
1334
1335                    if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
1336                        break 'main;
1337                    }
1338                }
1339            }
1340        }
1341
1342        debug!(
1343            target: "engine::tree::payload_processor::multiproof",
1344            total_updates = batch_metrics.state_update_proofs_requested,
1345            total_proofs = batch_metrics.proofs_processed,
1346            total_time = ?ctx.first_update_time.map(|t|t.elapsed()),
1347            time_since_updates_finished = ?ctx.updates_finished_time.map(|t|t.elapsed()),
1348            "All proofs processed, ending calculation"
1349        );
1350
1351        // update total metrics on finish
1352        self.metrics
1353            .state_updates_received_histogram
1354            .record(batch_metrics.state_update_proofs_requested as f64);
1355        self.metrics.proofs_processed_histogram.record(batch_metrics.proofs_processed as f64);
1356        if let Some(total_time) = ctx.first_update_time.map(|t| t.elapsed()) {
1357            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1358        }
1359
1360        if let Some(updates_finished_time) = ctx.updates_finished_time {
1361            self.metrics
1362                .last_proof_wait_time_histogram
1363                .record(updates_finished_time.elapsed().as_secs_f64());
1364        }
1365    }
1366}
1367
1368/// Context for multiproof message batching loop.
1369///
1370/// Contains processing state that persists across loop iterations.
1371///
1372/// Used by `process_multiproof_message` to batch consecutive prefetch messages received via
1373/// `try_recv` for efficient processing.
1374struct MultiproofBatchCtx {
1375    /// Buffers a non-matching message type encountered during batching.
1376    /// Processed first in next iteration to preserve ordering while allowing same-type
1377    /// messages to batch.
1378    pending_msg: Option<MultiProofMessage>,
1379    /// Timestamp when the first state update or prefetch was received.
1380    first_update_time: Option<Instant>,
1381    /// Timestamp before the first state update or prefetch was received.
1382    start: Instant,
1383    /// Timestamp when state updates finished. `Some` indicates all state updates have been
1384    /// received.
1385    updates_finished_time: Option<Instant>,
1386    /// Reusable buffer for accumulating prefetch targets during batching.
1387    accumulated_prefetch_targets: Vec<VersionedMultiProofTargets>,
1388}
1389
1390impl MultiproofBatchCtx {
1391    /// Creates a new batch context with the given start time.
1392    fn new(start: Instant) -> Self {
1393        Self {
1394            pending_msg: None,
1395            first_update_time: None,
1396            start,
1397            updates_finished_time: None,
1398            accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES),
1399        }
1400    }
1401
1402    /// Returns `true` if all state updates have been received.
1403    const fn updates_finished(&self) -> bool {
1404        self.updates_finished_time.is_some()
1405    }
1406}
1407
/// Running tallies of proofs requested and proofs completed.
#[derive(Default)]
struct MultiproofBatchMetrics {
    /// How many proofs have been processed so far.
    proofs_processed: u64,
    /// How many proofs were requested for state updates.
    state_update_proofs_requested: u64,
    /// How many proofs were requested for prefetches.
    prefetch_proofs_requested: u64,
}

impl MultiproofBatchMetrics {
    /// Returns `true` once the processed count has caught up with the combined number of
    /// state-update and prefetch proofs requested.
    const fn all_proofs_processed(&self) -> bool {
        let total_requested = self.state_update_proofs_requested + self.prefetch_proofs_requested;
        total_requested <= self.proofs_processed
    }
}
1425
1426/// Returns accounts only with those storages that were not already fetched, and
1427/// if there are no such storages and the account itself was already fetched, the
1428/// account shouldn't be included.
1429fn get_proof_targets(
1430    state_update: &HashedPostState,
1431    fetched_proof_targets: &MultiProofTargets,
1432    multi_added_removed_keys: &MultiAddedRemovedKeys,
1433    v2_enabled: bool,
1434) -> VersionedMultiProofTargets {
1435    if v2_enabled {
1436        let mut targets = MultiProofTargetsV2::default();
1437
1438        // first collect all new accounts (not previously fetched)
1439        for &hashed_address in state_update.accounts.keys() {
1440            if !fetched_proof_targets.contains_key(&hashed_address) {
1441                targets.account_targets.push(hashed_address.into());
1442            }
1443        }
1444
1445        // then process storage slots for all accounts in the state update
1446        for (hashed_address, storage) in &state_update.storages {
1447            let fetched = fetched_proof_targets.get(hashed_address);
1448
1449            // If the storage is wiped, we still need to fetch the account proof.
1450            if storage.wiped && fetched.is_none() {
1451                targets.account_targets.push(Into::<proof_v2::Target>::into(*hashed_address));
1452                continue
1453            }
1454
1455            let changed_slots = storage
1456                .storage
1457                .keys()
1458                .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
1459                .map(|slot| Into::<proof_v2::Target>::into(*slot))
1460                .collect::<Vec<_>>();
1461
1462            if !changed_slots.is_empty() {
1463                targets.account_targets.push((*hashed_address).into());
1464                targets.storage_targets.insert(*hashed_address, changed_slots);
1465            }
1466        }
1467
1468        VersionedMultiProofTargets::V2(targets)
1469    } else {
1470        let mut targets = MultiProofTargets::default();
1471
1472        // first collect all new accounts (not previously fetched)
1473        for hashed_address in state_update.accounts.keys() {
1474            if !fetched_proof_targets.contains_key(hashed_address) {
1475                targets.insert(*hashed_address, HashSet::default());
1476            }
1477        }
1478
1479        // then process storage slots for all accounts in the state update
1480        for (hashed_address, storage) in &state_update.storages {
1481            let fetched = fetched_proof_targets.get(hashed_address);
1482            let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1483            let mut changed_slots = storage
1484                .storage
1485                .keys()
1486                .filter(|slot| {
1487                    !fetched.is_some_and(|f| f.contains(*slot)) ||
1488                        storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1489                })
1490                .peekable();
1491
1492            // If the storage is wiped, we still need to fetch the account proof.
1493            if storage.wiped && fetched.is_none() {
1494                targets.entry(*hashed_address).or_default();
1495            }
1496
1497            if changed_slots.peek().is_some() {
1498                targets.entry(*hashed_address).or_default().extend(changed_slots);
1499            }
1500        }
1501
1502        VersionedMultiProofTargets::Legacy(targets)
1503    }
1504}
1505
/// Hands `items` to `dispatch`, either whole or split into chunks, and returns how many
/// dispatch calls were made.
///
/// Splitting happens only when all of the following hold:
/// - `chunking_len` exceeds `max_targets_for_chunking`, or more than one account worker or more
///   than one storage worker is available;
/// - `chunk_size` is `Some`;
/// - `chunking_len` is larger than that chunk size.
///
/// In that case `chunker(items, chunk_size)` produces the chunks and each one is dispatched in
/// iteration order. Otherwise `items` is dispatched once, untouched, and `1` is returned.
#[allow(clippy::too_many_arguments)]
pub(crate) fn dispatch_with_chunking<T, I>(
    items: T,
    chunking_len: usize,
    chunk_size: Option<usize>,
    max_targets_for_chunking: usize,
    available_account_workers: usize,
    available_storage_workers: usize,
    chunker: impl FnOnce(T, usize) -> I,
    mut dispatch: impl FnMut(T),
) -> usize
where
    I: IntoIterator<Item = T>,
{
    let spare_workers = available_account_workers > 1 || available_storage_workers > 1;
    let chunking_wanted = chunking_len > max_targets_for_chunking || spare_workers;

    match chunk_size {
        Some(size) if chunking_wanted && chunking_len > size => {
            // Dispatch every chunk and count them along the way.
            chunker(items, size).into_iter().fold(0usize, |dispatched, chunk| {
                dispatch(chunk);
                dispatched + 1
            })
        }
        _ => {
            dispatch(items);
            1
        }
    }
}
1541
1542#[cfg(test)]
1543mod tests {
1544    use crate::tree::cached_state::CachedStateProvider;
1545
1546    use super::*;
1547    use alloy_eip7928::{AccountChanges, BalanceChange};
1548    use alloy_primitives::Address;
1549    use reth_provider::{
1550        providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
1551        BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
1552        PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
1553        StorageSettingsCache,
1554    };
1555    use reth_trie::MultiProof;
1556    use reth_trie_db::ChangesetCache;
1557    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
1558    use revm_primitives::{B256, U256};
1559    use std::sync::{Arc, OnceLock};
1560
1561    /// Get a test runtime, creating it if necessary
1562    fn get_test_runtime() -> &'static reth_tasks::Runtime {
1563        static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
1564        TEST_RT.get_or_init(reth_tasks::Runtime::test)
1565    }
1566
1567    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
1568    where
1569        F: DatabaseProviderFactory<
1570                Provider: BlockReader
1571                              + StageCheckpointReader
1572                              + PruneCheckpointReader
1573                              + ChangeSetReader
1574                              + StorageChangeSetReader
1575                              + StorageSettingsCache
1576                              + BlockNumReader,
1577            > + Clone
1578            + Send
1579            + 'static,
1580    {
1581        let runtime = get_test_runtime();
1582        let changeset_cache = ChangesetCache::new();
1583        let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
1584        let task_ctx = ProofTaskCtx::new(overlay_factory);
1585        let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
1586        let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
1587        let (tx, rx) = crossbeam_channel::unbounded();
1588
1589        MultiProofTask::new(proof_handle, to_sparse_trie, Some(1), tx, rx)
1590    }
1591
1592    fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
1593    where
1594        F: DatabaseProviderFactory<
1595                Provider: BlockReader
1596                              + StageCheckpointReader
1597                              + PruneCheckpointReader
1598                              + reth_provider::StorageSettingsCache,
1599            > + Clone
1600            + Send
1601            + 'static,
1602    {
1603        let db_provider = factory.database_provider_ro().unwrap();
1604        let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
1605        let cache = crate::tree::cached_state::ExecutionCache::new(1000);
1606        CachedStateProvider::new(state_provider, cache, Default::default())
1607    }
1608
1609    #[test]
1610    fn test_add_proof_in_sequence() {
1611        let mut sequencer = ProofSequencer::default();
1612        let proof1 = MultiProof::default();
1613        let proof2 = MultiProof::default();
1614        sequencer.next_sequence = 2;
1615
1616        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1617        assert_eq!(ready.len(), 1);
1618        assert!(!sequencer.has_pending());
1619
1620        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1621        assert_eq!(ready.len(), 1);
1622        assert!(!sequencer.has_pending());
1623    }
1624
1625    #[test]
1626    fn test_add_proof_out_of_order() {
1627        let mut sequencer = ProofSequencer::default();
1628        let proof1 = MultiProof::default();
1629        let proof2 = MultiProof::default();
1630        let proof3 = MultiProof::default();
1631        sequencer.next_sequence = 3;
1632
1633        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1634        assert_eq!(ready.len(), 0);
1635        assert!(sequencer.has_pending());
1636
1637        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1638        assert_eq!(ready.len(), 1);
1639        assert!(sequencer.has_pending());
1640
1641        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1642        assert_eq!(ready.len(), 2);
1643        assert!(!sequencer.has_pending());
1644    }
1645
1646    #[test]
1647    fn test_add_proof_with_gaps() {
1648        let mut sequencer = ProofSequencer::default();
1649        let proof1 = MultiProof::default();
1650        let proof3 = MultiProof::default();
1651        sequencer.next_sequence = 3;
1652
1653        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1654        assert_eq!(ready.len(), 1);
1655
1656        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1657        assert_eq!(ready.len(), 0);
1658        assert!(sequencer.has_pending());
1659    }
1660
1661    #[test]
1662    fn test_add_proof_duplicate_sequence() {
1663        let mut sequencer = ProofSequencer::default();
1664        let proof1 = MultiProof::default();
1665        let proof2 = MultiProof::default();
1666
1667        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1668        assert_eq!(ready.len(), 1);
1669
1670        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1671        assert_eq!(ready.len(), 0);
1672        assert!(!sequencer.has_pending());
1673    }
1674
1675    #[test]
1676    fn test_add_proof_batch_processing() {
1677        let mut sequencer = ProofSequencer::default();
1678        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1679        sequencer.next_sequence = 5;
1680
1681        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1682        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1683        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1684        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1685
1686        let ready =
1687            sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1688        assert_eq!(ready.len(), 5);
1689        assert!(!sequencer.has_pending());
1690    }
1691
1692    fn create_get_proof_targets_state() -> HashedPostState {
1693        let mut state = HashedPostState::default();
1694
1695        let addr1 = B256::random();
1696        let addr2 = B256::random();
1697        state.accounts.insert(addr1, Some(Default::default()));
1698        state.accounts.insert(addr2, Some(Default::default()));
1699
1700        let mut storage = HashedStorage::default();
1701        let slot1 = B256::random();
1702        let slot2 = B256::random();
1703        storage.storage.insert(slot1, U256::ZERO);
1704        storage.storage.insert(slot2, U256::from(1));
1705        state.storages.insert(addr1, storage);
1706
1707        state
1708    }
1709
1710    fn unwrap_legacy_targets(targets: VersionedMultiProofTargets) -> MultiProofTargets {
1711        match targets {
1712            VersionedMultiProofTargets::Legacy(targets) => targets,
1713            VersionedMultiProofTargets::V2(_) => panic!("Expected Legacy targets"),
1714        }
1715    }
1716
1717    #[test]
1718    fn test_get_proof_targets_new_account_targets() {
1719        let state = create_get_proof_targets_state();
1720        let fetched = MultiProofTargets::default();
1721
1722        let targets = unwrap_legacy_targets(get_proof_targets(
1723            &state,
1724            &fetched,
1725            &MultiAddedRemovedKeys::new(),
1726            false,
1727        ));
1728
1729        // should return all accounts as targets since nothing was fetched before
1730        assert_eq!(targets.len(), state.accounts.len());
1731        for addr in state.accounts.keys() {
1732            assert!(targets.contains_key(addr));
1733        }
1734    }
1735
1736    #[test]
1737    fn test_get_proof_targets_new_storage_targets() {
1738        let state = create_get_proof_targets_state();
1739        let fetched = MultiProofTargets::default();
1740
1741        let targets = unwrap_legacy_targets(get_proof_targets(
1742            &state,
1743            &fetched,
1744            &MultiAddedRemovedKeys::new(),
1745            false,
1746        ));
1747
1748        // verify storage slots are included for accounts with storage
1749        for (addr, storage) in &state.storages {
1750            assert!(targets.contains_key(addr));
1751            let target_slots = &targets[addr];
1752            assert_eq!(target_slots.len(), storage.storage.len());
1753            for slot in storage.storage.keys() {
1754                assert!(target_slots.contains(slot));
1755            }
1756        }
1757    }
1758
1759    #[test]
1760    fn test_get_proof_targets_filter_already_fetched_accounts() {
1761        let state = create_get_proof_targets_state();
1762        let mut fetched = MultiProofTargets::default();
1763
1764        // select an account that has no storage updates
1765        let fetched_addr = state
1766            .accounts
1767            .keys()
1768            .find(|&&addr| !state.storages.contains_key(&addr))
1769            .expect("Should have an account without storage");
1770
1771        // mark the account as already fetched
1772        fetched.insert(*fetched_addr, HashSet::default());
1773
1774        let targets = unwrap_legacy_targets(get_proof_targets(
1775            &state,
1776            &fetched,
1777            &MultiAddedRemovedKeys::new(),
1778            false,
1779        ));
1780
1781        // should not include the already fetched account since it has no storage updates
1782        assert!(!targets.contains_key(fetched_addr));
1783        // other accounts should still be included
1784        assert_eq!(targets.len(), state.accounts.len() - 1);
1785    }
1786
1787    #[test]
1788    fn test_get_proof_targets_filter_already_fetched_storage() {
1789        let state = create_get_proof_targets_state();
1790        let mut fetched = MultiProofTargets::default();
1791
1792        // mark one storage slot as already fetched
1793        let (addr, storage) = state.storages.iter().next().unwrap();
1794        let mut fetched_slots = HashSet::default();
1795        let fetched_slot = *storage.storage.keys().next().unwrap();
1796        fetched_slots.insert(fetched_slot);
1797        fetched.insert(*addr, fetched_slots);
1798
1799        let targets = unwrap_legacy_targets(get_proof_targets(
1800            &state,
1801            &fetched,
1802            &MultiAddedRemovedKeys::new(),
1803            false,
1804        ));
1805
1806        // should not include the already fetched storage slot
1807        let target_slots = &targets[addr];
1808        assert!(!target_slots.contains(&fetched_slot));
1809        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1810    }
1811
1812    #[test]
1813    fn test_get_proof_targets_empty_state() {
1814        let state = HashedPostState::default();
1815        let fetched = MultiProofTargets::default();
1816
1817        let targets = unwrap_legacy_targets(get_proof_targets(
1818            &state,
1819            &fetched,
1820            &MultiAddedRemovedKeys::new(),
1821            false,
1822        ));
1823
1824        assert!(targets.is_empty());
1825    }
1826
1827    #[test]
1828    fn test_get_proof_targets_mixed_fetched_state() {
1829        let mut state = HashedPostState::default();
1830        let mut fetched = MultiProofTargets::default();
1831
1832        let addr1 = B256::random();
1833        let addr2 = B256::random();
1834        let slot1 = B256::random();
1835        let slot2 = B256::random();
1836
1837        state.accounts.insert(addr1, Some(Default::default()));
1838        state.accounts.insert(addr2, Some(Default::default()));
1839
1840        let mut storage = HashedStorage::default();
1841        storage.storage.insert(slot1, U256::ZERO);
1842        storage.storage.insert(slot2, U256::from(1));
1843        state.storages.insert(addr1, storage);
1844
1845        let mut fetched_slots = HashSet::default();
1846        fetched_slots.insert(slot1);
1847        fetched.insert(addr1, fetched_slots);
1848
1849        let targets = unwrap_legacy_targets(get_proof_targets(
1850            &state,
1851            &fetched,
1852            &MultiAddedRemovedKeys::new(),
1853            false,
1854        ));
1855
1856        assert!(targets.contains_key(&addr2));
1857        assert!(!targets[&addr1].contains(&slot1));
1858        assert!(targets[&addr1].contains(&slot2));
1859    }
1860
1861    #[test]
1862    fn test_get_proof_targets_unmodified_account_with_storage() {
1863        let mut state = HashedPostState::default();
1864        let fetched = MultiProofTargets::default();
1865
1866        let addr = B256::random();
1867        let slot1 = B256::random();
1868        let slot2 = B256::random();
1869
1870        // don't add the account to state.accounts (simulating unmodified account)
1871        // but add storage updates for this account
1872        let mut storage = HashedStorage::default();
1873        storage.storage.insert(slot1, U256::from(1));
1874        storage.storage.insert(slot2, U256::from(2));
1875        state.storages.insert(addr, storage);
1876
1877        assert!(!state.accounts.contains_key(&addr));
1878        assert!(!fetched.contains_key(&addr));
1879
1880        let targets = unwrap_legacy_targets(get_proof_targets(
1881            &state,
1882            &fetched,
1883            &MultiAddedRemovedKeys::new(),
1884            false,
1885        ));
1886
1887        // verify that we still get the storage slots for the unmodified account
1888        assert!(targets.contains_key(&addr));
1889
1890        let target_slots = &targets[&addr];
1891        assert_eq!(target_slots.len(), 2);
1892        assert!(target_slots.contains(&slot1));
1893        assert!(target_slots.contains(&slot2));
1894    }
1895
1896    #[test]
1897    fn test_get_proof_targets_with_removed_storage_keys() {
1898        let mut state = HashedPostState::default();
1899        let mut fetched = MultiProofTargets::default();
1900        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1901
1902        let addr = B256::random();
1903        let slot1 = B256::random();
1904        let slot2 = B256::random();
1905
1906        // add account to state
1907        state.accounts.insert(addr, Some(Default::default()));
1908
1909        // add storage updates
1910        let mut storage = HashedStorage::default();
1911        storage.storage.insert(slot1, U256::from(100));
1912        storage.storage.insert(slot2, U256::from(200));
1913        state.storages.insert(addr, storage);
1914
1915        // mark slot1 as already fetched
1916        let mut fetched_slots = HashSet::default();
1917        fetched_slots.insert(slot1);
1918        fetched.insert(addr, fetched_slots);
1919
1920        // update multi_added_removed_keys to mark slot1 as removed
1921        let mut removed_state = HashedPostState::default();
1922        let mut removed_storage = HashedStorage::default();
1923        removed_storage.storage.insert(slot1, U256::ZERO); // U256::ZERO marks as removed
1924        removed_state.storages.insert(addr, removed_storage);
1925        multi_added_removed_keys.update_with_state(&removed_state);
1926
1927        let targets = unwrap_legacy_targets(get_proof_targets(
1928            &state,
1929            &fetched,
1930            &multi_added_removed_keys,
1931            false,
1932        ));
1933
1934        // slot1 should be included despite being fetched, because it's marked as removed
1935        assert!(targets.contains_key(&addr));
1936        let target_slots = &targets[&addr];
1937        assert_eq!(target_slots.len(), 2);
1938        assert!(target_slots.contains(&slot1)); // included because it's removed
1939        assert!(target_slots.contains(&slot2)); // included because it's not fetched
1940    }
1941
1942    #[test]
1943    fn test_get_proof_targets_with_wiped_storage() {
1944        let mut state = HashedPostState::default();
1945        let fetched = MultiProofTargets::default();
1946        let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1947
1948        let addr = B256::random();
1949        let slot1 = B256::random();
1950
1951        // add account to state
1952        state.accounts.insert(addr, Some(Default::default()));
1953
1954        // add wiped storage
1955        let mut storage = HashedStorage::new(true);
1956        storage.storage.insert(slot1, U256::from(100));
1957        state.storages.insert(addr, storage);
1958
1959        let targets = unwrap_legacy_targets(get_proof_targets(
1960            &state,
1961            &fetched,
1962            &multi_added_removed_keys,
1963            false,
1964        ));
1965
1966        // account should be included because storage is wiped and account wasn't fetched
1967        assert!(targets.contains_key(&addr));
1968        let target_slots = &targets[&addr];
1969        assert_eq!(target_slots.len(), 1);
1970        assert!(target_slots.contains(&slot1));
1971    }
1972
1973    #[test]
1974    fn test_get_proof_targets_removed_keys_not_in_state_update() {
1975        let mut state = HashedPostState::default();
1976        let mut fetched = MultiProofTargets::default();
1977        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1978
1979        let addr = B256::random();
1980        let slot1 = B256::random();
1981        let slot2 = B256::random();
1982        let slot3 = B256::random();
1983
1984        // add account to state
1985        state.accounts.insert(addr, Some(Default::default()));
1986
1987        // add storage updates for slot1 and slot2 only
1988        let mut storage = HashedStorage::default();
1989        storage.storage.insert(slot1, U256::from(100));
1990        storage.storage.insert(slot2, U256::from(200));
1991        state.storages.insert(addr, storage);
1992
1993        // mark all slots as already fetched
1994        let mut fetched_slots = HashSet::default();
1995        fetched_slots.insert(slot1);
1996        fetched_slots.insert(slot2);
1997        fetched_slots.insert(slot3); // slot3 is fetched but not in state update
1998        fetched.insert(addr, fetched_slots);
1999
2000        // mark slot3 as removed (even though it's not in the state update)
2001        let mut removed_state = HashedPostState::default();
2002        let mut removed_storage = HashedStorage::default();
2003        removed_storage.storage.insert(slot3, U256::ZERO);
2004        removed_state.storages.insert(addr, removed_storage);
2005        multi_added_removed_keys.update_with_state(&removed_state);
2006
2007        let targets = unwrap_legacy_targets(get_proof_targets(
2008            &state,
2009            &fetched,
2010            &multi_added_removed_keys,
2011            false,
2012        ));
2013
2014        // only slots in the state update can be included, so slot3 should not appear
2015        assert!(!targets.contains_key(&addr));
2016    }
2017
2018    /// Verifies that consecutive prefetch proof messages are batched together.
2019    #[test]
2020    fn test_prefetch_proofs_batching() {
2021        let test_provider_factory = create_test_provider_factory();
2022        let mut task = create_test_state_root_task(test_provider_factory);
2023
2024        // send multiple messages
2025        let addr1 = B256::random();
2026        let addr2 = B256::random();
2027        let addr3 = B256::random();
2028
2029        let mut targets1 = MultiProofTargets::default();
2030        targets1.insert(addr1, HashSet::default());
2031
2032        let mut targets2 = MultiProofTargets::default();
2033        targets2.insert(addr2, HashSet::default());
2034
2035        let mut targets3 = MultiProofTargets::default();
2036        targets3.insert(addr3, HashSet::default());
2037
2038        let tx = task.tx.clone();
2039        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
2040            .unwrap();
2041        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
2042            .unwrap();
2043        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets3)))
2044            .unwrap();
2045
2046        let proofs_requested =
2047            if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2048                // simulate the batching logic
2049                let mut merged_targets = targets;
2050                let mut num_batched = 1;
2051                while let Ok(MultiProofMessage::PrefetchProofs(next_targets)) = task.rx.try_recv() {
2052                    merged_targets.extend(next_targets);
2053                    num_batched += 1;
2054                }
2055
2056                assert_eq!(num_batched, 3);
2057                assert_eq!(merged_targets.len(), 3);
2058                let legacy_targets = unwrap_legacy_targets(merged_targets);
2059                assert!(legacy_targets.contains_key(&addr1));
2060                assert!(legacy_targets.contains_key(&addr2));
2061                assert!(legacy_targets.contains_key(&addr3));
2062
2063                task.on_prefetch_proof(VersionedMultiProofTargets::Legacy(legacy_targets))
2064            } else {
2065                panic!("Expected PrefetchProofs message");
2066            };
2067
2068        assert_eq!(proofs_requested, 1);
2069    }
2070
2071    /// Verifies that different message types arriving mid-batch are not lost and preserve order.
2072    #[test]
2073    fn test_batching_preserves_ordering_with_different_message_type() {
2074        use alloy_evm::block::StateChangeSource;
2075        use revm_state::Account;
2076
2077        let test_provider_factory = create_test_provider_factory();
2078        let task = create_test_state_root_task(test_provider_factory);
2079
2080        let addr1 = B256::random();
2081        let addr2 = B256::random();
2082        let addr3 = B256::random();
2083        let state_addr1 = alloy_primitives::Address::random();
2084        let state_addr2 = alloy_primitives::Address::random();
2085
2086        // Create PrefetchProofs targets
2087        let mut targets1 = MultiProofTargets::default();
2088        targets1.insert(addr1, HashSet::default());
2089
2090        let mut targets2 = MultiProofTargets::default();
2091        targets2.insert(addr2, HashSet::default());
2092
2093        let mut targets3 = MultiProofTargets::default();
2094        targets3.insert(addr3, HashSet::default());
2095
2096        // Create StateUpdate 1
2097        let mut state_update1 = EvmState::default();
2098        state_update1.insert(
2099            state_addr1,
2100            Account {
2101                info: revm_state::AccountInfo {
2102                    balance: U256::from(100),
2103                    nonce: 1,
2104                    code_hash: Default::default(),
2105                    code: Default::default(),
2106                    account_id: None,
2107                },
2108                original_info: Box::new(revm_state::AccountInfo::default()),
2109                transaction_id: Default::default(),
2110                storage: Default::default(),
2111                status: revm_state::AccountStatus::Touched,
2112            },
2113        );
2114
2115        // Create StateUpdate 2
2116        let mut state_update2 = EvmState::default();
2117        state_update2.insert(
2118            state_addr2,
2119            Account {
2120                info: revm_state::AccountInfo {
2121                    balance: U256::from(200),
2122                    nonce: 2,
2123                    code_hash: Default::default(),
2124                    code: Default::default(),
2125                    account_id: None,
2126                },
2127                original_info: Box::new(revm_state::AccountInfo::default()),
2128                transaction_id: Default::default(),
2129                storage: Default::default(),
2130                status: revm_state::AccountStatus::Touched,
2131            },
2132        );
2133
2134        let source = StateChangeSource::Transaction(42);
2135
2136        // Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3]
2137        let tx = task.tx.clone();
2138        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
2139            .unwrap();
2140        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
2141            .unwrap();
2142        tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
2143        tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap();
2144        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
2145            targets3.clone(),
2146        )))
2147        .unwrap();
2148
2149        // Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2)
2150        let mut pending_msg: Option<MultiProofMessage> = None;
2151        if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2152            let mut merged_targets = targets;
2153            let mut num_batched = 1;
2154
2155            loop {
2156                match task.rx.try_recv() {
2157                    Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
2158                        merged_targets.extend(next_targets);
2159                        num_batched += 1;
2160                    }
2161                    Ok(other_msg) => {
2162                        // Store locally to preserve ordering (the fix)
2163                        pending_msg = Some(other_msg);
2164                        break;
2165                    }
2166                    Err(_) => break,
2167                }
2168            }
2169
2170            // Should have batched exactly 2 PrefetchProofs (not 3!)
2171            assert_eq!(num_batched, 2, "Should batch only until different message type");
2172            assert_eq!(merged_targets.len(), 2);
2173            let legacy_targets = unwrap_legacy_targets(merged_targets);
2174            assert!(legacy_targets.contains_key(&addr1));
2175            assert!(legacy_targets.contains_key(&addr2));
2176            assert!(!legacy_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
2177        } else {
2178            panic!("Expected PrefetchProofs message");
2179        }
2180
2181        // Step 2: The pending message should be StateUpdate1 (preserved ordering)
2182        match pending_msg {
2183            Some(MultiProofMessage::StateUpdate(_src, update)) => {
2184                assert!(update.contains_key(&state_addr1), "Should be first StateUpdate");
2185            }
2186            _ => panic!("StateUpdate1 was lost or reordered! The ordering fix is broken."),
2187        }
2188
2189        // Step 3: Next in channel should be StateUpdate2
2190        match task.rx.try_recv() {
2191            Ok(MultiProofMessage::StateUpdate(_src, update)) => {
2192                assert!(update.contains_key(&state_addr2), "Should be second StateUpdate");
2193            }
2194            _ => panic!("StateUpdate2 was lost!"),
2195        }
2196
2197        // Step 4: Next in channel should be PrefetchProofs3
2198        match task.rx.try_recv() {
2199            Ok(MultiProofMessage::PrefetchProofs(targets)) => {
2200                assert_eq!(targets.len(), 1);
2201                let legacy_targets = unwrap_legacy_targets(targets);
2202                assert!(legacy_targets.contains_key(&addr3));
2203            }
2204            _ => panic!("PrefetchProofs3 was lost!"),
2205        }
2206    }
2207
2208    /// Verifies that a pending message is processed before the next loop iteration (ordering).
2209    #[test]
2210    fn test_pending_message_processed_before_next_iteration() {
2211        use alloy_evm::block::StateChangeSource;
2212        use revm_state::Account;
2213
2214        let test_provider_factory = create_test_provider_factory();
2215        let test_provider = create_cached_provider(test_provider_factory.clone());
2216        let mut task = create_test_state_root_task(test_provider_factory);
2217
2218        // Queue: Prefetch1, StateUpdate, Prefetch2
2219        let prefetch_addr1 = B256::random();
2220        let prefetch_addr2 = B256::random();
2221        let mut prefetch1 = MultiProofTargets::default();
2222        prefetch1.insert(prefetch_addr1, HashSet::default());
2223        let mut prefetch2 = MultiProofTargets::default();
2224        prefetch2.insert(prefetch_addr2, HashSet::default());
2225
2226        let state_addr = alloy_primitives::Address::random();
2227        let mut state_update = EvmState::default();
2228        state_update.insert(
2229            state_addr,
2230            Account {
2231                info: revm_state::AccountInfo {
2232                    balance: U256::from(42),
2233                    nonce: 1,
2234                    code_hash: Default::default(),
2235                    code: Default::default(),
2236                    account_id: None,
2237                },
2238                original_info: Box::new(revm_state::AccountInfo::default()),
2239                transaction_id: Default::default(),
2240                storage: Default::default(),
2241                status: revm_state::AccountStatus::Touched,
2242            },
2243        );
2244
2245        let source = StateChangeSource::Transaction(99);
2246
2247        let tx = task.tx.clone();
2248        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(prefetch1)))
2249            .unwrap();
2250        tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
2251        tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
2252            prefetch2.clone(),
2253        )))
2254        .unwrap();
2255
2256        let mut ctx = MultiproofBatchCtx::new(Instant::now());
2257        let mut batch_metrics = MultiproofBatchMetrics::default();
2258
2259        // First message: Prefetch1 batches; StateUpdate becomes pending.
2260        let first = task.rx.recv().unwrap();
2261        assert!(matches!(first, MultiProofMessage::PrefetchProofs(_)));
2262        assert!(!task.process_multiproof_message(
2263            first,
2264            &mut ctx,
2265            &mut batch_metrics,
2266            &test_provider
2267        ));
2268        let pending = ctx.pending_msg.take().expect("pending message captured");
2269        assert!(matches!(pending, MultiProofMessage::StateUpdate(_, _)));
2270
2271        // Pending message should be handled before the next select loop.
2272        // StateUpdate is processed directly without batching.
2273        assert!(!task.process_multiproof_message(
2274            pending,
2275            &mut ctx,
2276            &mut batch_metrics,
2277            &test_provider
2278        ));
2279
2280        // Since StateUpdate doesn't batch, Prefetch2 remains in the channel (not in pending_msg).
2281        assert!(ctx.pending_msg.is_none());
2282
2283        // Prefetch2 should still be in the channel.
2284        match task.rx.try_recv() {
2285            Ok(MultiProofMessage::PrefetchProofs(targets)) => {
2286                assert_eq!(targets.len(), 1);
2287                let legacy_targets = unwrap_legacy_targets(targets);
2288                assert!(legacy_targets.contains_key(&prefetch_addr2));
2289            }
2290            other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other),
2291        }
2292    }
2293
2294    /// Verifies that BAL messages are processed correctly and generate state updates.
2295    #[test]
2296    fn test_bal_message_processing() {
2297        let test_provider_factory = create_test_provider_factory();
2298        let test_provider = create_cached_provider(test_provider_factory.clone());
2299        let mut task = create_test_state_root_task(test_provider_factory);
2300
2301        // Create a simple BAL with one account change
2302        let account_address = Address::random();
2303        let account_changes = AccountChanges {
2304            address: account_address,
2305            balance_changes: vec![BalanceChange::new(0, U256::from(1000))],
2306            nonce_changes: vec![],
2307            code_changes: vec![],
2308            storage_changes: vec![],
2309            storage_reads: vec![],
2310        };
2311
2312        let bal = Arc::new(vec![account_changes]);
2313
2314        let mut ctx = MultiproofBatchCtx::new(Instant::now());
2315        let mut batch_metrics = MultiproofBatchMetrics::default();
2316
2317        let should_finish = task.process_multiproof_message(
2318            MultiProofMessage::BlockAccessList(bal),
2319            &mut ctx,
2320            &mut batch_metrics,
2321            &test_provider,
2322        );
2323
2324        // BAL should mark updates as finished
2325        assert!(ctx.updates_finished_time.is_some());
2326
2327        // Should have dispatched state update proofs
2328        assert!(batch_metrics.state_update_proofs_requested > 0);
2329
2330        // Should need to wait for the results of those proofs to arrive
2331        assert!(!should_finish, "Should continue waiting for proofs");
2332    }
2333}