reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{
6    keccak256,
7    map::{B256Set, HashSet},
8    B256,
9};
10use dashmap::DashMap;
11use derive_more::derive::Deref;
12use metrics::Histogram;
13use reth_errors::ProviderError;
14use reth_metrics::Metrics;
15use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx};
16use reth_revm::state::EvmState;
17use reth_trie::{
18    added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut,
19    updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState, HashedPostStateSorted,
20    HashedStorage, MultiProofTargets, TrieInput,
21};
22use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
23use std::{
24    collections::{BTreeMap, VecDeque},
25    ops::DerefMut,
26    sync::{
27        mpsc::{channel, Receiver, Sender},
28        Arc,
29    },
30    time::{Duration, Instant},
31};
32use tracing::{debug, error, trace};
33
34/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
35/// state.
36#[derive(Default, Debug)]
37pub struct SparseTrieUpdate {
38    /// The state update that was used to calculate the proof
39    pub(crate) state: HashedPostState,
40    /// The calculated multiproof
41    pub(crate) multiproof: DecodedMultiProof,
42}
43
44impl SparseTrieUpdate {
45    /// Returns true if the update is empty.
46    pub(super) fn is_empty(&self) -> bool {
47        self.state.is_empty() && self.multiproof.is_empty()
48    }
49
50    /// Construct update from multiproof.
51    #[cfg(test)]
52    pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
53        Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
54    }
55
56    /// Extend update with contents of the other.
57    pub(super) fn extend(&mut self, other: Self) {
58        self.state.extend(other.state);
59        self.multiproof.extend(other.multiproof);
60    }
61}
62
63/// Common configuration for multi proof tasks
64#[derive(Debug, Clone)]
65pub(super) struct MultiProofConfig<Factory> {
66    /// View over the state in the database.
67    pub consistent_view: ConsistentDbView<Factory>,
68    /// The sorted collection of cached in-memory intermediate trie nodes that
69    /// can be reused for computation.
70    pub nodes_sorted: Arc<TrieUpdatesSorted>,
71    /// The sorted in-memory overlay hashed state.
72    pub state_sorted: Arc<HashedPostStateSorted>,
73    /// The collection of prefix sets for the computation. Since the prefix sets _always_
74    /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here,
75    /// if we have cached nodes for them.
76    pub prefix_sets: Arc<TriePrefixSetsMut>,
77}
78
79impl<Factory> MultiProofConfig<Factory> {
80    /// Creates a new state root config from the consistent view and the trie input.
81    ///
82    /// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the
83    /// [`TrieInput`].
84    pub(super) fn new_from_input(
85        consistent_view: ConsistentDbView<Factory>,
86        mut input: TrieInput,
87    ) -> (TrieInput, Self) {
88        let config = Self {
89            consistent_view,
90            nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
91            state_sorted: Arc::new(input.state.drain_into_sorted()),
92            prefix_sets: Arc::new(input.prefix_sets.clone()),
93        };
94        (input.cleared(), config)
95    }
96}
97
98/// Messages used internally by the multi proof task.
99#[derive(Debug)]
100pub(super) enum MultiProofMessage {
101    /// Prefetch proof targets
102    PrefetchProofs(MultiProofTargets),
103    /// New state update from transaction execution with its source
104    StateUpdate(StateChangeSource, EvmState),
105    /// State update that can be applied to the sparse trie without any new proofs.
106    ///
107    /// It can be the case when all accounts and storage slots from the state update were already
108    /// fetched and revealed.
109    EmptyProof {
110        /// The index of this proof in the sequence of state updates
111        sequence_number: u64,
112        /// The state update that was used to calculate the proof
113        state: HashedPostState,
114    },
115    /// Proof calculation completed for a specific state update
116    ProofCalculated(Box<ProofCalculated>),
117    /// Error during proof calculation
118    ProofCalculationError(ProviderError),
119    /// Signals state update stream end.
120    ///
121    /// This is triggered by block execution, indicating that no additional state updates are
122    /// expected.
123    FinishedStateUpdates,
124}
125
126/// Message about completion of proof calculation for a specific state update
127#[derive(Debug)]
128pub(super) struct ProofCalculated {
129    /// The index of this proof in the sequence of state updates
130    sequence_number: u64,
131    /// Sparse trie update
132    update: SparseTrieUpdate,
133    /// The time taken to calculate the proof.
134    elapsed: Duration,
135}
136
137/// Handle to track proof calculation ordering.
138#[derive(Debug, Default)]
139struct ProofSequencer {
140    /// The next proof sequence number to be produced.
141    next_sequence: u64,
142    /// The next sequence number expected to be delivered.
143    next_to_deliver: u64,
144    /// Buffer for out-of-order proofs and corresponding state updates
145    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
146}
147
148impl ProofSequencer {
149    /// Gets the next sequence number and increments the counter
150    const fn next_sequence(&mut self) -> u64 {
151        let seq = self.next_sequence;
152        self.next_sequence += 1;
153        seq
154    }
155
156    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
157    /// updates if we have a continuous sequence
158    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
159        if sequence >= self.next_to_deliver {
160            self.pending_proofs.insert(sequence, update);
161        }
162
163        // return early if we don't have the next expected proof
164        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
165            return Vec::new()
166        }
167
168        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
169        let mut current_sequence = self.next_to_deliver;
170
171        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
172        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
173            consecutive_proofs.push(pending);
174            current_sequence += 1;
175
176            // if we don't have the next number, stop collecting
177            if !self.pending_proofs.contains_key(&current_sequence) {
178                break;
179            }
180        }
181
182        self.next_to_deliver += consecutive_proofs.len() as u64;
183
184        consecutive_proofs
185    }
186
187    /// Returns true if we still have pending proofs
188    pub(crate) fn has_pending(&self) -> bool {
189        !self.pending_proofs.is_empty()
190    }
191}
192
193/// A wrapper for the sender that signals completion when dropped.
194///
195/// This type is intended to be used in combination with the evm executor statehook.
196/// This should trigger once the block has been executed (after) the last state update has been
197/// sent. This triggers the exit condition of the multi proof task.
198#[derive(Deref, Debug)]
199pub(super) struct StateHookSender(Sender<MultiProofMessage>);
200
201impl StateHookSender {
202    pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
203        Self(inner)
204    }
205}
206
207impl Drop for StateHookSender {
208    fn drop(&mut self) {
209        // Send completion signal when the sender is dropped
210        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
211    }
212}
213
214pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
215    let mut hashed_state = HashedPostState::with_capacity(update.len());
216
217    for (address, account) in update {
218        if account.is_touched() {
219            let hashed_address = keccak256(address);
220            trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
221
222            let destroyed = account.is_selfdestructed();
223            let info = if destroyed { None } else { Some(account.info.into()) };
224            hashed_state.accounts.insert(hashed_address, info);
225
226            let mut changed_storage_iter = account
227                .storage
228                .into_iter()
229                .filter(|(_slot, value)| value.is_changed())
230                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
231                .peekable();
232
233            if destroyed {
234                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
235            } else if changed_storage_iter.peek().is_some() {
236                hashed_state
237                    .storages
238                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
239            }
240        }
241    }
242
243    hashed_state
244}
245
246/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
247#[derive(Debug)]
248enum PendingMultiproofTask<Factory> {
249    /// A storage multiproof task input.
250    Storage(StorageMultiproofInput<Factory>),
251    /// A regular multiproof task input.
252    Regular(MultiproofInput<Factory>),
253}
254
255impl<Factory> PendingMultiproofTask<Factory> {
256    /// Returns the proof sequence number of the task.
257    const fn proof_sequence_number(&self) -> u64 {
258        match self {
259            Self::Storage(input) => input.proof_sequence_number,
260            Self::Regular(input) => input.proof_sequence_number,
261        }
262    }
263
264    /// Returns whether or not the proof targets are empty.
265    fn proof_targets_is_empty(&self) -> bool {
266        match self {
267            Self::Storage(input) => input.proof_targets.is_empty(),
268            Self::Regular(input) => input.proof_targets.is_empty(),
269        }
270    }
271
272    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
273    fn send_empty_proof(self) {
274        match self {
275            Self::Storage(input) => input.send_empty_proof(),
276            Self::Regular(input) => input.send_empty_proof(),
277        }
278    }
279}
280
281impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
282    fn from(input: StorageMultiproofInput<Factory>) -> Self {
283        Self::Storage(input)
284    }
285}
286
287impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
288    fn from(input: MultiproofInput<Factory>) -> Self {
289        Self::Regular(input)
290    }
291}
292
293/// Input parameters for spawning a dedicated storage multiproof calculation.
294#[derive(Debug)]
295struct StorageMultiproofInput<Factory> {
296    config: MultiProofConfig<Factory>,
297    source: Option<StateChangeSource>,
298    hashed_state_update: HashedPostState,
299    hashed_address: B256,
300    proof_targets: B256Set,
301    proof_sequence_number: u64,
302    state_root_message_sender: Sender<MultiProofMessage>,
303    multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
304}
305
306impl<Factory> StorageMultiproofInput<Factory> {
307    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
308    fn send_empty_proof(self) {
309        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
310            sequence_number: self.proof_sequence_number,
311            state: self.hashed_state_update,
312        });
313    }
314}
315
316/// Input parameters for spawning a multiproof calculation.
317#[derive(Debug)]
318struct MultiproofInput<Factory> {
319    config: MultiProofConfig<Factory>,
320    source: Option<StateChangeSource>,
321    hashed_state_update: HashedPostState,
322    proof_targets: MultiProofTargets,
323    proof_sequence_number: u64,
324    state_root_message_sender: Sender<MultiProofMessage>,
325    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
326}
327
328impl<Factory> MultiproofInput<Factory> {
329    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
330    fn send_empty_proof(self) {
331        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
332            sequence_number: self.proof_sequence_number,
333            state: self.hashed_state_update,
334        });
335    }
336}
337
338/// Manages concurrent multiproof calculations.
339/// Takes care of not having more calculations in flight than a given maximum
340/// concurrency, further calculation requests are queued and spawn later, after
341/// availability has been signaled.
342#[derive(Debug)]
343pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
344    /// Maximum number of concurrent calculations.
345    max_concurrent: usize,
346    /// Currently running calculations.
347    inflight: usize,
348    /// Queued calculations.
349    pending: VecDeque<PendingMultiproofTask<Factory>>,
350    /// Executor for tasks
351    executor: WorkloadExecutor,
352    /// Sender to the storage proof task.
353    storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
354    /// Cached storage proof roots for missed leaves; this maps
355    /// hashed (missed) addresses to their storage proof roots.
356    ///
357    /// It is important to cache these. Otherwise, a common account
358    /// (popular ERC-20, etc.) having missed leaves in its path would
359    /// repeatedly calculate these proofs per interacting transaction
360    /// (same account different slots).
361    ///
362    /// This also works well with chunking multiproofs, which may break
363    /// a big account change into different chunks, which may repeatedly
364    /// revisit missed leaves.
365    missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
366    /// Metrics
367    metrics: MultiProofTaskMetrics,
368}
369
370impl<Factory> MultiproofManager<Factory>
371where
372    Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
373{
374    /// Creates a new [`MultiproofManager`].
375    fn new(
376        executor: WorkloadExecutor,
377        metrics: MultiProofTaskMetrics,
378        storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
379        max_concurrent: usize,
380    ) -> Self {
381        Self {
382            pending: VecDeque::with_capacity(max_concurrent),
383            max_concurrent,
384            executor,
385            inflight: 0,
386            metrics,
387            storage_proof_task_handle,
388            missed_leaves_storage_roots: Default::default(),
389        }
390    }
391
392    const fn is_full(&self) -> bool {
393        self.inflight >= self.max_concurrent
394    }
395
396    /// Spawns a new multiproof calculation or enqueues it for later if
397    /// `max_concurrent` are already inflight.
398    fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
399        // If there are no proof targets, we can just send an empty multiproof back immediately
400        if input.proof_targets_is_empty() {
401            debug!(
402                sequence_number = input.proof_sequence_number(),
403                "No proof targets, sending empty multiproof back immediately"
404            );
405            input.send_empty_proof();
406            return
407        }
408
409        if self.is_full() {
410            self.pending.push_back(input);
411            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
412            return;
413        }
414
415        self.spawn_multiproof_task(input);
416    }
417
418    /// Signals that a multiproof calculation has finished and there's room to
419    /// spawn a new calculation if needed.
420    fn on_calculation_complete(&mut self) {
421        self.inflight = self.inflight.saturating_sub(1);
422        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
423
424        if let Some(input) = self.pending.pop_front() {
425            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
426            self.spawn_multiproof_task(input);
427        }
428    }
429
430    /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
431    /// multiproof, and dispatching to `spawn_multiproof` otherwise.
432    fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
433        match input {
434            PendingMultiproofTask::Storage(storage_input) => {
435                self.spawn_storage_proof(storage_input);
436            }
437            PendingMultiproofTask::Regular(multiproof_input) => {
438                self.spawn_multiproof(multiproof_input);
439            }
440        }
441    }
442
443    /// Spawns a single storage proof calculation task.
444    fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
445        let StorageMultiproofInput {
446            config,
447            source,
448            hashed_state_update,
449            hashed_address,
450            proof_targets,
451            proof_sequence_number,
452            state_root_message_sender,
453            multi_added_removed_keys,
454        } = storage_multiproof_input;
455
456        let storage_proof_task_handle = self.storage_proof_task_handle.clone();
457        let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
458
459        self.executor.spawn_blocking(move || {
460            let storage_targets = proof_targets.len();
461
462            trace!(
463                target: "engine::root",
464                proof_sequence_number,
465                ?proof_targets,
466                storage_targets,
467                "Starting dedicated storage proof calculation",
468            );
469            let start = Instant::now();
470            let proof_result = ParallelProof::new(
471                config.consistent_view,
472                config.nodes_sorted,
473                config.state_sorted,
474                config.prefix_sets,
475                missed_leaves_storage_roots,
476                storage_proof_task_handle.clone(),
477            )
478            .with_branch_node_masks(true)
479            .with_multi_added_removed_keys(Some(multi_added_removed_keys))
480            .storage_proof(hashed_address, proof_targets);
481            let elapsed = start.elapsed();
482            trace!(
483                target: "engine::root",
484                proof_sequence_number,
485                ?elapsed,
486                ?source,
487                storage_targets,
488                "Storage multiproofs calculated",
489            );
490
491            match proof_result {
492                Ok(proof) => {
493                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
494                        Box::new(ProofCalculated {
495                            sequence_number: proof_sequence_number,
496                            update: SparseTrieUpdate {
497                                state: hashed_state_update,
498                                multiproof: DecodedMultiProof::from_storage_proof(
499                                    hashed_address,
500                                    proof,
501                                ),
502                            },
503                            elapsed,
504                        }),
505                    ));
506                }
507                Err(error) => {
508                    let _ = state_root_message_sender
509                        .send(MultiProofMessage::ProofCalculationError(error.into()));
510                }
511            }
512        });
513
514        self.inflight += 1;
515        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
516    }
517
518    /// Spawns a single multiproof calculation task.
519    fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
520        let MultiproofInput {
521            config,
522            source,
523            hashed_state_update,
524            proof_targets,
525            proof_sequence_number,
526            state_root_message_sender,
527            multi_added_removed_keys,
528        } = multiproof_input;
529        let storage_proof_task_handle = self.storage_proof_task_handle.clone();
530        let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
531
532        self.executor.spawn_blocking(move || {
533            let account_targets = proof_targets.len();
534            let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
535
536            trace!(
537                target: "engine::root",
538                proof_sequence_number,
539                ?proof_targets,
540                account_targets,
541                storage_targets,
542                ?source,
543                "Starting multiproof calculation",
544            );
545
546            let start = Instant::now();
547            let proof_result = ParallelProof::new(
548                config.consistent_view,
549                config.nodes_sorted,
550                config.state_sorted,
551                config.prefix_sets,
552                missed_leaves_storage_roots,
553                storage_proof_task_handle.clone(),
554            )
555            .with_branch_node_masks(true)
556            .with_multi_added_removed_keys(multi_added_removed_keys)
557            .decoded_multiproof(proof_targets);
558            let elapsed = start.elapsed();
559            trace!(
560                target: "engine::root",
561                proof_sequence_number,
562                ?elapsed,
563                ?source,
564                account_targets,
565                storage_targets,
566                "Multiproof calculated",
567            );
568
569            match proof_result {
570                Ok(proof) => {
571                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
572                        Box::new(ProofCalculated {
573                            sequence_number: proof_sequence_number,
574                            update: SparseTrieUpdate {
575                                state: hashed_state_update,
576                                multiproof: proof,
577                            },
578                            elapsed,
579                        }),
580                    ));
581                }
582                Err(error) => {
583                    let _ = state_root_message_sender
584                        .send(MultiProofMessage::ProofCalculationError(error.into()));
585                }
586            }
587        });
588
589        self.inflight += 1;
590        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
591    }
592}
593
/// Metrics recorded by the multiproof task and its [`MultiproofManager`].
// NOTE(review): the `Metrics` derive appears to use the field doc comments below as the
// registered metric descriptions, so rewording them changes exported metric metadata.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of inflight multiproofs.
    pub inflight_multiproofs_histogram: Histogram,
    /// Histogram of pending multiproofs.
    pub pending_multiproofs_histogram: Histogram,

    /// Histogram of the number of prefetch proof target accounts.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of prefetch proof target storages.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of prefetch proof target chunks.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of state update proof target accounts.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of state update proof target storages.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of state update proof target chunks.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations.
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of state updates received.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of proofs processed.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of total time spent in the multiproof task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Total time spent waiting for the first state update or prefetch request.
    pub first_update_wait_time_histogram: Histogram,
    /// Total time spent waiting for the last proof result.
    pub last_proof_wait_time_histogram: Histogram,
}
637
638/// Standalone task that receives a transaction state stream and updates relevant
639/// data structures to calculate state root.
640///
641/// It is responsible of  initializing a blinded sparse trie and subscribe to
642/// transaction state stream. As it receives transaction execution results, it
643/// fetches the proofs for relevant accounts from the database and reveal them
644/// to the tree.
645/// Then it updates relevant leaves according to the result of the transaction.
646/// This feeds updates to the sparse trie task.
647#[derive(Debug)]
648pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
649    /// The size of proof targets chunk to spawn in one calculation.
650    ///
651    /// If [`None`], then chunking is disabled.
652    chunk_size: Option<usize>,
653    /// Task configuration.
654    config: MultiProofConfig<Factory>,
655    /// Receiver for state root related messages.
656    rx: Receiver<MultiProofMessage>,
657    /// Sender for state root related messages.
658    tx: Sender<MultiProofMessage>,
659    /// Sender for state updates emitted by this type.
660    to_sparse_trie: Sender<SparseTrieUpdate>,
661    /// Proof targets that have been already fetched.
662    fetched_proof_targets: MultiProofTargets,
663    /// Tracks keys which have been added and removed throughout the entire block.
664    multi_added_removed_keys: MultiAddedRemovedKeys,
665    /// Proof sequencing handler.
666    proof_sequencer: ProofSequencer,
667    /// Manages calculation of multiproofs.
668    multiproof_manager: MultiproofManager<Factory>,
669    /// multi proof task metrics
670    metrics: MultiProofTaskMetrics,
671}
672
673impl<Factory> MultiProofTask<Factory>
674where
675    Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
676{
677    /// Creates a new multi proof task with the unified message channel
678    pub(super) fn new(
679        config: MultiProofConfig<Factory>,
680        executor: WorkloadExecutor,
681        proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
682        to_sparse_trie: Sender<SparseTrieUpdate>,
683        max_concurrency: usize,
684        chunk_size: Option<usize>,
685    ) -> Self {
686        let (tx, rx) = channel();
687        let metrics = MultiProofTaskMetrics::default();
688
689        Self {
690            chunk_size,
691            config,
692            rx,
693            tx,
694            to_sparse_trie,
695            fetched_proof_targets: Default::default(),
696            multi_added_removed_keys: MultiAddedRemovedKeys::new(),
697            proof_sequencer: ProofSequencer::default(),
698            multiproof_manager: MultiproofManager::new(
699                executor,
700                metrics.clone(),
701                proof_task_handle,
702                max_concurrency,
703            ),
704            metrics,
705        }
706    }
707
708    /// Returns a [`Sender`] that can be used to send arbitrary [`MultiProofMessage`]s to this task.
709    pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
710        self.tx.clone()
711    }
712
713    /// Handles request for proof prefetch.
714    ///
715    /// Returns a number of proofs that were spawned.
716    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
717        let proof_targets = self.get_prefetch_proof_targets(targets);
718        self.fetched_proof_targets.extend_ref(&proof_targets);
719
720        // Make sure all target accounts have an `AddedRemovedKeySet` in the
721        // [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
722        // we still want to optimistically fetch extension children for the leaf addition case.
723        self.multi_added_removed_keys.touch_accounts(proof_targets.keys().copied());
724
725        // Clone+Arc MultiAddedRemovedKeys for sharing with the spawned multiproof tasks
726        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
727
728        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
729        self.metrics
730            .prefetch_proof_targets_storages_histogram
731            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
732
733        // Process proof targets in chunks.
734        let mut chunks = 0;
735        let should_chunk = !self.multiproof_manager.is_full();
736
737        let mut spawn = |proof_targets| {
738            self.multiproof_manager.spawn_or_queue(
739                MultiproofInput {
740                    config: self.config.clone(),
741                    source: None,
742                    hashed_state_update: Default::default(),
743                    proof_targets,
744                    proof_sequence_number: self.proof_sequencer.next_sequence(),
745                    state_root_message_sender: self.tx.clone(),
746                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
747                }
748                .into(),
749            );
750            chunks += 1;
751        };
752
753        if should_chunk && let Some(chunk_size) = self.chunk_size {
754            for proof_targets_chunk in proof_targets.chunks(chunk_size) {
755                spawn(proof_targets_chunk);
756            }
757        } else {
758            spawn(proof_targets);
759        }
760
761        self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
762
763        chunks
764    }
765
766    // Returns true if all state updates finished and all proofs processed.
767    fn is_done(
768        &self,
769        proofs_processed: u64,
770        state_update_proofs_requested: u64,
771        prefetch_proofs_requested: u64,
772        updates_finished: bool,
773    ) -> bool {
774        let all_proofs_processed =
775            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
776        let no_pending = !self.proof_sequencer.has_pending();
777        debug!(
778            target: "engine::root",
779            proofs_processed,
780            state_update_proofs_requested,
781            prefetch_proofs_requested,
782            no_pending,
783            updates_finished,
784            "Checking end condition"
785        );
786        all_proofs_processed && no_pending && updates_finished
787    }
788
789    /// Calls `get_proof_targets` with existing proof targets for prefetching.
790    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
791        // Here we want to filter out any targets that are already fetched
792        //
793        // This means we need to remove any storage slots that have already been fetched
794        let mut duplicates = 0;
795
796        // First remove all storage targets that are subsets of already fetched storage slots
797        targets.retain(|hashed_address, target_storage| {
798            let keep = self
799                .fetched_proof_targets
800                .get(hashed_address)
801                // do NOT remove if None, because that means the account has not been fetched yet
802                .is_none_or(|fetched_storage| {
803                    // remove if a subset
804                    !target_storage.is_subset(fetched_storage)
805                });
806
807            if !keep {
808                duplicates += target_storage.len();
809            }
810
811            keep
812        });
813
814        // For all non-subset remaining targets, we have to calculate the difference
815        for (hashed_address, target_storage) in targets.deref_mut() {
816            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
817                // this means the account has not been fetched yet, so we must fetch everything
818                // associated with this account
819                continue
820            };
821
822            let prev_target_storage_len = target_storage.len();
823
824            // keep only the storage slots that have not been fetched yet
825            //
826            // we already removed subsets, so this should only remove duplicates
827            target_storage.retain(|slot| !fetched_storage.contains(slot));
828
829            duplicates += prev_target_storage_len - target_storage.len();
830        }
831
832        if duplicates > 0 {
833            trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
834        }
835
836        targets
837    }
838
    /// Handles state updates.
    ///
    /// The EVM state is hashed, and the added/removed key tracking is refreshed. The hashed update
    /// is then partitioned against `fetched_proof_targets`:
    /// - the already-fetched part is sent straight back to this task as a
    ///   [`MultiProofMessage::EmptyProof`];
    /// - the remainder is handed to the multiproof manager, either whole or in chunks of
    ///   `chunk_size`.
    ///
    /// Returns the number of proof sequence numbers allocated by this call. That is one for the
    /// immediately-sent `EmptyProof` (if any) plus one per spawned or queued chunk. The caller adds
    /// this to its "requested" counter, which `is_done` compares against processed proofs.
    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
        let hashed_state_update = evm_state_to_hashed_post_state(update);

        // Update removed keys based on the state update.
        self.multi_added_removed_keys.update_with_state(&hashed_state_update);

        // Split the state update into already fetched and not fetched according to the proof
        // targets.
        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);

        // Either 0 or 1: counts the EmptyProof message sent below.
        let mut state_updates = 0;
        // If there are any accounts or storage slots that we already fetched the proofs for,
        // send them immediately, as they don't require spawning any additional multiproofs.
        if !fetched_state_update.is_empty() {
            let _ = self.tx.send(MultiProofMessage::EmptyProof {
                sequence_number: self.proof_sequencer.next_sequence(),
                state: fetched_state_update,
            });
            state_updates += 1;
        }

        // Clone+Arc MultiAddedRemovedKeys for sharing with the spawned multiproof tasks.
        // The snapshot is taken after `update_with_state` above, so it reflects this update.
        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());

        // Process state updates in chunks.
        let mut chunks = 0;
        // Chunking is skipped when the multiproof manager reports it is full; the whole
        // not-fetched update is then submitted as a single unit.
        let should_chunk = !self.multiproof_manager.is_full();

        // Accumulates the targets of every chunk spawned by this call, so that
        // `fetched_proof_targets` can be extended once at the end.
        let mut spawned_proof_targets = MultiProofTargets::default();

        // Computes proof targets for one piece of the update, records them, and submits a
        // multiproof job with the next sequence number. Each call counts as one chunk.
        let mut spawn = |hashed_state_update| {
            let proof_targets = get_proof_targets(
                &hashed_state_update,
                &self.fetched_proof_targets,
                &multi_added_removed_keys,
            );
            spawned_proof_targets.extend_ref(&proof_targets);

            self.multiproof_manager.spawn_or_queue(
                MultiproofInput {
                    config: self.config.clone(),
                    source: Some(source),
                    hashed_state_update,
                    proof_targets,
                    proof_sequence_number: self.proof_sequencer.next_sequence(),
                    state_root_message_sender: self.tx.clone(),
                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
                }
                .into(),
            );

            chunks += 1;
        };

        if should_chunk && let Some(chunk_size) = self.chunk_size {
            for chunk in not_fetched_state_update.chunks(chunk_size) {
                spawn(chunk);
            }
        } else {
            spawn(not_fetched_state_update);
        }

        self.metrics
            .state_update_proof_targets_accounts_histogram
            .record(spawned_proof_targets.len() as f64);
        self.metrics
            .state_update_proof_targets_storages_histogram
            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
        self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);

        // Mark everything requested by this call as fetched so later updates and prefetches
        // skip it.
        self.fetched_proof_targets.extend(spawned_proof_targets);

        state_updates + chunks
    }
