reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use crate::tree::{
4    cached_state::CachedStateProvider, payload_processor::bal::bal_to_hashed_post_state,
5};
6use alloy_eip7928::BlockAccessList;
7use alloy_evm::block::StateChangeSource;
8use alloy_primitives::{keccak256, map::HashSet, B256};
9use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
10use dashmap::DashMap;
11use derive_more::derive::Deref;
12use metrics::{Gauge, Histogram};
13use rayon::prelude::*;
14use reth_metrics::Metrics;
15use reth_provider::AccountReader;
16use reth_revm::state::EvmState;
17use reth_trie::{
18    added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage,
19    MultiProofTargets,
20};
21use reth_trie_parallel::{
22    proof::ParallelProof,
23    proof_task::{
24        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
25    },
26};
27use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
28use tracing::{debug, error, instrument, trace};
29
30/// Source of state changes, either from EVM execution or from a Block Access List.
31#[derive(Clone, Copy)]
32pub enum Source {
33    /// State changes from EVM execution.
34    Evm(StateChangeSource),
35    /// State changes from Block Access List (EIP-7928).
36    BlockAccessList,
37}
38
39impl std::fmt::Debug for Source {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            Self::Evm(source) => source.fmt(f),
43            Self::BlockAccessList => f.write_str("BlockAccessList"),
44        }
45    }
46}
47
48impl From<StateChangeSource> for Source {
49    fn from(source: StateChangeSource) -> Self {
50        Self::Evm(source)
51    }
52}
53
54/// Maximum number of targets to batch together when batching prefetch requests.
55/// Prefetches are just proof requests (no state merging), so we allow a higher cap than for
56/// state updates.
57const PREFETCH_MAX_BATCH_TARGETS: usize = 512;
58
59/// Maximum number of prefetch messages to batch together.
60/// Prevents excessive batching even with small messages.
61const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;
62
63/// Maximum number of targets to batch together for state updates.
64/// Lower than prefetch because state updates require additional processing (hashing, state
65/// partitioning) before dispatch.
66const STATE_UPDATE_MAX_BATCH_TARGETS: usize = 64;
67
68/// Preallocation hint for state update batching to avoid repeated reallocations on small bursts.
69const STATE_UPDATE_BATCH_PREALLOC: usize = 16;
70
71/// The default maximum number of account and storage proof targets to be fetched by a single
72/// worker. If this limit is exceeded, chunking is forced regardless of worker availability.
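/// For example, with a configured chunk size of 100, a state update carrying 350 combined
/// account and storage targets exceeds this limit and is split into four chunks of at most
/// 100 targets each, regardless of current worker availability.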
73const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
74
75/// A trie update that can be applied to the sparse trie alongside the proofs for the touched
76/// parts of the state.
77#[derive(Default, Debug)]
78pub struct SparseTrieUpdate {
79    /// The state update that was used to calculate the proof
80    pub(crate) state: HashedPostState,
81    /// The calculated multiproof
82    pub(crate) multiproof: DecodedMultiProof,
83}
84
85impl SparseTrieUpdate {
86    /// Returns true if the update is empty.
87    pub(super) fn is_empty(&self) -> bool {
88        self.state.is_empty() && self.multiproof.is_empty()
89    }
90
91    /// Construct update from multiproof.
92    #[cfg(test)]
93    pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
94        Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
95    }
96
97    /// Extend update with contents of the other.
98    pub(super) fn extend(&mut self, other: Self) {
99        self.state.extend(other.state);
100        self.multiproof.extend(other.multiproof);
101    }
102}
103
104/// Messages used internally by the multi proof task.
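///
/// Typical flow for a single block: zero or more [`MultiProofMessage::PrefetchProofs`]
/// requests, then [`MultiProofMessage::StateUpdate`]s from transaction execution (or a single
/// [`MultiProofMessage::BlockAccessList`]), and finally
/// [`MultiProofMessage::FinishedStateUpdates`] once execution completes.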
105#[derive(Debug)]
106pub(super) enum MultiProofMessage {
107    /// Prefetch proof targets
108    PrefetchProofs(MultiProofTargets),
109    /// New state update from transaction execution with its source
110    StateUpdate(Source, EvmState),
111    /// State update that can be applied to the sparse trie without any new proofs.
112    ///
113    /// This is the case when all accounts and storage slots from the state update have already
114    /// been fetched and revealed.
115    EmptyProof {
116        /// The index of this proof in the sequence of state updates
117        sequence_number: u64,
118        /// The state update that was used to calculate the proof
119        state: HashedPostState,
120    },
121    /// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
122    ///
123    /// When received, the task generates a single state update from the BAL and processes it.
124    /// No further messages are expected after receiving this variant.
125    BlockAccessList(Arc<BlockAccessList>),
126    /// Signals state update stream end.
127    ///
128    /// This is triggered by block execution, indicating that no additional state updates are
129    /// expected.
130    FinishedStateUpdates,
131}
132
133/// Handle to track proof calculation ordering.
134#[derive(Debug, Default)]
135struct ProofSequencer {
136    /// The next proof sequence number to be produced.
137    next_sequence: u64,
138    /// The next sequence number expected to be delivered.
139    next_to_deliver: u64,
140    /// Buffer for out-of-order proofs and corresponding state updates
141    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
142}
143
144impl ProofSequencer {
145    /// Gets the next sequence number and increments the counter
146    const fn next_sequence(&mut self) -> u64 {
147        let seq = self.next_sequence;
148        self.next_sequence += 1;
149        seq
150    }
151
152    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
153    /// updates if we have a continuous sequence
154    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
155        if sequence >= self.next_to_deliver {
156            self.pending_proofs.insert(sequence, update);
157        }
158
159        // return early if we don't have the next expected proof
160        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
161            return Vec::new()
162        }
163
164        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
165        let mut current_sequence = self.next_to_deliver;
166
167        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
168        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
169            consecutive_proofs.push(pending);
170            current_sequence += 1;
171        }
172
173        self.next_to_deliver += consecutive_proofs.len() as u64;
174
175        consecutive_proofs
176    }
177
178    /// Returns true if we still have pending proofs
179    pub(crate) fn has_pending(&self) -> bool {
180        !self.pending_proofs.is_empty()
181    }
182}
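
// Minimal usage sketch (illustrative): out-of-order proofs are buffered until the next
// expected sequence number arrives, then released as one contiguous batch.
#[cfg(test)]
mod proof_sequencer_ordering_sketch {
    use super::*;

    #[test]
    fn buffers_until_gap_is_filled() {
        let mut sequencer = ProofSequencer::default();
        let first = sequencer.next_sequence(); // 0
        let second = sequencer.next_sequence(); // 1

        // The second proof arrives first: nothing can be delivered yet.
        assert!(sequencer.add_proof(second, SparseTrieUpdate::default()).is_empty());
        assert!(sequencer.has_pending());

        // The first proof arrives: both proofs are released in sequence order.
        let ready = sequencer.add_proof(first, SparseTrieUpdate::default());
        assert_eq!(ready.len(), 2);
        assert!(!sequencer.has_pending());
    }
}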
183
184/// A wrapper for the sender that signals completion when dropped.
185///
186/// This type is intended to be used in combination with the EVM executor's state hook.
187/// The drop should trigger once the block has been executed, after the last state update has
188/// been sent; this triggers the exit condition of the multiproof task.
189#[derive(Deref, Debug)]
190pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
191
192impl StateHookSender {
193    pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
194        Self(inner)
195    }
196}
197
198impl Drop for StateHookSender {
199    fn drop(&mut self) {
200        // Send completion signal when the sender is dropped
201        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
202    }
203}
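
// Sketch of the drop semantics (illustrative): dropping the hook sender emits the
// `FinishedStateUpdates` control message, closing the state update stream.
#[cfg(test)]
mod state_hook_sender_drop_sketch {
    use super::*;

    #[test]
    fn drop_sends_finished_state_updates() {
        let (tx, rx) = unbounded();
        drop(StateHookSender::new(tx));
        assert!(matches!(rx.recv(), Ok(MultiProofMessage::FinishedStateUpdates)));
    }
}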
204
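/// Converts an [`EvmState`] into a [`HashedPostState`] by hashing the addresses of touched
/// accounts and their changed storage slots in parallel, wiping storage for accounts that were
/// self-destructed.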
205pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
206    update.into_par_iter()
207        .filter_map(|(address, account)| {
208            if !account.is_touched() {
209                return None;
210            }
211
212            let hashed_address = keccak256(address);
213            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
214
215            let destroyed = account.is_selfdestructed();
216            let info = if destroyed { None } else { Some(account.info.into()) };
217
218            let hashed_storage = if destroyed {
219                Some(HashedStorage::new(true))
220            } else {
221                let storage: Vec<_> = account
222                    .storage
223                    .into_iter()
224                    .filter(|(_slot, value)| value.is_changed())
225                    .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
226                    .collect();
227
228                if storage.is_empty() {
229                    None
230                } else {
231                    Some(HashedStorage::from_iter(false, storage))
232                }
233            };
234
235            Some((hashed_address, info, hashed_storage))
236        })
237        .collect()
238}
239
240/// Input parameters for dispatching a multiproof calculation.
241#[derive(Debug)]
242struct MultiproofInput {
243    source: Option<Source>,
244    hashed_state_update: HashedPostState,
245    proof_targets: MultiProofTargets,
246    proof_sequence_number: u64,
247    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
248    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
249}
250
251impl MultiproofInput {
252    /// Consumes the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
253    fn send_empty_proof(self) {
254        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
255            sequence_number: self.proof_sequence_number,
256            state: self.hashed_state_update,
257        });
258    }
259}
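
// Fast-path sketch (illustrative): an input without proof targets never reaches the worker
// pools and is short-circuited into an `EmptyProof` control message instead.
#[cfg(test)]
mod empty_proof_fast_path_sketch {
    use super::*;

    #[test]
    fn empty_targets_short_circuit() {
        let (tx, rx) = unbounded();
        let input = MultiproofInput {
            source: None,
            hashed_state_update: HashedPostState::default(),
            proof_targets: MultiProofTargets::default(),
            proof_sequence_number: 7,
            state_root_message_sender: tx,
            multi_added_removed_keys: None,
        };

        // This mirrors what `MultiproofManager::dispatch` does for empty targets.
        assert!(input.proof_targets.is_empty());
        input.send_empty_proof();

        assert!(matches!(
            rx.recv(),
            Ok(MultiProofMessage::EmptyProof { sequence_number: 7, .. })
        ));
    }
}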
260
261/// Coordinates multiproof dispatch between `MultiProofTask` and the parallel trie workers.
262///
263/// # Flow
264/// 1. `MultiProofTask` asks the manager to dispatch either storage or account proof work.
265/// 2. The manager builds the request, clones `proof_result_tx`, and hands everything to
266///    [`ProofWorkerHandle`].
267/// 3. A worker finishes the proof and sends a [`ProofResultMessage`] through the channel included
268///    in the job.
269/// 4. `MultiProofTask` consumes the message from the same channel and sequences it with
270///    `ProofSequencer`.
271#[derive(Debug)]
272pub struct MultiproofManager {
273    /// Handle to the proof worker pools (storage and account).
274    proof_worker_handle: ProofWorkerHandle,
275    /// Cached storage proof roots for missed leaves; this maps
276    /// hashed (missed) addresses to their storage proof roots.
277    ///
278    /// It is important to cache these: otherwise a common account
279    /// (a popular ERC-20, etc.) with missed leaves in its path would
280    /// have these proofs recalculated for every interacting
281    /// transaction (same account, different slots).
282    ///
283    /// Caching also complements multiproof chunking, which may split
284    /// a large account change into separate chunks that repeatedly
285    /// revisit the same missed leaves.
286    missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
287    /// Channel sender cloned into each dispatched job so workers can send back the
288    /// `ProofResultMessage`.
289    proof_result_tx: CrossbeamSender<ProofResultMessage>,
290    /// Metrics
291    metrics: MultiProofTaskMetrics,
292}
293
294impl MultiproofManager {
295    /// Creates a new [`MultiproofManager`].
296    fn new(
297        metrics: MultiProofTaskMetrics,
298        proof_worker_handle: ProofWorkerHandle,
299        proof_result_tx: CrossbeamSender<ProofResultMessage>,
300    ) -> Self {
301        // Initialize the max worker gauges with the worker pool sizes
302        metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
303        metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
304
305        Self {
306            metrics,
307            proof_worker_handle,
308            missed_leaves_storage_roots: Default::default(),
309            proof_result_tx,
310        }
311    }
312
313    /// Dispatches a new multiproof calculation to worker pools.
314    fn dispatch(&self, input: MultiproofInput) {
315        // If there are no proof targets, we can just send an empty multiproof back immediately
316        if input.proof_targets.is_empty() {
317            trace!(
318                sequence_number = input.proof_sequence_number,
319                "No proof targets, sending empty multiproof back immediately"
320            );
321            input.send_empty_proof();
322            return;
323        }
324
325        self.dispatch_multiproof(input);
326    }
327
328    /// Signals that a multiproof calculation has finished.
329    fn on_calculation_complete(&self) {
330        self.metrics
331            .active_storage_workers_histogram
332            .record(self.proof_worker_handle.active_storage_workers() as f64);
333        self.metrics
334            .active_account_workers_histogram
335            .record(self.proof_worker_handle.active_account_workers() as f64);
336        self.metrics
337            .pending_storage_multiproofs_histogram
338            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
339        self.metrics
340            .pending_account_multiproofs_histogram
341            .record(self.proof_worker_handle.pending_account_tasks() as f64);
342    }
343
344    /// Dispatches a single multiproof calculation to the worker pools.
345    fn dispatch_multiproof(&self, multiproof_input: MultiproofInput) {
346        let MultiproofInput {
347            source,
348            hashed_state_update,
349            proof_targets,
350            proof_sequence_number,
351            state_root_message_sender: _,
352            multi_added_removed_keys,
353        } = multiproof_input;
354
355        let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
356        let account_targets = proof_targets.len();
357        let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
358
359        trace!(
360            target: "engine::tree::payload_processor::multiproof",
361            proof_sequence_number,
362            ?proof_targets,
363            account_targets,
364            storage_targets,
365            ?source,
366            "Dispatching multiproof to workers"
367        );
368
369        let start = Instant::now();
370
371        // Extend prefix sets with targets
372        let frozen_prefix_sets =
373            ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);
374
375        // Dispatch account multiproof to worker pool with result sender
376        let input = AccountMultiproofInput {
377            targets: proof_targets,
378            prefix_sets: frozen_prefix_sets,
379            collect_branch_node_masks: true,
380            multi_added_removed_keys,
381            missed_leaves_storage_roots,
382            // Workers will send ProofResultMessage directly to proof_result_rx
383            proof_result_sender: ProofResultContext::new(
384                self.proof_result_tx.clone(),
385                proof_sequence_number,
386                hashed_state_update,
387                start,
388            ),
389        };
390
391        if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
392            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
393            return;
394        }
395
396        self.metrics
397            .active_storage_workers_histogram
398            .record(self.proof_worker_handle.active_storage_workers() as f64);
399        self.metrics
400            .active_account_workers_histogram
401            .record(self.proof_worker_handle.active_account_workers() as f64);
402        self.metrics
403            .pending_storage_multiproofs_histogram
404            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
405        self.metrics
406            .pending_account_multiproofs_histogram
407            .record(self.proof_worker_handle.pending_account_tasks() as f64);
408    }
409}
410
411#[derive(Metrics, Clone)]
412#[metrics(scope = "tree.root")]
413pub(crate) struct MultiProofTaskMetrics {
414    /// Histogram of active storage workers processing proofs.
415    pub active_storage_workers_histogram: Histogram,
416    /// Histogram of active account workers processing proofs.
417    pub active_account_workers_histogram: Histogram,
418    /// Gauge for the maximum number of storage workers in the pool.
419    pub max_storage_workers: Gauge,
420    /// Gauge for the maximum number of account workers in the pool.
421    pub max_account_workers: Gauge,
422    /// Histogram of pending storage multiproofs in the queue.
423    pub pending_storage_multiproofs_histogram: Histogram,
424    /// Histogram of pending account multiproofs in the queue.
425    pub pending_account_multiproofs_histogram: Histogram,
426
427    /// Histogram of the number of prefetch proof target accounts.
428    pub prefetch_proof_targets_accounts_histogram: Histogram,
429    /// Histogram of the number of prefetch proof target storages.
430    pub prefetch_proof_targets_storages_histogram: Histogram,
431    /// Histogram of the number of prefetch proof target chunks.
432    pub prefetch_proof_chunks_histogram: Histogram,
433
434    /// Histogram of the number of state update proof target accounts.
435    pub state_update_proof_targets_accounts_histogram: Histogram,
436    /// Histogram of the number of state update proof target storages.
437    pub state_update_proof_targets_storages_histogram: Histogram,
438    /// Histogram of the number of state update proof target chunks.
439    pub state_update_proof_chunks_histogram: Histogram,
440
441    /// Histogram of prefetch proof batch sizes (number of messages merged).
442    pub prefetch_batch_size_histogram: Histogram,
443    /// Histogram of state update batch sizes (number of messages merged).
444    pub state_update_batch_size_histogram: Histogram,
445
446    /// Histogram of proof calculation durations.
447    pub proof_calculation_duration_histogram: Histogram,
448
449    /// Histogram of sparse trie update durations.
450    pub sparse_trie_update_duration_histogram: Histogram,
451    /// Histogram of sparse trie final update durations.
452    pub sparse_trie_final_update_duration_histogram: Histogram,
453    /// Histogram of sparse trie total durations.
454    pub sparse_trie_total_duration_histogram: Histogram,
455
456    /// Histogram of state updates received.
457    pub state_updates_received_histogram: Histogram,
458    /// Histogram of proofs processed.
459    pub proofs_processed_histogram: Histogram,
460    /// Histogram of total time spent in the multiproof task.
461    pub multiproof_task_total_duration_histogram: Histogram,
462    /// Total time spent waiting for the first state update or prefetch request.
463    pub first_update_wait_time_histogram: Histogram,
464    /// Total time spent waiting for the last proof result.
465    pub last_proof_wait_time_histogram: Histogram,
466}
467
468/// Standalone task that receives a transaction state stream and updates relevant
469/// data structures to calculate the state root.
470///
471/// ## Architecture: Dual-Channel Multiproof System
472///
473/// This task orchestrates parallel proof computation using a dual-channel architecture that
474/// separates control messages from proof computation results:
475///
476/// ```text
477/// ┌─────────────────────────────────────────────────────────────────┐
478/// │                        MultiProofTask                            │
479/// │                  Event Loop (crossbeam::select!)                 │
480/// └──┬──────────────────────────────────────────────────────────▲───┘
481///    │                                                           │
482///    │ (1) Send proof request                                   │
483///    │     via tx (control channel)                             │
484///    │                                                           │
485///    ▼                                                           │
486/// ┌──────────────────────────────────────────────────────────────┐ │
487/// │             MultiproofManager                                │ │
488/// │  - Deduplicates against fetched_proof_targets                │ │
489/// │  - Routes to appropriate worker pool                         │ │
490/// └──┬───────────────────────────────────────────────────────────┘ │
491///    │                                                             │
492///    │ (2) Dispatch to workers                                    │
493///    │     OR send EmptyProof (fast path)                         │
494///    ▼                                                             │
495/// ┌──────────────────────────────────────────────────────────────┐ │
496/// │              ProofWorkerHandle                                │ │
497/// │  ┌─────────────────────┐   ┌────────────────────────┐        │ │
498/// │  │ Storage Worker Pool │   │ Account Worker Pool     │        │ │
499/// │  │ (spawn_blocking)    │   │ (spawn_blocking)        │        │ │
500/// │  └─────────────────────┘   └────────────────────────┘        │ │
501/// └──┬───────────────────────────────────────────────────────────┘ │
502///    │                                                             │
503///    │ (3) Compute proofs in parallel                             │
504///    │     Send results back                                      │
505///    │                                                             │
506///    ▼                                                             │
507/// ┌──────────────────────────────────────────────────────────────┐ │
508/// │  proof_result_tx (crossbeam unbounded channel)                │ │
509/// │    → ProofResultMessage { multiproof, sequence_number, ... }  │ │
510/// └──────────────────────────────────────────────────────────────┘ │
511///                                                                   │
512///   (4) Receive via crossbeam::select! on two channels: ───────────┘
513///       - rx: Control messages (PrefetchProofs, StateUpdate,
514///             EmptyProof, FinishedStateUpdates)
515///       - proof_result_rx: Computed proof results from workers
516/// ```
517///
518/// ## Component Responsibilities
519///
520/// - **[`MultiProofTask`]**: Event loop coordinator
521///   - Receives state updates from transaction execution
522///   - Deduplicates proof targets against already-fetched proofs
523///   - Sequences proofs to maintain transaction ordering
524///   - Feeds sequenced updates to sparse trie task
525///
526/// - **[`MultiproofManager`]**: Calculation orchestrator
527///   - Decides between fast path ([`EmptyProof`]) and worker dispatch
528///   - Routes storage-only vs full multiproofs to appropriate workers
529///   - Records metrics for monitoring
530///
531/// - **[`ProofWorkerHandle`]**: Worker pool manager
532///   - Maintains separate pools for storage and account proofs
533///   - Dispatches work to blocking threads (CPU-intensive)
534///   - Sends results directly via `proof_result_tx` (bypasses control channel)
535///
536/// [`EmptyProof`]: MultiProofMessage::EmptyProof
537/// [`ProofWorkerHandle`]: reth_trie_parallel::proof_task::ProofWorkerHandle
538///
539/// ## Dual-Channel Design Rationale
540///
541/// The system uses two separate crossbeam channels:
542///
543/// 1. **Control Channel (`tx`/`rx`)**: For orchestration messages
544///    - `PrefetchProofs`: Pre-fetch proofs before execution
545///    - `StateUpdate`: New transaction execution results
546///    - `EmptyProof`: Fast path when all targets already fetched
547///    - `FinishedStateUpdates`: Signal to drain pending work
548///
549/// 2. **Proof Result Channel (`proof_result_tx`/`proof_result_rx`)**: For worker results
550///    - `ProofResultMessage`: Computed multiproofs from worker pools
551///    - Direct path from workers to event loop (no intermediate hops)
552///    - Keeps control messages separate from high-throughput proof data
553///
554/// This separation enables:
555/// - **Non-blocking control**: Control messages never wait behind large proof data
556/// - **Backpressure management**: Each channel can apply different policies
557/// - **Clear ownership**: Workers only need proof result sender, not control channel
558///
559/// ## Initialization and Lifecycle
560///
561/// The task initializes a blinded sparse trie and subscribes to transaction state streams.
562/// As it receives transaction execution results, it fetches proofs for relevant accounts
563/// from the database and reveals them to the tree, then updates relevant leaves according
564/// to transaction results. This feeds updates to the sparse trie task.
565///
566/// See the `run()` method documentation for detailed lifecycle flow.
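///
/// A minimal wiring sketch (control channel, sparse trie channel, and state hook sender) is
/// shown after the struct definition below.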
567#[derive(Debug)]
568pub(super) struct MultiProofTask {
569    /// The size of the proof targets chunk to spawn in one calculation.
570    /// If None, chunking is disabled and all targets are processed in a single proof.
571    chunk_size: Option<usize>,
572    /// Receiver for state root related messages (prefetch, state updates, finish signal).
573    rx: CrossbeamReceiver<MultiProofMessage>,
574    /// Sender for state root related messages.
575    tx: CrossbeamSender<MultiProofMessage>,
576    /// Receiver for proof results directly from workers.
577    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
578    /// Sender for state updates emitted by this type.
579    to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
580    /// Proof targets that have been already fetched.
581    fetched_proof_targets: MultiProofTargets,
582    /// Tracks keys which have been added and removed throughout the entire block.
583    multi_added_removed_keys: MultiAddedRemovedKeys,
584    /// Proof sequencing handler.
585    proof_sequencer: ProofSequencer,
586    /// Manages calculation of multiproofs.
587    multiproof_manager: MultiproofManager,
588    /// Multiproof task metrics.
589    metrics: MultiProofTaskMetrics,
590    /// If this number is exceeded and chunking is enabled, chunking is forced across workers
591    /// regardless of whether any workers are currently active. This prevents very long tasks
592    /// from being pinned to a single worker.
593    max_targets_for_chunking: usize,
594}
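
// A minimal wiring sketch (illustrative only; assumes a `ProofWorkerHandle` obtained from the
// payload processor elsewhere). It shows how the control channel, the sparse trie channel, and
// the state hook sender fit together; `run()` would then be driven on a dedicated thread with a
// `CachedStateProvider`.
#[allow(dead_code)]
fn wire_multiproof_task_sketch(
    proof_worker_handle: ProofWorkerHandle,
) -> (MultiProofTask, StateHookSender, std::sync::mpsc::Receiver<SparseTrieUpdate>) {
    // Control channel: prefetches, state updates and the finish signal flow through here.
    let (tx, rx) = unbounded();
    // Sequenced updates are handed to the sparse trie task through this channel.
    let (to_sparse_trie, sparse_trie_rx) = std::sync::mpsc::channel();

    // Chunking disabled (`None`) for brevity.
    let task = MultiProofTask::new(proof_worker_handle, to_sparse_trie, None, tx, rx);

    // The state hook sender is handed to block execution; dropping it emits
    // `MultiProofMessage::FinishedStateUpdates`.
    let state_hook_sender = StateHookSender::new(task.state_root_message_sender());

    (task, state_hook_sender, sparse_trie_rx)
}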
595
596impl MultiProofTask {
597    /// Creates a multiproof task with separate channels: control on `tx`/`rx`, proof results on
598    /// `proof_result_rx`.
599    pub(super) fn new(
600        proof_worker_handle: ProofWorkerHandle,
601        to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
602        chunk_size: Option<usize>,
603        tx: CrossbeamSender<MultiProofMessage>,
604        rx: CrossbeamReceiver<MultiProofMessage>,
605    ) -> Self {
606        let (proof_result_tx, proof_result_rx) = unbounded();
607        let metrics = MultiProofTaskMetrics::default();
608
609        Self {
610            chunk_size,
611            rx,
612            tx,
613            proof_result_rx,
614            to_sparse_trie,
615            fetched_proof_targets: Default::default(),
616            multi_added_removed_keys: MultiAddedRemovedKeys::new(),
617            proof_sequencer: ProofSequencer::default(),
618            multiproof_manager: MultiproofManager::new(
619                metrics.clone(),
620                proof_worker_handle,
621                proof_result_tx,
622            ),
623            metrics,
624            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
625        }
626    }
627
628    /// Returns a sender that can be used to send arbitrary [`MultiProofMessage`]s to this task.
629    pub(super) fn state_root_message_sender(&self) -> CrossbeamSender<MultiProofMessage> {
630        self.tx.clone()
631    }
632
633    /// Handles a proof prefetch request.
634    ///
635    /// Returns how many multiproof tasks were dispatched for the prefetch request.
636    #[instrument(
637        level = "debug",
638        target = "engine::tree::payload_processor::multiproof",
639        skip_all,
640        fields(accounts = targets.len(), chunks = 0)
641    )]
642    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
643        let proof_targets = self.get_prefetch_proof_targets(targets);
644        self.fetched_proof_targets.extend_ref(&proof_targets);
645
646        // Make sure all target accounts have an `AddedRemovedKeySet` in the
647        // [`MultiAddedRemovedKeys`]. Even if there are no known removed keys for the account,
648        // we still want to optimistically fetch extension children for the leaf addition case.
649        self.multi_added_removed_keys.touch_accounts(proof_targets.keys().copied());
650
651        // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
652        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
653
654        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
655        self.metrics
656            .prefetch_proof_targets_storages_histogram
657            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
658
659        let chunking_len = proof_targets.chunking_length();
660        let available_account_workers =
661            self.multiproof_manager.proof_worker_handle.available_account_workers();
662        let available_storage_workers =
663            self.multiproof_manager.proof_worker_handle.available_storage_workers();
664        let num_chunks = dispatch_with_chunking(
665            proof_targets,
666            chunking_len,
667            self.chunk_size,
668            self.max_targets_for_chunking,
669            available_account_workers,
670            available_storage_workers,
671            MultiProofTargets::chunks,
672            |proof_targets| {
673                self.multiproof_manager.dispatch(MultiproofInput {
674                    source: None,
675                    hashed_state_update: Default::default(),
676                    proof_targets,
677                    proof_sequence_number: self.proof_sequencer.next_sequence(),
678                    state_root_message_sender: self.tx.clone(),
679                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
680                });
681            },
682        );
683        self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
684
685        num_chunks as u64
686    }
687
688    /// Returns true if all state updates have finished and all proofs have been processed.
689    fn is_done(
690        &self,
691        proofs_processed: u64,
692        state_update_proofs_requested: u64,
693        prefetch_proofs_requested: u64,
694        updates_finished: bool,
695    ) -> bool {
696        let all_proofs_processed =
697            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
698        let no_pending = !self.proof_sequencer.has_pending();
699        trace!(
700            target: "engine::tree::payload_processor::multiproof",
701            proofs_processed,
702            state_update_proofs_requested,
703            prefetch_proofs_requested,
704            no_pending,
705            updates_finished,
706            "Checking end condition"
707        );
708        all_proofs_processed && no_pending && updates_finished
709    }
710
711    /// Calls `get_proof_targets` with existing proof targets for prefetching.
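    ///
    /// For example, if `fetched_proof_targets` already contains account `A` with slots
    /// `{s1, s2}`, a prefetch request of `A: {s1, s2}` is dropped entirely as a subset of what
    /// was already fetched, while a request of `A: {s2, s3}` is reduced to `A: {s3}`.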
712    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
713        // Here we want to filter out any targets that are already fetched
714        //
715        // This means we need to remove any storage slots that have already been fetched
716        let mut duplicates = 0;
717
718        // First remove all storage targets that are subsets of already fetched storage slots
719        targets.retain(|hashed_address, target_storage| {
720            let keep = self
721                .fetched_proof_targets
722                .get(hashed_address)
723                // do NOT remove if None, because that means the account has not been fetched yet
724                .is_none_or(|fetched_storage| {
725                    // remove if a subset
726                    !target_storage.is_subset(fetched_storage)
727                });
728
729            if !keep {
730                duplicates += target_storage.len();
731            }
732
733            keep
734        });
735
736        // For all non-subset remaining targets, we have to calculate the difference
737        for (hashed_address, target_storage) in targets.deref_mut() {
738            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
739                // this means the account has not been fetched yet, so we must fetch everything
740                // associated with this account
741                continue;
742            };
743
744            let prev_target_storage_len = target_storage.len();
745
746            // keep only the storage slots that have not been fetched yet
747            //
748            // we already removed subsets, so this should only remove duplicates
749            target_storage.retain(|slot| !fetched_storage.contains(slot));
750
751            duplicates += prev_target_storage_len - target_storage.len();
752        }
753
754        if duplicates > 0 {
755            trace!(target: "engine::tree::payload_processor::multiproof", duplicates, "Removed duplicate prefetch proof targets");
756        }
757
758        targets
759    }
760
761    /// Handles state updates.
762    ///
763    /// Returns how many proof dispatches were spawned (including an `EmptyProof` for already
764    /// fetched targets).
765    #[instrument(
766        level = "debug",
767        target = "engine::tree::payload_processor::multiproof",
768        skip(self, update),
769        fields(accounts = update.len(), chunks = 0)
770    )]
771    fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 {
772        let hashed_state_update = evm_state_to_hashed_post_state(update);
773        self.on_hashed_state_update(source, hashed_state_update)
774    }
775
776    /// Processes a hashed state update and dispatches multiproofs as needed.
777    ///
778    /// Returns the number of state updates dispatched (both `EmptyProof` and regular multiproofs).
779    fn on_hashed_state_update(
780        &mut self,
781        source: Source,
782        hashed_state_update: HashedPostState,
783    ) -> u64 {
784        // Update removed keys based on the state update.
785        self.multi_added_removed_keys.update_with_state(&hashed_state_update);
786
787        // Split the state update into already fetched and not fetched according to the proof
788        // targets.
789        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
790            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
791
792        let mut state_updates = 0;
793        // If there are any accounts or storage slots that we already fetched the proofs for,
794        // send them immediately, as they don't require dispatching any additional multiproofs.
795        if !fetched_state_update.is_empty() {
796            let _ = self.tx.send(MultiProofMessage::EmptyProof {
797                sequence_number: self.proof_sequencer.next_sequence(),
798                state: fetched_state_update,
799            });
800            state_updates += 1;
801        }
802
803        // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
804        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
805
806        let chunking_len = not_fetched_state_update.chunking_length();
807        let mut spawned_proof_targets = MultiProofTargets::default();
808        let available_account_workers =
809            self.multiproof_manager.proof_worker_handle.available_account_workers();
810        let available_storage_workers =
811            self.multiproof_manager.proof_worker_handle.available_storage_workers();
812        let num_chunks = dispatch_with_chunking(
813            not_fetched_state_update,
814            chunking_len,
815            self.chunk_size,
816            self.max_targets_for_chunking,
817            available_account_workers,
818            available_storage_workers,
819            HashedPostState::chunks,
820            |hashed_state_update| {
821                let proof_targets = get_proof_targets(
822                    &hashed_state_update,
823                    &self.fetched_proof_targets,
824                    &multi_added_removed_keys,
825                );
826                spawned_proof_targets.extend_ref(&proof_targets);
827
828                self.multiproof_manager.dispatch(MultiproofInput {
829                    source: Some(source),
830                    hashed_state_update,
831                    proof_targets,
832                    proof_sequence_number: self.proof_sequencer.next_sequence(),
833                    state_root_message_sender: self.tx.clone(),
834                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
835                });
836            },
837        );
838        self.metrics
839            .state_update_proof_targets_accounts_histogram
840            .record(spawned_proof_targets.len() as f64);
841        self.metrics
842            .state_update_proof_targets_storages_histogram
843            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
844        self.metrics.state_update_proof_chunks_histogram.record(num_chunks as f64);
845
846        self.fetched_proof_targets.extend(spawned_proof_targets);
847
848        state_updates + num_chunks as u64
849    }
850
851    /// Handler for a newly calculated proof; aggregates all available sequential proofs.
852    fn on_proof(
853        &mut self,
854        sequence_number: u64,
855        update: SparseTrieUpdate,
856    ) -> Option<SparseTrieUpdate> {
857        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
858
859        ready_proofs
860            .into_iter()
861            // Merge all ready proofs and state updates
862            .reduce(|mut acc_update, update| {
863                acc_update.extend(update);
864                acc_update
865            })
866            // Return None if the resulting proof is empty
867            .filter(|proof| !proof.is_empty())
868    }
869
870    /// Processes a multiproof message, batching consecutive same-type messages.
871    ///
872    /// Drains queued messages of the same type and merges them into one batch before processing,
873    /// storing one pending message (different type or over-cap) to handle on the next iteration.
874    /// This preserves ordering without requeuing onto the channel.
875    ///
876    /// Returns `true` if done, `false` to continue.
877    fn process_multiproof_message<P>(
878        &mut self,
879        msg: MultiProofMessage,
880        ctx: &mut MultiproofBatchCtx,
881        batch_metrics: &mut MultiproofBatchMetrics,
882        provider: &CachedStateProvider<P>,
883    ) -> bool
884    where
885        P: AccountReader,
886    {
887        match msg {
888            // Prefetch proofs: batch consecutive prefetch requests up to target/message limits
889            MultiProofMessage::PrefetchProofs(targets) => {
890                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::PrefetchProofs");
891
892                if ctx.first_update_time.is_none() {
893                    self.metrics
894                        .first_update_wait_time_histogram
895                        .record(ctx.start.elapsed().as_secs_f64());
896                    ctx.first_update_time = Some(Instant::now());
897                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
898                }
899
900                let mut accumulated_count = targets.chunking_length();
901                ctx.accumulated_prefetch_targets.clear();
902                ctx.accumulated_prefetch_targets.push(targets);
903
904                // Batch consecutive prefetch messages up to limits.
905                while accumulated_count < PREFETCH_MAX_BATCH_TARGETS &&
906                    ctx.accumulated_prefetch_targets.len() < PREFETCH_MAX_BATCH_MESSAGES
907                {
908                    match self.rx.try_recv() {
909                        Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
910                            let next_count = next_targets.chunking_length();
911                            if accumulated_count + next_count > PREFETCH_MAX_BATCH_TARGETS {
912                                ctx.pending_msg =
913                                    Some(MultiProofMessage::PrefetchProofs(next_targets));
914                                break;
915                            }
916                            accumulated_count += next_count;
917                            ctx.accumulated_prefetch_targets.push(next_targets);
918                        }
919                        Ok(other_msg) => {
920                            ctx.pending_msg = Some(other_msg);
921                            break;
922                        }
923                        Err(_) => break,
924                    }
925                }
926
927                // Process all accumulated messages in a single batch
928                let num_batched = ctx.accumulated_prefetch_targets.len();
929                self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);
930
931                // Merge all accumulated prefetch targets into a single dispatch payload.
932                // Use drain to preserve the buffer allocation.
933                let mut accumulated_iter = ctx.accumulated_prefetch_targets.drain(..);
934                let mut merged_targets =
935                    accumulated_iter.next().expect("prefetch batch always has at least one entry");
936                for next_targets in accumulated_iter {
937                    merged_targets.extend(next_targets);
938                }
939
940                let account_targets = merged_targets.len();
941                let storage_targets =
942                    merged_targets.values().map(|slots| slots.len()).sum::<usize>();
943                batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
944                trace!(
945                    target: "engine::tree::payload_processor::multiproof",
946                    account_targets,
947                    storage_targets,
948                    prefetch_proofs_requested = batch_metrics.prefetch_proofs_requested,
949                    num_batched,
950                    "Dispatched prefetch batch"
951                );
952
953                false
954            }
955            // State update: batch consecutive updates from the same source
956            MultiProofMessage::StateUpdate(source, update) => {
957                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
958
959                if ctx.first_update_time.is_none() {
960                    self.metrics
961                        .first_update_wait_time_histogram
962                        .record(ctx.start.elapsed().as_secs_f64());
963                    ctx.first_update_time = Some(Instant::now());
964                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
965                }
966
967                // Accumulate messages including the first one; reuse buffer to avoid allocations.
968                let mut accumulated_targets = estimate_evm_state_targets(&update);
969                ctx.accumulated_state_updates.clear();
970                ctx.accumulated_state_updates.push((source, update));
971
972                // Batch consecutive state update messages up to target limit.
973                while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
974                    match self.rx.try_recv() {
975                        Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
976                            let (batch_source, batch_update) = &ctx.accumulated_state_updates[0];
977                            if !can_batch_state_update(
978                                *batch_source,
979                                batch_update,
980                                next_source,
981                                &next_update,
982                            ) {
983                                ctx.pending_msg =
984                                    Some(MultiProofMessage::StateUpdate(next_source, next_update));
985                                break;
986                            }
987
988                            let next_estimate = estimate_evm_state_targets(&next_update);
989                            // Would exceed batch cap; leave pending to dispatch on next iteration.
990                            if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS
991                            {
992                                ctx.pending_msg =
993                                    Some(MultiProofMessage::StateUpdate(next_source, next_update));
994                                break;
995                            }
996                            accumulated_targets += next_estimate;
997                            ctx.accumulated_state_updates.push((next_source, next_update));
998                        }
999                        Ok(other_msg) => {
1000                            ctx.pending_msg = Some(other_msg);
1001                            break;
1002                        }
1003                        Err(_) => break,
1004                    }
1005                }
1006
1007                // Process all accumulated messages in a single batch
1008                let num_batched = ctx.accumulated_state_updates.len();
1009                self.metrics.state_update_batch_size_histogram.record(num_batched as f64);
1010
1011                #[cfg(debug_assertions)]
1012                {
1013                    let batch_source = ctx.accumulated_state_updates[0].0;
1014                    let batch_update = &ctx.accumulated_state_updates[0].1;
1015                    debug_assert!(ctx.accumulated_state_updates.iter().all(|(source, update)| {
1016                        can_batch_state_update(batch_source, batch_update, *source, update)
1017                    }));
1018                }
1019
1020                // Merge all accumulated updates into a single EvmState payload.
1021                // Use drain to preserve the buffer allocation.
1022                let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
1023                let (mut batch_source, mut merged_update) = accumulated_iter
1024                    .next()
1025                    .expect("state update batch always has at least one entry");
1026                for (next_source, next_update) in accumulated_iter {
1027                    batch_source = next_source;
1028                    merged_update.extend(next_update);
1029                }
1030
1031                let batch_len = merged_update.len();
1032                batch_metrics.state_update_proofs_requested +=
1033                    self.on_state_update(batch_source, merged_update);
1034                trace!(
1035                    target: "engine::tree::payload_processor::multiproof",
1036                    ?batch_source,
1037                    len = batch_len,
1038                    state_update_proofs_requested = ?batch_metrics.state_update_proofs_requested,
1039                    num_batched,
1040                    "Dispatched state update batch"
1041                );
1042
1043                false
1044            }
1045            // Process Block Access List (BAL) - complete state changes provided upfront
1046            MultiProofMessage::BlockAccessList(bal) => {
1047                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::BAL");
1048
1049                if ctx.first_update_time.is_none() {
1050                    self.metrics
1051                        .first_update_wait_time_histogram
1052                        .record(ctx.start.elapsed().as_secs_f64());
1053                    ctx.first_update_time = Some(Instant::now());
1054                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation from BAL");
1055                }
1056
1057                // Convert BAL to HashedPostState and process it
1058                match bal_to_hashed_post_state(&bal, provider) {
1059                    Ok(hashed_state) => {
1060                        debug!(
1061                            target: "engine::tree::payload_processor::multiproof",
1062                            accounts = hashed_state.accounts.len(),
1063                            storages = hashed_state.storages.len(),
1064                            "Processing BAL state update"
1065                        );
1066
1067                        // Use BlockAccessList as source for BAL-derived state updates
1068                        batch_metrics.state_update_proofs_requested +=
1069                            self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
1070                    }
1071                    Err(err) => {
1072                        error!(target: "engine::tree::payload_processor::multiproof", ?err, "Failed to convert BAL to hashed state");
1073                        return true;
1074                    }
1075                }
1076
1077                // Mark updates as finished since BAL provides complete state
1078                ctx.updates_finished_time = Some(Instant::now());
1079
1080                // Check if we're done (might need to wait for proofs to complete)
1081                if self.is_done(
1082                    batch_metrics.proofs_processed,
1083                    batch_metrics.state_update_proofs_requested,
1084                    batch_metrics.prefetch_proofs_requested,
1085                    ctx.updates_finished(),
1086                ) {
1087                    debug!(
1088                        target: "engine::tree::payload_processor::multiproof",
1089                        "BAL processed and all proofs complete, ending calculation"
1090                    );
1091                    return true;
1092                }
1093                false
1094            }
1095            // Signal that no more state updates will arrive
1096            MultiProofMessage::FinishedStateUpdates => {
1097                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates");
1098
1099                ctx.updates_finished_time = Some(Instant::now());
1100
1101                if self.is_done(
1102                    batch_metrics.proofs_processed,
1103                    batch_metrics.state_update_proofs_requested,
1104                    batch_metrics.prefetch_proofs_requested,
1105                    ctx.updates_finished(),
1106                ) {
1107                    debug!(
1108                        target: "engine::tree::payload_processor::multiproof",
1109                        "State updates finished and all proofs processed, ending calculation"
1110                    );
1111                    return true;
1112                }
1113                false
1114            }
1115            // Handle proof result with no trie nodes (state unchanged)
1116            MultiProofMessage::EmptyProof { sequence_number, state } => {
1117                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::EmptyProof");
1118
1119                batch_metrics.proofs_processed += 1;
1120
1121                if let Some(combined_update) = self.on_proof(
1122                    sequence_number,
1123                    SparseTrieUpdate { state, multiproof: Default::default() },
1124                ) {
1125                    let _ = self.to_sparse_trie.send(combined_update);
1126                }
1127
1128                if self.is_done(
1129                    batch_metrics.proofs_processed,
1130                    batch_metrics.state_update_proofs_requested,
1131                    batch_metrics.prefetch_proofs_requested,
1132                    ctx.updates_finished(),
1133                ) {
1134                    debug!(
1135                        target: "engine::tree::payload_processor::multiproof",
1136                        "State updates finished and all proofs processed, ending calculation"
1137                    );
1138                    return true;
1139                }
1140                false
1141            }
1142        }
1143    }
1144
1145    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
1146    /// sparse trie, and eventually produces the state root.
1147    ///
1148    /// The lifecycle is the following:
1149    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
1150    ///    received from the engine.
1151    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
1152    ///      [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are
1153    ///      [`evm_state_to_hashed_post_state`], and then [proof targets](MultiProofTargets) are
1154    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
1155    ///      so that the proofs for accounts and storage slots that were already fetched are not
1156    ///      requested again.
1157    /// 2. Using the proof targets, a new multiproof is calculated using
1158    ///    [`MultiproofManager::dispatch`].
1159    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
1160    ///      sent back to this task along with the original state update.
1161    ///    * Otherwise, the multiproof is dispatched to worker pools and results are sent directly
1162    ///      to this task via the `proof_result_rx` channel as [`ProofResultMessage`].
1163    /// 3. Either [`MultiProofMessage::EmptyProof`] (via control channel) or [`ProofResultMessage`]
1164    ///    (via proof result channel) is received.
1165    ///    * The multiproof is added to the [`ProofSequencer`].
1166    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
1167    ///      state updates arrived (i.e. transaction order), such sequence is returned.
1168    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
1169    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
1170    ///    trie task.
1171    /// 5. Steps above are repeated until this task receives a
1172    ///    [`MultiProofMessage::FinishedStateUpdates`].
1173    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
1174    ///      [`ProofResultMessage`], we check if all proofs have been processed and if there are any
1175    ///      pending proofs in the proof sequencer left to be revealed.
1176    /// 6. While running, consecutive [`MultiProofMessage::PrefetchProofs`] and
1177    ///    [`MultiProofMessage::StateUpdate`] messages are batched to reduce redundant work; if a
1178    ///    different message type arrives mid-batch or a batch cap is reached, it is held as
1179    ///    `pending_msg` and processed on the next loop to preserve ordering.
1180    /// 7. This task exits after all pending proofs are processed.
1181    #[instrument(
1182        level = "debug",
1183        name = "MultiProofTask::run",
1184        target = "engine::tree::payload_processor::multiproof",
1185        skip_all
1186    )]
1187    pub(crate) fn run<P>(mut self, provider: CachedStateProvider<P>)
1188    where
1189        P: AccountReader,
1190    {
1191        let mut ctx = MultiproofBatchCtx::new(Instant::now());
1192        let mut batch_metrics = MultiproofBatchMetrics::default();
1193
1194        // Main event loop; select_biased! prioritizes proof results over control messages.
1195        // Labeled so inner match arms can `break 'main` once all work is complete.
1196        'main: loop {
1197            trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");
1198
1199            if let Some(msg) = ctx.pending_msg.take() {
1200                if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
1201                    break 'main;
1202                }
1203                continue;
1204            }
1205
1206            // Use select_biased! to prioritize proof results over new requests.
1207            // This prevents new work from starving completed proofs and keeps workers healthy.
1208            crossbeam_channel::select_biased! {
1209                recv(self.proof_result_rx) -> proof_msg => {
1210                    match proof_msg {
1211                        Ok(proof_result) => {
1212                            batch_metrics.proofs_processed += 1;
1213
1214                            self.metrics
1215                                .proof_calculation_duration_histogram
1216                                .record(proof_result.elapsed);
1217
1218                            self.multiproof_manager.on_calculation_complete();
1219
1220                            // Convert ProofResultMessage to SparseTrieUpdate
1221                            match proof_result.result {
1222                                Ok(proof_result_data) => {
1223                                    trace!(
1224                                        target: "engine::tree::payload_processor::multiproof",
1225                                        sequence = proof_result.sequence_number,
1226                                        total_proofs = batch_metrics.proofs_processed,
1227                                        "Processing calculated proof from worker"
1228                                    );
1229
1230                                    let update = SparseTrieUpdate {
1231                                        state: proof_result.state,
1232                                        multiproof: proof_result_data.into_multiproof(),
1233                                    };
1234
1235                                    if let Some(combined_update) =
1236                                        self.on_proof(proof_result.sequence_number, update)
1237                                    {
1238                                        let _ = self.to_sparse_trie.send(combined_update);
1239                                    }
1240                                }
1241                                Err(error) => {
1242                                    error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
1243                                    return
1244                                }
1245                            }
1246
1247                            if self.is_done(
1248                                batch_metrics.proofs_processed,
1249                                batch_metrics.state_update_proofs_requested,
1250                                batch_metrics.prefetch_proofs_requested,
1251                                ctx.updates_finished(),
1252                            ) {
1253                                debug!(
1254                                    target: "engine::tree::payload_processor::multiproof",
1255                                    "State updates finished and all proofs processed, ending calculation"
1256                                );
1257                                break 'main
1258                            }
1259                        }
1260                        Err(_) => {
1261                            error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
1262                            return
1263                        }
1264                    }
1265                },
1266                recv(self.rx) -> message => {
1267                    let msg = match message {
1268                        Ok(m) => m,
1269                        Err(_) => {
1270                            error!(target: "engine::tree::payload_processor::multiproof", "State root related message channel closed unexpectedly");
1271                            return
1272                        }
1273                    };
1274
1275                    if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
1276                        break 'main;
1277                    }
1278                }
1279            }
1280        }
1281
1282        debug!(
1283            target: "engine::tree::payload_processor::multiproof",
1284            total_updates = batch_metrics.state_update_proofs_requested,
1285            total_proofs = batch_metrics.proofs_processed,
1286            total_time = ?ctx.first_update_time.map(|t| t.elapsed()),
1287            time_since_updates_finished = ?ctx.updates_finished_time.map(|t| t.elapsed()),
1288            "All proofs processed, ending calculation"
1289        );
1290
1291        // update total metrics on finish
1292        self.metrics
1293            .state_updates_received_histogram
1294            .record(batch_metrics.state_update_proofs_requested as f64);
1295        self.metrics.proofs_processed_histogram.record(batch_metrics.proofs_processed as f64);
1296        if let Some(total_time) = ctx.first_update_time.map(|t| t.elapsed()) {
1297            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1298        }
1299
1300        if let Some(updates_finished_time) = ctx.updates_finished_time {
1301            self.metrics
1302                .last_proof_wait_time_histogram
1303                .record(updates_finished_time.elapsed().as_secs_f64());
1304        }
1305    }
1306}
1307
1308/// Context for multiproof message batching loop.
1309///
1310/// Contains processing state that persists across loop iterations.
1311///
1312/// Used by `process_multiproof_message` to batch consecutive same-type messages received via
1313/// `try_recv` for efficient processing.
1314struct MultiproofBatchCtx {
1315    /// Buffers a non-matching message type encountered during batching.
1316    /// Processed first in the next iteration to preserve ordering while allowing same-type
1317    /// messages to batch.
1318    pending_msg: Option<MultiProofMessage>,
1319    /// Timestamp when the first state update or prefetch was received.
1320    first_update_time: Option<Instant>,
1321    /// Timestamp captured when the task started, before any state update or prefetch was received.
1322    start: Instant,
1323    /// Timestamp when state updates finished. `Some` indicates all state updates have been
1324    /// received.
1325    updates_finished_time: Option<Instant>,
1326    /// Reusable buffer for accumulating prefetch targets during batching.
1327    accumulated_prefetch_targets: Vec<MultiProofTargets>,
1328    /// Reusable buffer for accumulating state updates during batching.
1329    accumulated_state_updates: Vec<(Source, EvmState)>,
1330}
1331
1332impl MultiproofBatchCtx {
1333    /// Creates a new batch context with the given start time.
1334    fn new(start: Instant) -> Self {
1335        Self {
1336            pending_msg: None,
1337            first_update_time: None,
1338            start,
1339            updates_finished_time: None,
1340            accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES),
1341            accumulated_state_updates: Vec::with_capacity(STATE_UPDATE_BATCH_PREALLOC),
1342        }
1343    }
1344
1345    /// Returns `true` if all state updates have been received.
1346    const fn updates_finished(&self) -> bool {
1347        self.updates_finished_time.is_some()
1348    }
1349}
1350
1351/// Counters for tracking proof requests and processing.
1352#[derive(Default)]
1353struct MultiproofBatchMetrics {
1354    /// Number of proofs that have been processed.
1355    proofs_processed: u64,
1356    /// Number of state update proofs requested.
1357    state_update_proofs_requested: u64,
1358    /// Number of prefetch proofs requested.
1359    prefetch_proofs_requested: u64,
1360}
1361
1362/// Returns the proof targets for the state update, excluding anything already fetched: an
1363/// account is included only if it has not been fetched yet, or if it has storage slots that still
1364/// need proofs (slots not yet fetched, or fetched slots that are marked as removed).
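///
/// A minimal sketch of the dedup behaviour, using illustrative `addr`/`slot` values; only the
/// not-yet-fetched slot survives as a target:
///
/// ```ignore
/// let mut state = HashedPostState::default();
/// let (addr, slot1, slot2) = (B256::random(), B256::random(), B256::random());
/// state.accounts.insert(addr, Some(Default::default()));
/// let mut storage = HashedStorage::default();
/// storage.storage.insert(slot1, U256::ZERO);
/// storage.storage.insert(slot2, U256::from(1));
/// state.storages.insert(addr, storage);
///
/// // Pretend `slot1` was already proven in an earlier round.
/// let mut fetched = MultiProofTargets::default();
/// fetched.insert(addr, std::iter::once(slot1).collect());
///
/// let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
/// assert!(targets[&addr].contains(&slot2));
/// assert!(!targets[&addr].contains(&slot1));
/// ```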
1365fn get_proof_targets(
1366    state_update: &HashedPostState,
1367    fetched_proof_targets: &MultiProofTargets,
1368    multi_added_removed_keys: &MultiAddedRemovedKeys,
1369) -> MultiProofTargets {
1370    let mut targets = MultiProofTargets::default();
1371
1372    // first collect all new accounts (not previously fetched)
1373    for &hashed_address in state_update.accounts.keys() {
1374        if !fetched_proof_targets.contains_key(&hashed_address) {
1375            targets.insert(hashed_address, HashSet::default());
1376        }
1377    }
1378
1379    // then process storage slots for all accounts in the state update
1380    for (hashed_address, storage) in &state_update.storages {
1381        let fetched = fetched_proof_targets.get(hashed_address);
1382        let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1383        let mut changed_slots = storage
1384            .storage
1385            .keys()
1386            .filter(|slot| {
1387                !fetched.is_some_and(|f| f.contains(*slot)) ||
1388                    storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1389            })
1390            .peekable();
1391
1392        // If the storage is wiped, we still need to fetch the account proof.
1393        if storage.wiped && fetched.is_none() {
1394            targets.entry(*hashed_address).or_default();
1395        }
1396
1397        if changed_slots.peek().is_some() {
1398            targets.entry(*hashed_address).or_default().extend(changed_slots);
1399        }
1400    }
1401
1402    targets
1403}
1404
1405/// Dispatches work items as a single unit or in chunks based on target size and worker
1406/// availability.
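///
/// A minimal sketch with plain vectors standing in for proof targets (the real callers pass
/// account/storage target collections); with spare workers and a chunk size of 2, five items are
/// dispatched as three chunks:
///
/// ```ignore
/// let mut dispatched = Vec::new();
/// let num_chunks = dispatch_with_chunking(
///     vec![1, 2, 3, 4, 5],
///     5,        // chunking_len
///     Some(2),  // chunk_size
///     300,      // max_targets_for_chunking
///     4,        // available_account_workers
///     4,        // available_storage_workers
///     |items, size| items.chunks(size).map(|chunk| chunk.to_vec()).collect::<Vec<_>>(),
///     |chunk| dispatched.push(chunk),
/// );
/// assert_eq!(num_chunks, 3);
/// assert_eq!(dispatched, vec![vec![1, 2], vec![3, 4], vec![5]]);
/// ```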
1407#[allow(clippy::too_many_arguments)]
1408fn dispatch_with_chunking<T, I>(
1409    items: T,
1410    chunking_len: usize,
1411    chunk_size: Option<usize>,
1412    max_targets_for_chunking: usize,
1413    available_account_workers: usize,
1414    available_storage_workers: usize,
1415    chunker: impl FnOnce(T, usize) -> I,
1416    mut dispatch: impl FnMut(T),
1417) -> usize
1418where
1419    I: IntoIterator<Item = T>,
1420{
1421    let should_chunk = chunking_len > max_targets_for_chunking ||
1422        available_account_workers > 1 ||
1423        available_storage_workers > 1;
1424
1425    if should_chunk &&
1426        let Some(chunk_size) = chunk_size &&
1427        chunking_len > chunk_size
1428    {
1429        let mut num_chunks = 0usize;
1430        for chunk in chunker(items, chunk_size) {
1431            dispatch(chunk);
1432            num_chunks += 1;
1433        }
1434        return num_chunks;
1435    }
1436
1437    dispatch(items);
1438    1
1439}
1440
1441/// Checks whether two state updates can be merged in a batch.
1442///
1443/// Both updates must come from the same source (see [`same_source`]). Transaction updates with the
1444/// same ID (`StateChangeSource::Transaction(id)`) and Block Access List updates are always safe to
1445/// merge, while pre-block and post-block updates are merged only when their payloads are identical.
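///
/// A minimal sketch with empty [`EvmState`]s, just to illustrate the transaction-ID rule:
///
/// ```ignore
/// let (a, b) = (EvmState::default(), EvmState::default());
/// // Same transaction id: mergeable.
/// assert!(can_batch_state_update(
///     StateChangeSource::Transaction(0).into(), &a,
///     StateChangeSource::Transaction(0).into(), &b,
/// ));
/// // Different transaction ids: never merged.
/// assert!(!can_batch_state_update(
///     StateChangeSource::Transaction(0).into(), &a,
///     StateChangeSource::Transaction(1).into(), &b,
/// ));
/// ```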
1446fn can_batch_state_update(
1447    batch_source: Source,
1448    batch_update: &EvmState,
1449    next_source: Source,
1450    next_update: &EvmState,
1451) -> bool {
1452    if !same_source(batch_source, next_source) {
1453        return false;
1454    }
1455
1456    match (batch_source, next_source) {
1457        (
1458            Source::Evm(StateChangeSource::PreBlock(_)),
1459            Source::Evm(StateChangeSource::PreBlock(_)),
1460        ) |
1461        (
1462            Source::Evm(StateChangeSource::PostBlock(_)),
1463            Source::Evm(StateChangeSource::PostBlock(_)),
1464        ) => batch_update == next_update,
1465        _ => true,
1466    }
1467}
1468
1469/// Checks whether two sources refer to the same origin.
1470fn same_source(lhs: Source, rhs: Source) -> bool {
1471    match (lhs, rhs) {
1472        (
1473            Source::Evm(StateChangeSource::Transaction(a)),
1474            Source::Evm(StateChangeSource::Transaction(b)),
1475        ) => a == b,
1476        (
1477            Source::Evm(StateChangeSource::PreBlock(a)),
1478            Source::Evm(StateChangeSource::PreBlock(b)),
1479        ) => mem::discriminant(&a) == mem::discriminant(&b),
1480        (
1481            Source::Evm(StateChangeSource::PostBlock(a)),
1482            Source::Evm(StateChangeSource::PostBlock(b)),
1483        ) => mem::discriminant(&a) == mem::discriminant(&b),
1484        (Source::BlockAccessList, Source::BlockAccessList) => true,
1485        _ => false,
1486    }
1487}
1488
1489/// Estimates target count of an `EvmState` for batching: one per touched account plus changed slots.
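///
/// A hypothetical single-account update (constructed the same way as the unit tests in this
/// module) with no changed storage slots is estimated as a single target:
///
/// ```ignore
/// let mut state = EvmState::default();
/// state.insert(
///     Address::random(),
///     Account {
///         info: AccountInfo::default(),
///         transaction_id: Default::default(),
///         storage: Default::default(),
///         status: AccountStatus::Touched,
///     },
/// );
/// assert_eq!(estimate_evm_state_targets(&state), 1);
/// ```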
1490fn estimate_evm_state_targets(state: &EvmState) -> usize {
1491    state
1492        .values()
1493        .filter(|account| account.is_touched())
1494        .map(|account| {
1495            let changed_slots = account.storage.iter().filter(|(_, v)| v.is_changed()).count();
1496            1 + changed_slots
1497        })
1498        .sum()
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503    use super::*;
1504    use crate::tree::cached_state::ExecutionCacheBuilder;
1505    use alloy_eip7928::{AccountChanges, BalanceChange};
1506    use alloy_primitives::{map::B256Set, Address};
1507    use reth_provider::{
1508        providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
1509        BlockReader, DatabaseProviderFactory, LatestStateProvider, PruneCheckpointReader,
1510        StageCheckpointReader, StateProviderBox, TrieReader,
1511    };
1512    use reth_trie::MultiProof;
1513    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
1514    use revm_primitives::{B256, U256};
1515    use std::sync::{Arc, OnceLock};
1516    use tokio::runtime::{Handle, Runtime};
1517
1518    /// Get a handle to the test runtime, creating it if necessary
1519    fn get_test_runtime_handle() -> Handle {
1520        static TEST_RT: OnceLock<Runtime> = OnceLock::new();
1521        TEST_RT
1522            .get_or_init(|| {
1523                tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
1524            })
1525            .handle()
1526            .clone()
1527    }
1528
1529    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
1530    where
1531        F: DatabaseProviderFactory<
1532                Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
1533            > + Clone
1534            + Send
1535            + 'static,
1536    {
1537        let rt_handle = get_test_runtime_handle();
1538        let overlay_factory = OverlayStateProviderFactory::new(factory);
1539        let task_ctx = ProofTaskCtx::new(overlay_factory);
1540        let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
1541        let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
1542        let (tx, rx) = crossbeam_channel::unbounded();
1543
1544        MultiProofTask::new(proof_handle, to_sparse_trie, Some(1), tx, rx)
1545    }
1546
1547    fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
1548    where
1549        F: DatabaseProviderFactory<
1550                Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
1551            > + Clone
1552            + Send
1553            + 'static,
1554    {
1555        let db_provider = factory.database_provider_ro().unwrap();
1556        let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
1557        let cache = ExecutionCacheBuilder::default().build_caches(1000);
1558        CachedStateProvider::new(state_provider, cache, Default::default())
1559    }
1560
1561    #[test]
1562    fn test_add_proof_in_sequence() {
1563        let mut sequencer = ProofSequencer::default();
1564        let proof1 = MultiProof::default();
1565        let proof2 = MultiProof::default();
1566        sequencer.next_sequence = 2;
1567
1568        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1569        assert_eq!(ready.len(), 1);
1570        assert!(!sequencer.has_pending());
1571
1572        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1573        assert_eq!(ready.len(), 1);
1574        assert!(!sequencer.has_pending());
1575    }
1576
1577    #[test]
1578    fn test_add_proof_out_of_order() {
1579        let mut sequencer = ProofSequencer::default();
1580        let proof1 = MultiProof::default();
1581        let proof2 = MultiProof::default();
1582        let proof3 = MultiProof::default();
1583        sequencer.next_sequence = 3;
1584
1585        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1586        assert_eq!(ready.len(), 0);
1587        assert!(sequencer.has_pending());
1588
1589        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1590        assert_eq!(ready.len(), 1);
1591        assert!(sequencer.has_pending());
1592
1593        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1594        assert_eq!(ready.len(), 2);
1595        assert!(!sequencer.has_pending());
1596    }
1597
1598    #[test]
1599    fn test_add_proof_with_gaps() {
1600        let mut sequencer = ProofSequencer::default();
1601        let proof1 = MultiProof::default();
1602        let proof3 = MultiProof::default();
1603        sequencer.next_sequence = 3;
1604
1605        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1606        assert_eq!(ready.len(), 1);
1607
1608        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1609        assert_eq!(ready.len(), 0);
1610        assert!(sequencer.has_pending());
1611    }
1612
1613    #[test]
1614    fn test_add_proof_duplicate_sequence() {
1615        let mut sequencer = ProofSequencer::default();
1616        let proof1 = MultiProof::default();
1617        let proof2 = MultiProof::default();
1618
1619        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1620        assert_eq!(ready.len(), 1);
1621
1622        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1623        assert_eq!(ready.len(), 0);
1624        assert!(!sequencer.has_pending());
1625    }
1626
1627    #[test]
1628    fn test_add_proof_batch_processing() {
1629        let mut sequencer = ProofSequencer::default();
1630        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1631        sequencer.next_sequence = 5;
1632
1633        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1634        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1635        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1636        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1637
1638        let ready =
1639            sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1640        assert_eq!(ready.len(), 5);
1641        assert!(!sequencer.has_pending());
1642    }
1643
1644    fn create_get_proof_targets_state() -> HashedPostState {
1645        let mut state = HashedPostState::default();
1646
1647        let addr1 = B256::random();
1648        let addr2 = B256::random();
1649        state.accounts.insert(addr1, Some(Default::default()));
1650        state.accounts.insert(addr2, Some(Default::default()));
1651
1652        let mut storage = HashedStorage::default();
1653        let slot1 = B256::random();
1654        let slot2 = B256::random();
1655        storage.storage.insert(slot1, U256::ZERO);
1656        storage.storage.insert(slot2, U256::from(1));
1657        state.storages.insert(addr1, storage);
1658
1659        state
1660    }
1661
1662    #[test]
1663    fn test_get_proof_targets_new_account_targets() {
1664        let state = create_get_proof_targets_state();
1665        let fetched = MultiProofTargets::default();
1666
1667        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1668
1669        // should return all accounts as targets since nothing was fetched before
1670        assert_eq!(targets.len(), state.accounts.len());
1671        for addr in state.accounts.keys() {
1672            assert!(targets.contains_key(addr));
1673        }
1674    }
1675
1676    #[test]
1677    fn test_get_proof_targets_new_storage_targets() {
1678        let state = create_get_proof_targets_state();
1679        let fetched = MultiProofTargets::default();
1680
1681        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1682
1683        // verify storage slots are included for accounts with storage
1684        for (addr, storage) in &state.storages {
1685            assert!(targets.contains_key(addr));
1686            let target_slots = &targets[addr];
1687            assert_eq!(target_slots.len(), storage.storage.len());
1688            for slot in storage.storage.keys() {
1689                assert!(target_slots.contains(slot));
1690            }
1691        }
1692    }
1693
1694    #[test]
1695    fn test_get_proof_targets_filter_already_fetched_accounts() {
1696        let state = create_get_proof_targets_state();
1697        let mut fetched = MultiProofTargets::default();
1698
1699        // select an account that has no storage updates
1700        let fetched_addr = state
1701            .accounts
1702            .keys()
1703            .find(|&&addr| !state.storages.contains_key(&addr))
1704            .expect("Should have an account without storage");
1705
1706        // mark the account as already fetched
1707        fetched.insert(*fetched_addr, HashSet::default());
1708
1709        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1710
1711        // should not include the already fetched account since it has no storage updates
1712        assert!(!targets.contains_key(fetched_addr));
1713        // other accounts should still be included
1714        assert_eq!(targets.len(), state.accounts.len() - 1);
1715    }
1716
1717    #[test]
1718    fn test_get_proof_targets_filter_already_fetched_storage() {
1719        let state = create_get_proof_targets_state();
1720        let mut fetched = MultiProofTargets::default();
1721
1722        // mark one storage slot as already fetched
1723        let (addr, storage) = state.storages.iter().next().unwrap();
1724        let mut fetched_slots = HashSet::default();
1725        let fetched_slot = *storage.storage.keys().next().unwrap();
1726        fetched_slots.insert(fetched_slot);
1727        fetched.insert(*addr, fetched_slots);
1728
1729        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1730
1731        // should not include the already fetched storage slot
1732        let target_slots = &targets[addr];
1733        assert!(!target_slots.contains(&fetched_slot));
1734        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1735    }
1736
1737    #[test]
1738    fn test_get_proof_targets_empty_state() {
1739        let state = HashedPostState::default();
1740        let fetched = MultiProofTargets::default();
1741
1742        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1743
1744        assert!(targets.is_empty());
1745    }
1746
1747    #[test]
1748    fn test_get_proof_targets_mixed_fetched_state() {
1749        let mut state = HashedPostState::default();
1750        let mut fetched = MultiProofTargets::default();
1751
1752        let addr1 = B256::random();
1753        let addr2 = B256::random();
1754        let slot1 = B256::random();
1755        let slot2 = B256::random();
1756
1757        state.accounts.insert(addr1, Some(Default::default()));
1758        state.accounts.insert(addr2, Some(Default::default()));
1759
1760        let mut storage = HashedStorage::default();
1761        storage.storage.insert(slot1, U256::ZERO);
1762        storage.storage.insert(slot2, U256::from(1));
1763        state.storages.insert(addr1, storage);
1764
1765        let mut fetched_slots = HashSet::default();
1766        fetched_slots.insert(slot1);
1767        fetched.insert(addr1, fetched_slots);
1768
1769        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1770
1771        assert!(targets.contains_key(&addr2));
1772        assert!(!targets[&addr1].contains(&slot1));
1773        assert!(targets[&addr1].contains(&slot2));
1774    }
1775
1776    #[test]
1777    fn test_get_proof_targets_unmodified_account_with_storage() {
1778        let mut state = HashedPostState::default();
1779        let fetched = MultiProofTargets::default();
1780
1781        let addr = B256::random();
1782        let slot1 = B256::random();
1783        let slot2 = B256::random();
1784
1785        // don't add the account to state.accounts (simulating unmodified account)
1786        // but add storage updates for this account
1787        let mut storage = HashedStorage::default();
1788        storage.storage.insert(slot1, U256::from(1));
1789        storage.storage.insert(slot2, U256::from(2));
1790        state.storages.insert(addr, storage);
1791
1792        assert!(!state.accounts.contains_key(&addr));
1793        assert!(!fetched.contains_key(&addr));
1794
1795        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1796
1797        // verify that we still get the storage slots for the unmodified account
1798        assert!(targets.contains_key(&addr));
1799
1800        let target_slots = &targets[&addr];
1801        assert_eq!(target_slots.len(), 2);
1802        assert!(target_slots.contains(&slot1));
1803        assert!(target_slots.contains(&slot2));
1804    }
1805
1806    #[test]
1807    fn test_get_prefetch_proof_targets_no_duplicates() {
1808        let test_provider_factory = create_test_provider_factory();
1809        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1810
1811        // populate some targets
1812        let mut targets = MultiProofTargets::default();
1813        let addr1 = B256::random();
1814        let addr2 = B256::random();
1815        let slot1 = B256::random();
1816        let slot2 = B256::random();
1817        targets.insert(addr1, std::iter::once(slot1).collect());
1818        targets.insert(addr2, std::iter::once(slot2).collect());
1819
1820        let prefetch_proof_targets =
1821            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1822
1823        // check that the prefetch proof targets are the same because there are no fetched proof
1824        // targets yet
1825        assert_eq!(prefetch_proof_targets, targets);
1826
1827        // add a different addr and slot to fetched proof targets
1828        let addr3 = B256::random();
1829        let slot3 = B256::random();
1830        test_state_root_task.fetched_proof_targets.insert(addr3, std::iter::once(slot3).collect());
1831
1832        let prefetch_proof_targets =
1833            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1834
1835        // check that the prefetch proof targets are the same because the fetched proof targets
1836        // don't overlap with the prefetch targets
1837        assert_eq!(prefetch_proof_targets, targets);
1838    }
1839
1840    #[test]
1841    fn test_get_prefetch_proof_targets_remove_subset() {
1842        let test_provider_factory = create_test_provider_factory();
1843        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1844
1845        // populate some targets
1846        let mut targets = MultiProofTargets::default();
1847        let addr1 = B256::random();
1848        let addr2 = B256::random();
1849        let slot1 = B256::random();
1850        let slot2 = B256::random();
1851        targets.insert(addr1, std::iter::once(slot1).collect());
1852        targets.insert(addr2, std::iter::once(slot2).collect());
1853
1854        // add a subset of the first target to fetched proof targets
1855        test_state_root_task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());
1856
1857        let prefetch_proof_targets =
1858            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1859
1860        // check that the prefetch proof targets do not include the subset
1861        assert_eq!(prefetch_proof_targets.len(), 1);
1862        assert!(!prefetch_proof_targets.contains_key(&addr1));
1863        assert!(prefetch_proof_targets.contains_key(&addr2));
1864
1865        // now add one more slot to the prefetch targets
1866        let slot3 = B256::random();
1867        targets.get_mut(&addr1).unwrap().insert(slot3);
1868
1869        let prefetch_proof_targets =
1870            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1871
1872        // check that the prefetch proof targets do not include the subset
1873        // but include the new slot
1874        assert_eq!(prefetch_proof_targets.len(), 2);
1875        assert!(prefetch_proof_targets.contains_key(&addr1));
1876        assert_eq!(
1877            *prefetch_proof_targets.get(&addr1).unwrap(),
1878            std::iter::once(slot3).collect::<B256Set>()
1879        );
1880        assert!(prefetch_proof_targets.contains_key(&addr2));
1881        assert_eq!(
1882            *prefetch_proof_targets.get(&addr2).unwrap(),
1883            std::iter::once(slot2).collect::<B256Set>()
1884        );
1885    }
1886
1887    #[test]
1888    fn test_get_proof_targets_with_removed_storage_keys() {
1889        let mut state = HashedPostState::default();
1890        let mut fetched = MultiProofTargets::default();
1891        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1892
1893        let addr = B256::random();
1894        let slot1 = B256::random();
1895        let slot2 = B256::random();
1896
1897        // add account to state
1898        state.accounts.insert(addr, Some(Default::default()));
1899
1900        // add storage updates
1901        let mut storage = HashedStorage::default();
1902        storage.storage.insert(slot1, U256::from(100));
1903        storage.storage.insert(slot2, U256::from(200));
1904        state.storages.insert(addr, storage);
1905
1906        // mark slot1 as already fetched
1907        let mut fetched_slots = HashSet::default();
1908        fetched_slots.insert(slot1);
1909        fetched.insert(addr, fetched_slots);
1910
1911        // update multi_added_removed_keys to mark slot1 as removed
1912        let mut removed_state = HashedPostState::default();
1913        let mut removed_storage = HashedStorage::default();
1914        removed_storage.storage.insert(slot1, U256::ZERO); // U256::ZERO marks as removed
1915        removed_state.storages.insert(addr, removed_storage);
1916        multi_added_removed_keys.update_with_state(&removed_state);
1917
1918        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1919
1920        // slot1 should be included despite being fetched, because it's marked as removed
1921        assert!(targets.contains_key(&addr));
1922        let target_slots = &targets[&addr];
1923        assert_eq!(target_slots.len(), 2);
1924        assert!(target_slots.contains(&slot1)); // included because it's removed
1925        assert!(target_slots.contains(&slot2)); // included because it's not fetched
1926    }
1927
1928    #[test]
1929    fn test_get_proof_targets_with_wiped_storage() {
1930        let mut state = HashedPostState::default();
1931        let fetched = MultiProofTargets::default();
1932        let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1933
1934        let addr = B256::random();
1935        let slot1 = B256::random();
1936
1937        // add account to state
1938        state.accounts.insert(addr, Some(Default::default()));
1939
1940        // add wiped storage
1941        let mut storage = HashedStorage::new(true);
1942        storage.storage.insert(slot1, U256::from(100));
1943        state.storages.insert(addr, storage);
1944
1945        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1946
1947        // account should be included because storage is wiped and account wasn't fetched
1948        assert!(targets.contains_key(&addr));
1949        let target_slots = &targets[&addr];
1950        assert_eq!(target_slots.len(), 1);
1951        assert!(target_slots.contains(&slot1));
1952    }
1953
1954    #[test]
1955    fn test_get_proof_targets_removed_keys_not_in_state_update() {
1956        let mut state = HashedPostState::default();
1957        let mut fetched = MultiProofTargets::default();
1958        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1959
1960        let addr = B256::random();
1961        let slot1 = B256::random();
1962        let slot2 = B256::random();
1963        let slot3 = B256::random();
1964
1965        // add account to state
1966        state.accounts.insert(addr, Some(Default::default()));
1967
1968        // add storage updates for slot1 and slot2 only
1969        let mut storage = HashedStorage::default();
1970        storage.storage.insert(slot1, U256::from(100));
1971        storage.storage.insert(slot2, U256::from(200));
1972        state.storages.insert(addr, storage);
1973
1974        // mark all slots as already fetched
1975        let mut fetched_slots = HashSet::default();
1976        fetched_slots.insert(slot1);
1977        fetched_slots.insert(slot2);
1978        fetched_slots.insert(slot3); // slot3 is fetched but not in state update
1979        fetched.insert(addr, fetched_slots);
1980
1981        // mark slot3 as removed (even though it's not in the state update)
1982        let mut removed_state = HashedPostState::default();
1983        let mut removed_storage = HashedStorage::default();
1984        removed_storage.storage.insert(slot3, U256::ZERO);
1985        removed_state.storages.insert(addr, removed_storage);
1986        multi_added_removed_keys.update_with_state(&removed_state);
1987
1988        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1989
1990        // only slots in the state update can be included, so slot3 should not appear
1991        assert!(!targets.contains_key(&addr));
1992    }
1993
1994    /// Verifies that consecutive prefetch proof messages are batched together.
1995    #[test]
1996    fn test_prefetch_proofs_batching() {
1997        let test_provider_factory = create_test_provider_factory();
1998        let mut task = create_test_state_root_task(test_provider_factory);
1999
2000        // send multiple messages
2001        let addr1 = B256::random();
2002        let addr2 = B256::random();
2003        let addr3 = B256::random();
2004
2005        let mut targets1 = MultiProofTargets::default();
2006        targets1.insert(addr1, HashSet::default());
2007
2008        let mut targets2 = MultiProofTargets::default();
2009        targets2.insert(addr2, HashSet::default());
2010
2011        let mut targets3 = MultiProofTargets::default();
2012        targets3.insert(addr3, HashSet::default());
2013
2014        let tx = task.state_root_message_sender();
2015        tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
2016        tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
2017        tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap();
2018
2019        let proofs_requested =
2020            if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2021                // simulate the batching logic
2022                let mut merged_targets = targets;
2023                let mut num_batched = 1;
2024                while let Ok(MultiProofMessage::PrefetchProofs(next_targets)) = task.rx.try_recv() {
2025                    merged_targets.extend(next_targets);
2026                    num_batched += 1;
2027                }
2028
2029                assert_eq!(num_batched, 3);
2030                assert_eq!(merged_targets.len(), 3);
2031                assert!(merged_targets.contains_key(&addr1));
2032                assert!(merged_targets.contains_key(&addr2));
2033                assert!(merged_targets.contains_key(&addr3));
2034
2035                task.on_prefetch_proof(merged_targets)
2036            } else {
2037                panic!("Expected PrefetchProofs message");
2038            };
2039
2040        assert_eq!(proofs_requested, 1);
2041    }
2042
2043    /// Verifies that consecutive state update messages from the same source are batched together.
2044    #[test]
2045    fn test_state_update_batching() {
2046        use alloy_evm::block::StateChangeSource;
2047        use revm_state::Account;
2048
2049        let test_provider_factory = create_test_provider_factory();
2050        let mut task = create_test_state_root_task(test_provider_factory);
2051
2052        // create multiple state updates
2053        let addr1 = alloy_primitives::Address::random();
2054        let addr2 = alloy_primitives::Address::random();
2055
2056        let mut update1 = EvmState::default();
2057        update1.insert(
2058            addr1,
2059            Account {
2060                info: revm_state::AccountInfo {
2061                    balance: U256::from(100),
2062                    nonce: 1,
2063                    code_hash: Default::default(),
2064                    code: Default::default(),
2065                },
2066                transaction_id: Default::default(),
2067                storage: Default::default(),
2068                status: revm_state::AccountStatus::Touched,
2069            },
2070        );
2071
2072        let mut update2 = EvmState::default();
2073        update2.insert(
2074            addr2,
2075            Account {
2076                info: revm_state::AccountInfo {
2077                    balance: U256::from(200),
2078                    nonce: 2,
2079                    code_hash: Default::default(),
2080                    code: Default::default(),
2081                },
2082                transaction_id: Default::default(),
2083                storage: Default::default(),
2084                status: revm_state::AccountStatus::Touched,
2085            },
2086        );
2087
2088        let source = StateChangeSource::Transaction(0);
2089
2090        let tx = task.state_root_message_sender();
2091        tx.send(MultiProofMessage::StateUpdate(source.into(), update1.clone())).unwrap();
2092        tx.send(MultiProofMessage::StateUpdate(source.into(), update2.clone())).unwrap();
2093
2094        let proofs_requested =
2095            if let Ok(MultiProofMessage::StateUpdate(_src, update)) = task.rx.recv() {
2096                let mut merged_update = update;
2097                let mut num_batched = 1;
2098
2099                while let Ok(MultiProofMessage::StateUpdate(_next_source, next_update)) =
2100                    task.rx.try_recv()
2101                {
2102                    merged_update.extend(next_update);
2103                    num_batched += 1;
2104                }
2105
2106                assert_eq!(num_batched, 2);
2107                assert_eq!(merged_update.len(), 2);
2108                assert!(merged_update.contains_key(&addr1));
2109                assert!(merged_update.contains_key(&addr2));
2110
2111                task.on_state_update(source.into(), merged_update)
2112            } else {
2113                panic!("Expected StateUpdate message");
2114            };
2115        assert_eq!(proofs_requested, 1);
2116    }
2117
2118    /// Verifies that state updates from different sources are not batched together.
2119    #[test]
2120    fn test_state_update_batching_separates_sources() {
2121        use alloy_evm::block::StateChangeSource;
2122        use revm_state::Account;
2123
2124        let test_provider_factory = create_test_provider_factory();
2125        let task = create_test_state_root_task(test_provider_factory);
2126
2127        let addr_a1 = alloy_primitives::Address::random();
2128        let addr_b1 = alloy_primitives::Address::random();
2129        let addr_a2 = alloy_primitives::Address::random();
2130
2131        let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
2132            let mut state = EvmState::default();
2133            state.insert(
2134                addr,
2135                Account {
2136                    info: revm_state::AccountInfo {
2137                        balance: U256::from(balance),
2138                        nonce: 1,
2139                        code_hash: Default::default(),
2140                        code: Default::default(),
2141                    },
2142                    transaction_id: Default::default(),
2143                    storage: Default::default(),
2144                    status: revm_state::AccountStatus::Touched,
2145                },
2146            );
2147            state
2148        };
2149
2150        let source_a = StateChangeSource::Transaction(1);
2151        let source_b = StateChangeSource::Transaction(2);
2152
2153        // Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending)
2154        let tx = task.state_root_message_sender();
2155        tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a1, 100)))
2156            .unwrap();
2157        tx.send(MultiProofMessage::StateUpdate(source_b.into(), create_state_update(addr_b1, 200)))
2158            .unwrap();
2159        tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a2, 300)))
2160            .unwrap();
2161
2162        let mut pending_msg: Option<MultiProofMessage> = None;
2163
2164        if let Ok(MultiProofMessage::StateUpdate(first_source, _)) = task.rx.recv() {
2165            assert!(same_source(first_source, source_a.into()));
2166
2167            // Simulate batching loop for remaining messages
2168            let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new();
2169            let mut accumulated_targets = 0usize;
2170
2171            loop {
2172                if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
2173                    break;
2174                }
2175                match task.rx.try_recv() {
2176                    Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
2177                        if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
2178                            !can_batch_state_update(
2179                                *batch_source,
2180                                batch_update,
2181                                next_source,
2182                                &next_update,
2183                            )
2184                        {
2185                            pending_msg =
2186                                Some(MultiProofMessage::StateUpdate(next_source, next_update));
2187                            break;
2188                        }
2189
2190                        let next_estimate = estimate_evm_state_targets(&next_update);
2191                        if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
2192                            pending_msg =
2193                                Some(MultiProofMessage::StateUpdate(next_source, next_update));
2194                            break;
2195                        }
2196                        if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
2197                            !accumulated_updates.is_empty()
2198                        {
2199                            pending_msg =
2200                                Some(MultiProofMessage::StateUpdate(next_source, next_update));
2201                            break;
2202                        }
2203                        accumulated_targets += next_estimate;
2204                        accumulated_updates.push((next_source, next_update));
2205                    }
2206                    Ok(other_msg) => {
2207                        pending_msg = Some(other_msg);
2208                        break;
2209                    }
2210                    Err(_) => break,
2211                }
2212            }
2213
2214            assert_eq!(accumulated_updates.len(), 1, "Should only batch matching sources");
2215            let batch_source = accumulated_updates[0].0;
2216            assert!(same_source(batch_source, source_b.into()));
2217
2219            let mut merged_update = accumulated_updates.remove(0).1;
2220            for (_, next_update) in accumulated_updates {
2221                merged_update.extend(next_update);
2222            }
2223
2224            assert!(same_source(batch_source, source_b.into()), "Batch should use matching source");
2225            assert!(merged_update.contains_key(&addr_b1));
2226            assert!(!merged_update.contains_key(&addr_a1));
2227            assert!(!merged_update.contains_key(&addr_a2));
2228        } else {
2229            panic!("Expected first StateUpdate");
2230        }
2231
2232        match pending_msg {
2233            Some(MultiProofMessage::StateUpdate(pending_source, pending_update)) => {
2234                assert!(same_source(pending_source, source_a.into()));
2235                assert!(pending_update.contains_key(&addr_a2));
2236            }
2237            other => panic!("Expected pending StateUpdate with source_a, got {:?}", other),
2238        }
2239    }
2240
2241    /// Verifies that pre-block updates only batch when their payloads are identical.
2242    #[test]
2243    fn test_pre_block_updates_require_payload_match_to_batch() {
2244        use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
2245        use revm_state::Account;
2246
2247        let test_provider_factory = create_test_provider_factory();
2248        let task = create_test_state_root_task(test_provider_factory);
2249
2250        let addr1 = alloy_primitives::Address::random();
2251        let addr2 = alloy_primitives::Address::random();
2252        let addr3 = alloy_primitives::Address::random();
2253
2254        let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
2255            let mut state = EvmState::default();
2256            state.insert(
2257                addr,
2258                Account {
2259                    info: revm_state::AccountInfo {
2260                        balance: U256::from(balance),
2261                        nonce: 1,
2262                        code_hash: Default::default(),
2263                        code: Default::default(),
2264                    },
2265                    transaction_id: Default::default(),
2266                    storage: Default::default(),
2267                    status: revm_state::AccountStatus::Touched,
2268                },
2269            );
2270            state
2271        };
2272
2273        let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
2274
2275        // Queue: first update dispatched immediately, next two should not merge
2276        let tx = task.state_root_message_sender();
2277        tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr1, 100)))
2278            .unwrap();
2279        tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr2, 200)))
2280            .unwrap();
2281        tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr3, 300)))
2282            .unwrap();
2283
2284        let mut pending_msg: Option<MultiProofMessage> = None;
2285
2286        if let Ok(MultiProofMessage::StateUpdate(first_source, first_update)) = task.rx.recv() {
2287            assert!(same_source(first_source, source.into()));
2288            assert!(first_update.contains_key(&addr1));
2289
2290            let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new();
2291            let mut accumulated_targets = 0usize;
2292
2293            loop {
2294                if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
2295                    break;
2296                }
2297                match task.rx.try_recv() {
2298                    Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
2299                        if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
2300                            !can_batch_state_update(
2301                                *batch_source,
2302                                batch_update,
2303                                next_source,
2304                                &next_update,
2305                            )
2306                        {
2307                            pending_msg =
2308                                Some(MultiProofMessage::StateUpdate(next_source, next_update));
2309                            break;
2310                        }
2311
2312                        let next_estimate = estimate_evm_state_targets(&next_update);
2313                        if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
2314                            pending_msg =
2315                                Some(MultiProofMessage::StateUpdate(next_source, next_update));
2316                            break;
2317                        }
2318                        if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
2319                            !accumulated_updates.is_empty()
2320                        {
2321                            pending_msg =
2322                                Some(MultiProofMessage::StateUpdate(next_source, next_update));
2323                            break;
2324                        }
2325                        accumulated_targets += next_estimate;
2326                        accumulated_updates.push((next_source, next_update));
2327                    }
2328                    Ok(other_msg) => {
2329                        pending_msg = Some(other_msg);
2330                        break;
2331                    }
2332                    Err(_) => break,
2333                }
2334            }
2335
2336            assert_eq!(
2337                accumulated_updates.len(),
2338                1,
2339                "Second pre-block update should not merge with a different payload"
2340            );
2341            let (batched_source, batched_update) = accumulated_updates.remove(0);
2342            assert!(same_source(batched_source, source.into()));
2343            assert!(batched_update.contains_key(&addr2));
2344            assert!(!batched_update.contains_key(&addr3));
2345
2346            match pending_msg {
2347                Some(MultiProofMessage::StateUpdate(_, pending_update)) => {
2348                    assert!(pending_update.contains_key(&addr3));
2349                }
2350                other => panic!("Expected pending third pre-block update, got {:?}", other),
2351            }
2352        } else {
2353            panic!("Expected first StateUpdate");
2354        }
2355    }
2356
2357    /// Verifies that different message types arriving mid-batch are not lost and preserve order.
2358    #[test]
2359    fn test_batching_preserves_ordering_with_different_message_type() {
2360        use alloy_evm::block::StateChangeSource;
2361        use revm_state::Account;
2362
2363        let test_provider_factory = create_test_provider_factory();
2364        let task = create_test_state_root_task(test_provider_factory);
2365
2366        let addr1 = B256::random();
2367        let addr2 = B256::random();
2368        let addr3 = B256::random();
2369        let state_addr1 = alloy_primitives::Address::random();
2370        let state_addr2 = alloy_primitives::Address::random();
2371
2372        // Create PrefetchProofs targets
2373        let mut targets1 = MultiProofTargets::default();
2374        targets1.insert(addr1, HashSet::default());
2375
2376        let mut targets2 = MultiProofTargets::default();
2377        targets2.insert(addr2, HashSet::default());
2378
2379        let mut targets3 = MultiProofTargets::default();
2380        targets3.insert(addr3, HashSet::default());
2381
2382        // Create StateUpdate 1
2383        let mut state_update1 = EvmState::default();
2384        state_update1.insert(
2385            state_addr1,
2386            Account {
2387                info: revm_state::AccountInfo {
2388                    balance: U256::from(100),
2389                    nonce: 1,
2390                    code_hash: Default::default(),
2391                    code: Default::default(),
2392                },
2393                transaction_id: Default::default(),
2394                storage: Default::default(),
2395                status: revm_state::AccountStatus::Touched,
2396            },
2397        );
2398
2399        // Create StateUpdate 2
2400        let mut state_update2 = EvmState::default();
2401        state_update2.insert(
2402            state_addr2,
2403            Account {
2404                info: revm_state::AccountInfo {
2405                    balance: U256::from(200),
2406                    nonce: 2,
2407                    code_hash: Default::default(),
2408                    code: Default::default(),
2409                },
2410                transaction_id: Default::default(),
2411                storage: Default::default(),
2412                status: revm_state::AccountStatus::Touched,
2413            },
2414        );
2415
2416        let source = StateChangeSource::Transaction(42);
2417
2418        // Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3]
2419        let tx = task.state_root_message_sender();
2420        tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
2421        tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
2422        tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
2423        tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap();
2424        tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap();
2425
        // Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2)
        let mut pending_msg: Option<MultiProofMessage> = None;
        if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
            let mut merged_targets = targets;
            let mut num_batched = 1;

            loop {
                match task.rx.try_recv() {
                    Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
                        merged_targets.extend(next_targets);
                        num_batched += 1;
                    }
                    Ok(other_msg) => {
                        // Store locally to preserve ordering (this mirrors the ordering fix in
                        // the task loop)
                        pending_msg = Some(other_msg);
                        break;
                    }
                    Err(_) => break,
                }
            }

            // Should have batched exactly 2 PrefetchProofs (not 3!)
            assert_eq!(num_batched, 2, "Should batch only until a different message type");
            assert_eq!(merged_targets.len(), 2);
            assert!(merged_targets.contains_key(&addr1));
            assert!(merged_targets.contains_key(&addr2));
            assert!(!merged_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
        } else {
            panic!("Expected PrefetchProofs message");
        }

        // Step 2: The pending message should be StateUpdate1 (preserved ordering)
        match pending_msg {
            Some(MultiProofMessage::StateUpdate(_src, update)) => {
                assert!(update.contains_key(&state_addr1), "Should be first StateUpdate");
            }
            _ => panic!("StateUpdate1 was lost or reordered! The ordering fix is broken."),
        }

        // Step 3: Next in channel should be StateUpdate2
        match task.rx.try_recv() {
            Ok(MultiProofMessage::StateUpdate(_src, update)) => {
                assert!(update.contains_key(&state_addr2), "Should be second StateUpdate");
            }
            _ => panic!("StateUpdate2 was lost!"),
        }

        // Step 4: Next in channel should be PrefetchProofs3
        match task.rx.try_recv() {
            Ok(MultiProofMessage::PrefetchProofs(targets)) => {
                assert_eq!(targets.len(), 1);
                assert!(targets.contains_key(&addr3));
            }
            _ => panic!("PrefetchProofs3 was lost!"),
        }
    }

    /// Verifies that a pending message is processed before the next loop iteration (ordering).
    #[test]
    fn test_pending_message_processed_before_next_iteration() {
        use alloy_evm::block::StateChangeSource;
        use revm_state::Account;

        let test_provider_factory = create_test_provider_factory();
        let test_provider = create_cached_provider(test_provider_factory.clone());
        let mut task = create_test_state_root_task(test_provider_factory);

        // Queue: Prefetch1, StateUpdate, Prefetch2
        let prefetch_addr1 = B256::random();
        let prefetch_addr2 = B256::random();
        let mut prefetch1 = MultiProofTargets::default();
        prefetch1.insert(prefetch_addr1, HashSet::default());
        let mut prefetch2 = MultiProofTargets::default();
        prefetch2.insert(prefetch_addr2, HashSet::default());

        let state_addr = alloy_primitives::Address::random();
        let mut state_update = EvmState::default();
        state_update.insert(
            state_addr,
            Account {
                info: revm_state::AccountInfo {
                    balance: U256::from(42),
                    nonce: 1,
                    code_hash: Default::default(),
                    code: Default::default(),
                },
                transaction_id: Default::default(),
                storage: Default::default(),
                status: revm_state::AccountStatus::Touched,
            },
        );

        let source = StateChangeSource::Transaction(99);

        let tx = task.state_root_message_sender();
        tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap();
        tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
        tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();

        let mut ctx = MultiproofBatchCtx::new(Instant::now());
        let mut batch_metrics = MultiproofBatchMetrics::default();

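        // Unlike the previous test, this one drives the real `process_multiproof_message` path:
        // `MultiproofBatchCtx` carries the parked `pending_msg` across calls, which is how the
        // run loop hands a deferred message to its next iteration.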
        // Process the first message: Prefetch1 is batched and the trailing StateUpdate becomes
        // pending.
        let first = task.rx.recv().unwrap();
        assert!(matches!(first, MultiProofMessage::PrefetchProofs(_)));
        assert!(!task.process_multiproof_message(
            first,
            &mut ctx,
            &mut batch_metrics,
            &test_provider
        ));
        let pending = ctx.pending_msg.take().expect("pending message captured");
        assert!(matches!(pending, MultiProofMessage::StateUpdate(_, _)));

        // Pending message should be handled before the next select loop.
        assert!(!task.process_multiproof_message(
            pending,
            &mut ctx,
            &mut batch_metrics,
            &test_provider
        ));

        // Prefetch2 should now be in pending_msg (captured by StateUpdate's batching loop).
        match ctx.pending_msg.take() {
            Some(MultiProofMessage::PrefetchProofs(targets)) => {
                assert_eq!(targets.len(), 1);
                assert!(targets.contains_key(&prefetch_addr2));
            }
            other => panic!("Expected remaining PrefetchProofs2 in pending_msg, got {:?}", other),
        }
    }

    /// Verifies that a message left pending by a previous batching drain itself receives the
    /// full batching treatment on the next iteration.
    #[test]
    fn test_pending_messages_get_full_batching_treatment() {
        // Queue: [Prefetch1, State1, State2, State3, Prefetch2]
        //
        // Expected behavior:
        // 1. recv() → Prefetch1
        // 2. try_recv() → State1 is different type → pending = State1, break
        // 3. Process Prefetch1
        // 4. Next iteration: pending = State1 → process with batching
        // 5. try_recv() → State2 same type → merge
        // 6. try_recv() → State3 same type → merge
        // 7. try_recv() → Prefetch2 different type → pending = Prefetch2, break
        // 8. Process merged State (1+2+3)
        //
        // Without the state-machine fix, State1 would be processed alone (no batching).
        use alloy_evm::block::StateChangeSource;
        use revm_state::Account;

        let test_provider_factory = create_test_provider_factory();
        let task = create_test_state_root_task(test_provider_factory);

        let prefetch_addr1 = B256::random();
        let prefetch_addr2 = B256::random();
        let state_addr1 = alloy_primitives::Address::random();
        let state_addr2 = alloy_primitives::Address::random();
        let state_addr3 = alloy_primitives::Address::random();

        // Create Prefetch targets
        let mut prefetch1 = MultiProofTargets::default();
        prefetch1.insert(prefetch_addr1, HashSet::default());

        let mut prefetch2 = MultiProofTargets::default();
        prefetch2.insert(prefetch_addr2, HashSet::default());

        // Create StateUpdates
        let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
            let mut state = EvmState::default();
            state.insert(
                addr,
                Account {
                    info: revm_state::AccountInfo {
                        balance: U256::from(balance),
                        nonce: 1,
                        code_hash: Default::default(),
                        code: Default::default(),
                    },
                    transaction_id: Default::default(),
                    storage: Default::default(),
                    status: revm_state::AccountStatus::Touched,
                },
            );
            state
        };
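        // Each call to the helper above yields a single touched account, so the merged update's
        // length below equals the number of StateUpdate messages batched together.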

        let source = StateChangeSource::Transaction(42);

        // Queue: [Prefetch1, State1, State2, State3, Prefetch2]
        let tx = task.state_root_message_sender();
        tx.send(MultiProofMessage::PrefetchProofs(prefetch1.clone())).unwrap();
        tx.send(MultiProofMessage::StateUpdate(
            source.into(),
            create_state_update(state_addr1, 100),
        ))
        .unwrap();
        tx.send(MultiProofMessage::StateUpdate(
            source.into(),
            create_state_update(state_addr2, 200),
        ))
        .unwrap();
        tx.send(MultiProofMessage::StateUpdate(
            source.into(),
            create_state_update(state_addr3, 300),
        ))
        .unwrap();
        tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();

        // Simulate the state-machine loop behavior
        let mut pending_msg: Option<MultiProofMessage> = None;

        // First iteration: recv() gets Prefetch1, drains until State1
        if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
            let mut merged_targets = targets;
            loop {
                match task.rx.try_recv() {
                    Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
                        merged_targets.extend(next_targets);
                    }
                    Ok(other_msg) => {
                        pending_msg = Some(other_msg);
                        break;
                    }
                    Err(_) => break,
                }
            }
            // Should have only Prefetch1 (State1 is different type)
            assert_eq!(merged_targets.len(), 1);
            assert!(merged_targets.contains_key(&prefetch_addr1));
        } else {
            panic!("Expected PrefetchProofs");
        }

        // Pending should be State1
        assert!(matches!(pending_msg, Some(MultiProofMessage::StateUpdate(_, _))));

        // Second iteration: process pending State1 WITH BATCHING
        // This is the key test - the pending message should drain State2 and State3
        if let Some(MultiProofMessage::StateUpdate(_src, first_update)) = pending_msg.take() {
            let mut merged_update = first_update;
            let mut num_batched = 1;

            loop {
                match task.rx.try_recv() {
                    Ok(MultiProofMessage::StateUpdate(_src, next_update)) => {
                        merged_update.extend(next_update);
                        num_batched += 1;
                    }
                    Ok(other_msg) => {
                        pending_msg = Some(other_msg);
                        break;
                    }
                    Err(_) => break,
                }
            }

            // THE KEY ASSERTION: pending State1 should have batched with State2 and State3
            assert_eq!(
                num_batched, 3,
                "Pending message should get full batching treatment and merge all 3 StateUpdates"
            );
            assert_eq!(merged_update.len(), 3, "Should have all 3 addresses in merged update");
            assert!(merged_update.contains_key(&state_addr1));
            assert!(merged_update.contains_key(&state_addr2));
            assert!(merged_update.contains_key(&state_addr3));
        } else {
            panic!("Expected pending StateUpdate");
        }

        // Pending should now be Prefetch2
        match pending_msg {
            Some(MultiProofMessage::PrefetchProofs(targets)) => {
                assert_eq!(targets.len(), 1);
                assert!(targets.contains_key(&prefetch_addr2));
            }
            _ => panic!("Prefetch2 was lost!"),
        }
    }

    /// Verifies that a `BlockAccessList` message produces a state update, dispatches proofs,
    /// and marks state updates as finished.
    #[test]
    fn test_bal_message_processing() {
        let test_provider_factory = create_test_provider_factory();
        let test_provider = create_cached_provider(test_provider_factory.clone());
        let mut task = create_test_state_root_task(test_provider_factory);

        // Create a simple BAL with one account change
        let account_address = Address::random();
        let account_changes = AccountChanges {
            address: account_address,
            balance_changes: vec![BalanceChange::new(0, U256::from(1000))],
            nonce_changes: vec![],
            code_changes: vec![],
            storage_changes: vec![],
            storage_reads: vec![],
        };

        let bal = Arc::new(vec![account_changes]);

        let mut ctx = MultiproofBatchCtx::new(Instant::now());
        let mut batch_metrics = MultiproofBatchMetrics::default();

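        // The returned flag indicates whether the task loop should finish; for a BAL it should
        // stay false, since the proofs dispatched for this update still have to come back.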
        let should_finish = task.process_multiproof_message(
            MultiProofMessage::BlockAccessList(bal),
            &mut ctx,
            &mut batch_metrics,
            &test_provider,
        );

        // Processing the BAL should mark state updates as finished
        assert!(ctx.updates_finished_time.is_some());

        // Should have dispatched state update proofs
        assert!(batch_metrics.state_update_proofs_requested > 0);

        // The task should keep waiting for the results of those proofs to arrive
        assert!(!should_finish, "Should continue waiting for proofs");
    }
}