917
918    /// Handler for new proof calculated, aggregates all the existing sequential proofs.
919    fn on_proof(
920        &mut self,
921        sequence_number: u64,
922        update: SparseTrieUpdate,
923    ) -> Option<SparseTrieUpdate> {
924        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
925
926        ready_proofs
927            .into_iter()
928            // Merge all ready proofs and state updates
929            .reduce(|mut acc_update, update| {
930                acc_update.extend(update);
931                acc_update
932            })
933            // Return None if the resulting proof is empty
934            .filter(|proof| !proof.is_empty())
935    }
936
937    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
938    /// sparse trie, updates the sparse trie, and eventually returns the state root.
939    ///
940    /// The lifecycle is the following:
941    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
942    ///    received from the engine.
943    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
944    ///      [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are
945    ///      extracted with [`get_proof_targets`].
946    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
947    ///      so that the proofs for accounts and storage slots that were already fetched are not
948    ///      requested again.
949    /// 2. Using the proof targets, a new multiproof is calculated using
950    ///    [`MultiproofManager::spawn_or_queue`].
951    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
952    ///      sent back to this task along with the original state update.
953    ///    * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]
954    ///      message is sent back to this task along with the resulting multiproof, proof targets
955    ///      and original state update.
956    /// 3. Either [`MultiProofMessage::EmptyProof`] or [`MultiProofMessage::ProofCalculated`] is
957    ///    received.
958    ///    * The multiproof is added to the (proof sequencer)[`ProofSequencer`].
959    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
960    ///      state updates arrived (i.e. transaction order), such sequence is returned.
961    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
962    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
963    ///    trie task.
964    /// 5. Steps above are repeated until this task receives a
965    ///    [`MultiProofMessage::FinishedStateUpdates`].
966    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
967    ///      [`MultiProofMessage::ProofCalculated`] message, we check if there are any proofs are
968    ///      currently being calculated, or if there are any pending proofs in the proof sequencer
969    ///      left to be revealed by checking the pending tasks.
970    /// 6. This task exits after all pending proofs are processed.
971    pub(crate) fn run(mut self) {
972        // TODO convert those into fields
973        let mut prefetch_proofs_requested = 0;
974        let mut state_update_proofs_requested = 0;
975        let mut proofs_processed = 0;
976
977        let mut updates_finished = false;
978
979        // Timestamp before the first state update or prefetch was received
980        let start = Instant::now();
981
982        // Timestamp when the first state update or prefetch was received
983        let mut first_update_time = None;
984        // Timestamp when state updates have finished
985        let mut updates_finished_time = None;
986
987        loop {
988            trace!(target: "engine::root", "entering main channel receiving loop");
989            match self.rx.recv() {
990                Ok(message) => match message {
991                    MultiProofMessage::PrefetchProofs(targets) => {
992                        trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
993                        if first_update_time.is_none() {
994                            // record the wait time
995                            self.metrics
996                                .first_update_wait_time_histogram
997                                .record(start.elapsed().as_secs_f64());
998                            first_update_time = Some(Instant::now());
999                            debug!(target: "engine::root", "Started state root calculation");
1000                        }
1001
1002                        let account_targets = targets.len();
1003                        let storage_targets =
1004                            targets.values().map(|slots| slots.len()).sum::<usize>();
1005                        prefetch_proofs_requested += self.on_prefetch_proof(targets);
1006                        debug!(
1007                            target: "engine::root",
1008                            account_targets,
1009                            storage_targets,
1010                            prefetch_proofs_requested,
1011                            "Prefetching proofs"
1012                        );
1013                    }
1014                    MultiProofMessage::StateUpdate(source, update) => {
1015                        trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
1016                        if first_update_time.is_none() {
1017                            // record the wait time
1018                            self.metrics
1019                                .first_update_wait_time_histogram
1020                                .record(start.elapsed().as_secs_f64());
1021                            first_update_time = Some(Instant::now());
1022                            debug!(target: "engine::root", "Started state root calculation");
1023                        }
1024
1025                        let len = update.len();
1026                        state_update_proofs_requested += self.on_state_update(source, update);
1027                        debug!(
1028                            target: "engine::root",
1029                            ?source,
1030                            len,
1031                            ?state_update_proofs_requested,
1032                            "Received new state update"
1033                        );
1034                    }
1035                    MultiProofMessage::FinishedStateUpdates => {
1036                        trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
1037                        updates_finished = true;
1038                        updates_finished_time = Some(Instant::now());
1039                        if self.is_done(
1040                            proofs_processed,
1041                            state_update_proofs_requested,
1042                            prefetch_proofs_requested,
1043                            updates_finished,
1044                        ) {
1045                            debug!(
1046                                target: "engine::root",
1047                                "State updates finished and all proofs processed, ending calculation"
1048                            );
1049                            break
1050                        }
1051                    }
1052                    MultiProofMessage::EmptyProof { sequence_number, state } => {
1053                        trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
1054
1055                        proofs_processed += 1;
1056
1057                        if let Some(combined_update) = self.on_proof(
1058                            sequence_number,
1059                            SparseTrieUpdate { state, multiproof: Default::default() },
1060                        ) {
1061                            let _ = self.to_sparse_trie.send(combined_update);
1062                        }
1063
1064                        if self.is_done(
1065                            proofs_processed,
1066                            state_update_proofs_requested,
1067                            prefetch_proofs_requested,
1068                            updates_finished,
1069                        ) {
1070                            debug!(
1071                                target: "engine::root",
1072                                "State updates finished and all proofs processed, ending calculation"
1073                            );
1074                            break
1075                        }
1076                    }
1077                    MultiProofMessage::ProofCalculated(proof_calculated) => {
1078                        trace!(target: "engine::root", "processing
1079        MultiProofMessage::ProofCalculated");
1080
1081                        // we increment proofs_processed for both state updates and prefetches,
1082                        // because both are used for the root termination condition.
1083                        proofs_processed += 1;
1084
1085                        self.metrics
1086                            .proof_calculation_duration_histogram
1087                            .record(proof_calculated.elapsed);
1088
1089                        debug!(
1090                            target: "engine::root",
1091                            sequence = proof_calculated.sequence_number,
1092                            total_proofs = proofs_processed,
1093                            "Processing calculated proof"
1094                        );
1095
1096                        self.multiproof_manager.on_calculation_complete();
1097
1098                        if let Some(combined_update) =
1099                            self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
1100                        {
1101                            let _ = self.to_sparse_trie.send(combined_update);
1102                        }
1103
1104                        if self.is_done(
1105                            proofs_processed,
1106                            state_update_proofs_requested,
1107                            prefetch_proofs_requested,
1108                            updates_finished,
1109                        ) {
1110                            debug!(
1111                                target: "engine::root",
1112                                "State updates finished and all proofs processed, ending calculation");
1113                            break
1114                        }
1115                    }
1116                    MultiProofMessage::ProofCalculationError(err) => {
1117                        error!(
1118                            target: "engine::root",
1119                            ?err,
1120                            "proof calculation error"
1121                        );
1122                        return
1123                    }
1124                },
1125                Err(_) => {
1126                    // this means our internal message channel is closed, which shouldn't happen
1127                    // in normal operation since we hold both ends
1128                    error!(target: "engine::root", "Internal message channel closed unexpectedly");
1129                    return
1130                }
1131            }
1132        }
1133
1134        debug!(
1135            target: "engine::root",
1136            total_updates = state_update_proofs_requested,
1137            total_proofs = proofs_processed,
1138            total_time = ?first_update_time.map(|t|t.elapsed()),
1139            time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1140            "All proofs processed, ending calculation"
1141        );
1142
1143        // update total metrics on finish
1144        self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1145        self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1146        if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1147            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1148        }
1149
1150        if let Some(updates_finished_time) = updates_finished_time {
1151            self.metrics
1152                .last_proof_wait_time_histogram
1153                .record(updates_finished_time.elapsed().as_secs_f64());
1154        }
1155    }
1156}
1157
1158/// Returns accounts only with those storages that were not already fetched, and
1159/// if there are no such storages and the account itself was already fetched, the
1160/// account shouldn't be included.
1161fn get_proof_targets(
1162    state_update: &HashedPostState,
1163    fetched_proof_targets: &MultiProofTargets,
1164    multi_added_removed_keys: &MultiAddedRemovedKeys,
1165) -> MultiProofTargets {
1166    let mut targets = MultiProofTargets::default();
1167
1168    // first collect all new accounts (not previously fetched)
1169    for &hashed_address in state_update.accounts.keys() {
1170        if !fetched_proof_targets.contains_key(&hashed_address) {
1171            targets.insert(hashed_address, HashSet::default());
1172        }
1173    }
1174
1175    // then process storage slots for all accounts in the state update
1176    for (hashed_address, storage) in &state_update.storages {
1177        let fetched = fetched_proof_targets.get(hashed_address);
1178        let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1179        let mut changed_slots = storage
1180            .storage
1181            .keys()
1182            .filter(|slot| {
1183                !fetched.is_some_and(|f| f.contains(*slot)) ||
1184                    storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1185            })
1186            .peekable();
1187
1188        // If the storage is wiped, we still need to fetch the account proof.
1189        if storage.wiped && fetched.is_none() {
1190            targets.entry(*hashed_address).or_default();
1191        }
1192
1193        if changed_slots.peek().is_some() {
1194            targets.entry(*hashed_address).or_default().extend(changed_slots);
1195        }
1196    }
1197
1198    targets
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203    use super::*;
1204    use alloy_primitives::map::B256Set;
1205    use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
1206    use reth_trie::{MultiProof, TrieInput};
1207    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1208    use revm_primitives::{B256, U256};
1209    use std::sync::Arc;
1210
1211    fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
1212    where
1213        F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
1214    {
1215        let consistent_view = ConsistentDbView::new(factory, None);
1216        let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
1217        let state_sorted = Arc::new(input.state.clone().into_sorted());
1218        let prefix_sets = Arc::new(input.prefix_sets);
1219
1220        MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
1221    }
1222
1223    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
1224    where
1225        F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
1226    {
1227        let executor = WorkloadExecutor::default();
1228        let config = create_state_root_config(factory, TrieInput::default());
1229        let task_ctx = ProofTaskCtx::new(
1230            config.nodes_sorted.clone(),
1231            config.state_sorted.clone(),
1232            config.prefix_sets.clone(),
1233        );
1234        let proof_task = ProofTaskManager::new(
1235            executor.handle().clone(),
1236            config.consistent_view.clone(),
1237            task_ctx,
1238            1,
1239            1,
1240        )
1241        .expect("Failed to create ProofTaskManager");
1242        let channel = channel();
1243
1244        MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
1245    }
1246
1247    #[test]
1248    fn test_add_proof_in_sequence() {
1249        let mut sequencer = ProofSequencer::default();
1250        let proof1 = MultiProof::default();
1251        let proof2 = MultiProof::default();
1252        sequencer.next_sequence = 2;
1253
1254        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1255        assert_eq!(ready.len(), 1);
1256        assert!(!sequencer.has_pending());
1257
1258        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1259        assert_eq!(ready.len(), 1);
1260        assert!(!sequencer.has_pending());
1261    }
1262
1263    #[test]
1264    fn test_add_proof_out_of_order() {
1265        let mut sequencer = ProofSequencer::default();
1266        let proof1 = MultiProof::default();
1267        let proof2 = MultiProof::default();
1268        let proof3 = MultiProof::default();
1269        sequencer.next_sequence = 3;
1270
1271        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1272        assert_eq!(ready.len(), 0);
1273        assert!(sequencer.has_pending());
1274
1275        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1276        assert_eq!(ready.len(), 1);
1277        assert!(sequencer.has_pending());
1278
1279        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1280        assert_eq!(ready.len(), 2);
1281        assert!(!sequencer.has_pending());
1282    }
1283
1284    #[test]
1285    fn test_add_proof_with_gaps() {
1286        let mut sequencer = ProofSequencer::default();
1287        let proof1 = MultiProof::default();
1288        let proof3 = MultiProof::default();
1289        sequencer.next_sequence = 3;
1290
1291        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1292        assert_eq!(ready.len(), 1);
1293
1294        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1295        assert_eq!(ready.len(), 0);
1296        assert!(sequencer.has_pending());
1297    }
1298
1299    #[test]
1300    fn test_add_proof_duplicate_sequence() {
1301        let mut sequencer = ProofSequencer::default();
1302        let proof1 = MultiProof::default();
1303        let proof2 = MultiProof::default();
1304
1305        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1306        assert_eq!(ready.len(), 1);
1307
1308        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1309        assert_eq!(ready.len(), 0);
1310        assert!(!sequencer.has_pending());
1311    }
1312
1313    #[test]
1314    fn test_add_proof_batch_processing() {
1315        let mut sequencer = ProofSequencer::default();
1316        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1317        sequencer.next_sequence = 5;
1318
1319        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1320        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1321        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1322        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1323
1324        let ready =
1325            sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1326        assert_eq!(ready.len(), 5);
1327        assert!(!sequencer.has_pending());
1328    }
1329
1330    fn create_get_proof_targets_state() -> HashedPostState {
1331        let mut state = HashedPostState::default();
1332
1333        let addr1 = B256::random();
1334        let addr2 = B256::random();
1335        state.accounts.insert(addr1, Some(Default::default()));
1336        state.accounts.insert(addr2, Some(Default::default()));
1337
1338        let mut storage = HashedStorage::default();
1339        let slot1 = B256::random();
1340        let slot2 = B256::random();
1341        storage.storage.insert(slot1, U256::ZERO);
1342        storage.storage.insert(slot2, U256::from(1));
1343        state.storages.insert(addr1, storage);
1344
1345        state
1346    }
1347
1348    #[test]
1349    fn test_get_proof_targets_new_account_targets() {
1350        let state = create_get_proof_targets_state();
1351        let fetched = MultiProofTargets::default();
1352
1353        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1354
1355        // should return all accounts as targets since nothing was fetched before
1356        assert_eq!(targets.len(), state.accounts.len());
1357        for addr in state.accounts.keys() {
1358            assert!(targets.contains_key(addr));
1359        }
1360    }
1361
1362    #[test]
1363    fn test_get_proof_targets_new_storage_targets() {
1364        let state = create_get_proof_targets_state();
1365        let fetched = MultiProofTargets::default();
1366
1367        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1368
1369        // verify storage slots are included for accounts with storage
1370        for (addr, storage) in &state.storages {
1371            assert!(targets.contains_key(addr));
1372            let target_slots = &targets[addr];
1373            assert_eq!(target_slots.len(), storage.storage.len());
1374            for slot in storage.storage.keys() {
1375                assert!(target_slots.contains(slot));
1376            }
1377        }
1378    }
1379
1380    #[test]
1381    fn test_get_proof_targets_filter_already_fetched_accounts() {
1382        let state = create_get_proof_targets_state();
1383        let mut fetched = MultiProofTargets::default();
1384
1385        // select an account that has no storage updates
1386        let fetched_addr = state
1387            .accounts
1388            .keys()
1389            .find(|&&addr| !state.storages.contains_key(&addr))
1390            .expect("Should have an account without storage");
1391
1392        // mark the account as already fetched
1393        fetched.insert(*fetched_addr, HashSet::default());
1394
1395        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1396
1397        // should not include the already fetched account since it has no storage updates
1398        assert!(!targets.contains_key(fetched_addr));
1399        // other accounts should still be included
1400        assert_eq!(targets.len(), state.accounts.len() - 1);
1401    }
1402
1403    #[test]
1404    fn test_get_proof_targets_filter_already_fetched_storage() {
1405        let state = create_get_proof_targets_state();
1406        let mut fetched = MultiProofTargets::default();
1407
1408        // mark one storage slot as already fetched
1409        let (addr, storage) = state.storages.iter().next().unwrap();
1410        let mut fetched_slots = HashSet::default();
1411        let fetched_slot = *storage.storage.keys().next().unwrap();
1412        fetched_slots.insert(fetched_slot);
1413        fetched.insert(*addr, fetched_slots);
1414
1415        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1416
1417        // should not include the already fetched storage slot
1418        let target_slots = &targets[addr];
1419        assert!(!target_slots.contains(&fetched_slot));
1420        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1421    }
1422
1423    #[test]
1424    fn test_get_proof_targets_empty_state() {
1425        let state = HashedPostState::default();
1426        let fetched = MultiProofTargets::default();
1427
1428        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1429
1430        assert!(targets.is_empty());
1431    }
1432
1433    #[test]
1434    fn test_get_proof_targets_mixed_fetched_state() {
1435        let mut state = HashedPostState::default();
1436        let mut fetched = MultiProofTargets::default();
1437
1438        let addr1 = B256::random();
1439        let addr2 = B256::random();
1440        let slot1 = B256::random();
1441        let slot2 = B256::random();
1442
1443        state.accounts.insert(addr1, Some(Default::default()));
1444        state.accounts.insert(addr2, Some(Default::default()));
1445
1446        let mut storage = HashedStorage::default();
1447        storage.storage.insert(slot1, U256::ZERO);
1448        storage.storage.insert(slot2, U256::from(1));
1449        state.storages.insert(addr1, storage);
1450
1451        let mut fetched_slots = HashSet::default();
1452        fetched_slots.insert(slot1);
1453        fetched.insert(addr1, fetched_slots);
1454
1455        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1456
1457        assert!(targets.contains_key(&addr2));
1458        assert!(!targets[&addr1].contains(&slot1));
1459        assert!(targets[&addr1].contains(&slot2));
1460    }
1461
1462    #[test]
1463    fn test_get_proof_targets_unmodified_account_with_storage() {
1464        let mut state = HashedPostState::default();
1465        let fetched = MultiProofTargets::default();
1466
1467        let addr = B256::random();
1468        let slot1 = B256::random();
1469        let slot2 = B256::random();
1470
1471        // don't add the account to state.accounts (simulating unmodified account)
1472        // but add storage updates for this account
1473        let mut storage = HashedStorage::default();
1474        storage.storage.insert(slot1, U256::from(1));
1475        storage.storage.insert(slot2, U256::from(2));
1476        state.storages.insert(addr, storage);
1477
1478        assert!(!state.accounts.contains_key(&addr));
1479        assert!(!fetched.contains_key(&addr));
1480
1481        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1482
1483        // verify that we still get the storage slots for the unmodified account
1484        assert!(targets.contains_key(&addr));
1485
1486        let target_slots = &targets[&addr];
1487        assert_eq!(target_slots.len(), 2);
1488        assert!(target_slots.contains(&slot1));
1489        assert!(target_slots.contains(&slot2));
1490    }
1491
1492    #[test]
1493    fn test_get_prefetch_proof_targets_no_duplicates() {
1494        let test_provider_factory = create_test_provider_factory();
1495        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1496
1497        // populate some targets
1498        let mut targets = MultiProofTargets::default();
1499        let addr1 = B256::random();
1500        let addr2 = B256::random();
1501        let slot1 = B256::random();
1502        let slot2 = B256::random();
1503        targets.insert(addr1, std::iter::once(slot1).collect());
1504        targets.insert(addr2, std::iter::once(slot2).collect());
1505
1506        let prefetch_proof_targets =
1507            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1508
1509        // check that the prefetch proof targets are the same because there are no fetched proof
1510        // targets yet
1511        assert_eq!(prefetch_proof_targets, targets);
1512
1513        // add a different addr and slot to fetched proof targets
1514        let addr3 = B256::random();
1515        let slot3 = B256::random();
1516        test_state_root_task.fetched_proof_targets.insert(addr3, std::iter::once(slot3).collect());
1517
1518        let prefetch_proof_targets =
1519            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1520
1521        // check that the prefetch proof targets are the same because the fetched proof targets
1522        // don't overlap with the prefetch targets
1523        assert_eq!(prefetch_proof_targets, targets);
1524    }
1525
1526    #[test]
1527    fn test_get_prefetch_proof_targets_remove_subset() {
1528        let test_provider_factory = create_test_provider_factory();
1529        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1530
1531        // populate some targe
1532        let mut targets = MultiProofTargets::default();
1533        let addr1 = B256::random();
1534        let addr2 = B256::random();
1535        let slot1 = B256::random();
1536        let slot2 = B256::random();
1537        targets.insert(addr1, std::iter::once(slot1).collect());
1538        targets.insert(addr2, std::iter::once(slot2).collect());
1539
1540        // add a subset of the first target to fetched proof targets
1541        test_state_root_task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());
1542
1543        let prefetch_proof_targets =
1544            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1545
1546        // check that the prefetch proof targets do not include the subset
1547        assert_eq!(prefetch_proof_targets.len(), 1);
1548        assert!(!prefetch_proof_targets.contains_key(&addr1));
1549        assert!(prefetch_proof_targets.contains_key(&addr2));
1550
1551        // now add one more slot to the prefetch targets
1552        let slot3 = B256::random();
1553        targets.get_mut(&addr1).unwrap().insert(slot3);
1554
1555        let prefetch_proof_targets =
1556            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1557
1558        // check that the prefetch proof targets do not include the subset
1559        // but include the new slot
1560        assert_eq!(prefetch_proof_targets.len(), 2);
1561        assert!(prefetch_proof_targets.contains_key(&addr1));
1562        assert_eq!(
1563            *prefetch_proof_targets.get(&addr1).unwrap(),
1564            std::iter::once(slot3).collect::<B256Set>()
1565        );
1566        assert!(prefetch_proof_targets.contains_key(&addr2));
1567        assert_eq!(
1568            *prefetch_proof_targets.get(&addr2).unwrap(),
1569            std::iter::once(slot2).collect::<B256Set>()
1570        );
1571    }
1572
1573    #[test]
1574    fn test_get_proof_targets_with_removed_storage_keys() {
1575        let mut state = HashedPostState::default();
1576        let mut fetched = MultiProofTargets::default();
1577        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1578
1579        let addr = B256::random();
1580        let slot1 = B256::random();
1581        let slot2 = B256::random();
1582
1583        // add account to state
1584        state.accounts.insert(addr, Some(Default::default()));
1585
1586        // add storage updates
1587        let mut storage = HashedStorage::default();
1588        storage.storage.insert(slot1, U256::from(100));
1589        storage.storage.insert(slot2, U256::from(200));
1590        state.storages.insert(addr, storage);
1591
1592        // mark slot1 as already fetched
1593        let mut fetched_slots = HashSet::default();
1594        fetched_slots.insert(slot1);
1595        fetched.insert(addr, fetched_slots);
1596
1597        // update multi_added_removed_keys to mark slot1 as removed
1598        let mut removed_state = HashedPostState::default();
1599        let mut removed_storage = HashedStorage::default();
1600        removed_storage.storage.insert(slot1, U256::ZERO); // U256::ZERO marks as removed
1601        removed_state.storages.insert(addr, removed_storage);
1602        multi_added_removed_keys.update_with_state(&removed_state);
1603
1604        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1605
1606        // slot1 should be included despite being fetched, because it's marked as removed
1607        assert!(targets.contains_key(&addr));
1608        let target_slots = &targets[&addr];
1609        assert_eq!(target_slots.len(), 2);
1610        assert!(target_slots.contains(&slot1)); // included because it's removed
1611        assert!(target_slots.contains(&slot2)); // included because it's not fetched
1612    }
1613
1614    #[test]
1615    fn test_get_proof_targets_with_wiped_storage() {
1616        let mut state = HashedPostState::default();
1617        let fetched = MultiProofTargets::default();
1618        let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1619
1620        let addr = B256::random();
1621        let slot1 = B256::random();
1622
1623        // add account to state
1624        state.accounts.insert(addr, Some(Default::default()));
1625
1626        // add wiped storage
1627        let mut storage = HashedStorage::new(true);
1628        storage.storage.insert(slot1, U256::from(100));
1629        state.storages.insert(addr, storage);
1630
1631        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1632
1633        // account should be included because storage is wiped and account wasn't fetched
1634        assert!(targets.contains_key(&addr));
1635        let target_slots = &targets[&addr];
1636        assert_eq!(target_slots.len(), 1);
1637        assert!(target_slots.contains(&slot1));
1638    }
1639
1640    #[test]
1641    fn test_get_proof_targets_removed_keys_not_in_state_update() {
1642        let mut state = HashedPostState::default();
1643        let mut fetched = MultiProofTargets::default();
1644        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1645
1646        let addr = B256::random();
1647        let slot1 = B256::random();
1648        let slot2 = B256::random();
1649        let slot3 = B256::random();
1650
1651        // add account to state
1652        state.accounts.insert(addr, Some(Default::default()));
1653
1654        // add storage updates for slot1 and slot2 only
1655        let mut storage = HashedStorage::default();
1656        storage.storage.insert(slot1, U256::from(100));
1657        storage.storage.insert(slot2, U256::from(200));
1658        state.storages.insert(addr, storage);
1659
1660        // mark all slots as already fetched
1661        let mut fetched_slots = HashSet::default();
1662        fetched_slots.insert(slot1);
1663        fetched_slots.insert(slot2);
1664        fetched_slots.insert(slot3); // slot3 is fetched but not in state update
1665        fetched.insert(addr, fetched_slots);
1666
1667        // mark slot3 as removed (even though it's not in the state update)
1668        let mut removed_state = HashedPostState::default();
1669        let mut removed_storage = HashedStorage::default();
1670        removed_storage.storage.insert(slot3, U256::ZERO);
1671        removed_state.storages.insert(addr, removed_storage);
1672        multi_added_removed_keys.update_with_state(&removed_state);
1673
1674        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1675
1676        // only slots in the state update can be included, so slot3 should not appear
1677        assert!(!targets.contains_key(&addr));
1678    }
1679}