reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{
6    keccak256,
7    map::{B256Set, HashSet},
8    B256,
9};
10use derive_more::derive::Deref;
11use metrics::Histogram;
12use reth_errors::ProviderError;
13use reth_metrics::Metrics;
14use reth_provider::{
15    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx,
16    StateCommitmentProvider,
17};
18use reth_revm::state::EvmState;
19use reth_trie::{
20    prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, HashedPostState,
21    HashedPostStateSorted, HashedStorage, MultiProof, MultiProofTargets, TrieInput,
22};
23use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
24use std::{
25    collections::{BTreeMap, VecDeque},
26    ops::DerefMut,
27    sync::{
28        mpsc::{channel, Receiver, Sender},
29        Arc,
30    },
31    time::{Duration, Instant},
32};
33use tracing::{debug, error, trace};
34
/// The size of proof targets chunk to spawn in one calculation.
///
/// This is the chunk size passed to `chunks(..)` when splitting prefetch targets and state
/// updates; every resulting chunk is handed to the multiproof manager as its own calculation.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
37
/// A trie update that can be applied to the sparse trie alongside the proofs for the touched
/// parts of the state.
///
/// The state update and the multiproof travel together, so the receiver gets the proof together
/// with the state change it was calculated for.
#[derive(Default, Debug)]
pub struct SparseTrieUpdate {
    /// The state update that was used to calculate the proof
    pub(crate) state: HashedPostState,
    /// The calculated multiproof
    pub(crate) multiproof: MultiProof,
}
47
48impl SparseTrieUpdate {
49    /// Returns true if the update is empty.
50    pub(super) fn is_empty(&self) -> bool {
51        self.state.is_empty() && self.multiproof.is_empty()
52    }
53
54    /// Construct update from multiproof.
55    #[cfg(test)]
56    pub(super) fn from_multiproof(multiproof: MultiProof) -> Self {
57        Self { multiproof, ..Default::default() }
58    }
59
60    /// Extend update with contents of the other.
61    pub(super) fn extend(&mut self, other: Self) {
62        self.state.extend(other.state);
63        self.multiproof.extend(other.multiproof);
64    }
65}
66
/// Common configuration for multi proof tasks
///
/// The config is cloned for every spawned proof input. The sorted collections and the prefix
/// sets are held in [`Arc`]s, so those clones share the underlying data.
#[derive(Debug, Clone)]
pub(super) struct MultiProofConfig<Factory> {
    /// View over the state in the database.
    pub consistent_view: ConsistentDbView<Factory>,
    /// The sorted collection of cached in-memory intermediate trie nodes that
    /// can be reused for computation.
    pub nodes_sorted: Arc<TrieUpdatesSorted>,
    /// The sorted in-memory overlay hashed state.
    pub state_sorted: Arc<HashedPostStateSorted>,
    /// The collection of prefix sets for the computation. Since the prefix sets _always_
    /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here,
    /// if we have cached nodes for them.
    pub prefix_sets: Arc<TriePrefixSetsMut>,
}
82
83impl<Factory> MultiProofConfig<Factory> {
84    /// Creates a new state root config from the consistent view and the trie input.
85    pub(super) fn new_from_input(
86        consistent_view: ConsistentDbView<Factory>,
87        input: TrieInput,
88    ) -> Self {
89        Self {
90            consistent_view,
91            nodes_sorted: Arc::new(input.nodes.into_sorted()),
92            state_sorted: Arc::new(input.state.into_sorted()),
93            prefix_sets: Arc::new(input.prefix_sets),
94        }
95    }
96}
97
/// Messages used internally by the multi proof task.
///
/// All variants travel over the same channel: external producers and the spawned proof
/// calculations both hold a `Sender<MultiProofMessage>`.
#[derive(Debug)]
pub(super) enum MultiProofMessage {
    /// Prefetch proof targets
    PrefetchProofs(MultiProofTargets),
    /// New state update from transaction execution with its source
    StateUpdate(StateChangeSource, EvmState),
    /// State update that can be applied to the sparse trie without any new proofs.
    ///
    /// It can be the case when all accounts and storage slots from the state update were already
    /// fetched and revealed.
    EmptyProof {
        /// The index of this proof in the sequence of state updates
        sequence_number: u64,
        /// The state update that was used to calculate the proof
        state: HashedPostState,
    },
    /// Proof calculation completed for a specific state update
    ///
    /// The payload is boxed.
    ProofCalculated(Box<ProofCalculated>),
    /// Error during proof calculation
    ProofCalculationError(ProviderError),
    /// Signals state update stream end.
    ///
    /// This is triggered by block execution, indicating that no additional state updates are
    /// expected. [`StateHookSender`] sends this message when it is dropped.
    FinishedStateUpdates,
}
125
/// Message about completion of proof calculation for a specific state update
#[derive(Debug)]
pub(super) struct ProofCalculated {
    /// The index of this proof in the sequence of state updates
    sequence_number: u64,
    /// Sparse trie update: the calculated proof together with the state update it belongs to
    update: SparseTrieUpdate,
    /// The time taken to calculate the proof, measured around the proof computation itself.
    elapsed: Duration,
}
136
137/// Handle to track proof calculation ordering.
138#[derive(Debug, Default)]
139struct ProofSequencer {
140    /// The next proof sequence number to be produced.
141    next_sequence: u64,
142    /// The next sequence number expected to be delivered.
143    next_to_deliver: u64,
144    /// Buffer for out-of-order proofs and corresponding state updates
145    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
146}
147
148impl ProofSequencer {
149    /// Gets the next sequence number and increments the counter
150    const fn next_sequence(&mut self) -> u64 {
151        let seq = self.next_sequence;
152        self.next_sequence += 1;
153        seq
154    }
155
156    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
157    /// updates if we have a continuous sequence
158    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
159        if sequence >= self.next_to_deliver {
160            self.pending_proofs.insert(sequence, update);
161        }
162
163        // return early if we don't have the next expected proof
164        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
165            return Vec::new()
166        }
167
168        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
169        let mut current_sequence = self.next_to_deliver;
170
171        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
172        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
173            consecutive_proofs.push(pending);
174            current_sequence += 1;
175
176            // if we don't have the next number, stop collecting
177            if !self.pending_proofs.contains_key(&current_sequence) {
178                break;
179            }
180        }
181
182        self.next_to_deliver += consecutive_proofs.len() as u64;
183
184        consecutive_proofs
185    }
186
187    /// Returns true if we still have pending proofs
188    pub(crate) fn has_pending(&self) -> bool {
189        !self.pending_proofs.is_empty()
190    }
191}
192
/// A wrapper for the sender that signals completion when dropped.
///
/// This type is intended to be used in combination with the evm executor state hook.
/// The drop should happen once the block has been executed, i.e. after the last state update
/// has been sent. Dropping sends [`MultiProofMessage::FinishedStateUpdates`], which triggers the
/// exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(Sender<MultiProofMessage>);
200
impl StateHookSender {
    /// Wraps the given sender so that dropping the wrapper signals the end of state updates.
    pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
        Self(inner)
    }
}
206
207impl Drop for StateHookSender {
208    fn drop(&mut self) {
209        // Send completion signal when the sender is dropped
210        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
211    }
212}
213
214pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
215    let mut hashed_state = HashedPostState::with_capacity(update.len());
216
217    for (address, account) in update {
218        if account.is_touched() {
219            let hashed_address = keccak256(address);
220            trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
221
222            let destroyed = account.is_selfdestructed();
223            let info = if destroyed { None } else { Some(account.info.into()) };
224            hashed_state.accounts.insert(hashed_address, info);
225
226            let mut changed_storage_iter = account
227                .storage
228                .into_iter()
229                .filter(|(_slot, value)| value.is_changed())
230                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
231                .peekable();
232
233            if destroyed {
234                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
235            } else if changed_storage_iter.peek().is_some() {
236                hashed_state
237                    .storages
238                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
239            }
240        }
241    }
242
243    hashed_state
244}
245
/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
///
/// This is the unit that [`MultiproofManager`] either spawns right away or keeps in its queue.
#[derive(Debug)]
enum PendingMultiproofTask<Factory> {
    /// A storage multiproof task input.
    Storage(StorageMultiproofInput<Factory>),
    /// A regular multiproof task input.
    Regular(MultiproofInput<Factory>),
}
254
255impl<Factory> PendingMultiproofTask<Factory> {
256    /// Returns the proof sequence number of the task.
257    const fn proof_sequence_number(&self) -> u64 {
258        match self {
259            Self::Storage(input) => input.proof_sequence_number,
260            Self::Regular(input) => input.proof_sequence_number,
261        }
262    }
263
264    /// Returns whether or not the proof targets are empty.
265    fn proof_targets_is_empty(&self) -> bool {
266        match self {
267            Self::Storage(input) => input.proof_targets.is_empty(),
268            Self::Regular(input) => input.proof_targets.is_empty(),
269        }
270    }
271
272    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
273    fn send_empty_proof(self) {
274        match self {
275            Self::Storage(input) => input.send_empty_proof(),
276            Self::Regular(input) => input.send_empty_proof(),
277        }
278    }
279}
280
impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
    /// Wraps a storage multiproof input in the [`PendingMultiproofTask::Storage`] variant.
    fn from(input: StorageMultiproofInput<Factory>) -> Self {
        Self::Storage(input)
    }
}
286
impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
    /// Wraps a regular multiproof input in the [`PendingMultiproofTask::Regular`] variant.
    fn from(input: MultiproofInput<Factory>) -> Self {
        Self::Regular(input)
    }
}
292
/// Input parameters for spawning a dedicated storage multiproof calculation.
#[derive(Debug)]
struct StorageMultiproofInput<Factory> {
    /// Configuration whose parts are passed to the proof calculation.
    config: MultiProofConfig<Factory>,
    /// Source of the state change, if any; only used for logging.
    source: Option<StateChangeSource>,
    /// State update that is sent back together with the calculated (or empty) proof.
    hashed_state_update: HashedPostState,
    /// Hashed address the storage proof is calculated for.
    hashed_address: B256,
    /// Targets passed to the storage proof calculation.
    proof_targets: B256Set,
    /// Sequence number reported back with the result.
    proof_sequence_number: u64,
    /// Channel on which the result message is sent.
    state_root_message_sender: Sender<MultiProofMessage>,
}
304
305impl<Factory> StorageMultiproofInput<Factory> {
306    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
307    fn send_empty_proof(self) {
308        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
309            sequence_number: self.proof_sequence_number,
310            state: self.hashed_state_update,
311        });
312    }
313}
314
/// Input parameters for spawning a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput<Factory> {
    /// Configuration whose parts are passed to the proof calculation.
    config: MultiProofConfig<Factory>,
    /// Source of the state change, if any; only used for logging.
    source: Option<StateChangeSource>,
    /// State update that is sent back together with the calculated (or empty) proof.
    hashed_state_update: HashedPostState,
    /// Targets passed to the multiproof calculation.
    proof_targets: MultiProofTargets,
    /// Sequence number reported back with the result.
    proof_sequence_number: u64,
    /// Channel on which the result message is sent.
    state_root_message_sender: Sender<MultiProofMessage>,
}
325
326impl<Factory> MultiproofInput<Factory> {
327    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
328    fn send_empty_proof(self) {
329        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
330            sequence_number: self.proof_sequence_number,
331            state: self.hashed_state_update,
332        });
333    }
334}
335
/// Manages concurrent multiproof calculations.
///
/// Takes care of not having more calculations in flight than a given maximum
/// concurrency. Further calculation requests are queued and spawned later, after
/// availability has been signaled.
#[derive(Debug)]
pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
    /// Maximum number of concurrent calculations.
    max_concurrent: usize,
    /// Currently running calculations.
    inflight: usize,
    /// Queued calculations.
    pending: VecDeque<PendingMultiproofTask<Factory>>,
    /// Executor for tasks
    executor: WorkloadExecutor,
    /// Sender to the storage proof task; a clone is handed to every spawned calculation.
    storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
    /// Metrics
    metrics: MultiProofTaskMetrics,
}
355
356impl<Factory> MultiproofManager<Factory>
357where
358    Factory:
359        DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
360{
361    /// Creates a new [`MultiproofManager`].
362    fn new(
363        executor: WorkloadExecutor,
364        metrics: MultiProofTaskMetrics,
365        storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
366        max_concurrent: usize,
367    ) -> Self {
368        Self {
369            pending: VecDeque::with_capacity(max_concurrent),
370            max_concurrent,
371            executor,
372            inflight: 0,
373            metrics,
374            storage_proof_task_handle,
375        }
376    }
377
378    /// Spawns a new multiproof calculation or enqueues it for later if
379    /// `max_concurrent` are already inflight.
380    fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
381        // If there are no proof targets, we can just send an empty multiproof back immediately
382        if input.proof_targets_is_empty() {
383            debug!(
384                sequence_number = input.proof_sequence_number(),
385                "No proof targets, sending empty multiproof back immediately"
386            );
387            input.send_empty_proof();
388            return
389        }
390
391        if self.inflight >= self.max_concurrent {
392            self.pending.push_back(input);
393            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
394            return;
395        }
396
397        self.spawn_multiproof_task(input);
398    }
399
400    /// Signals that a multiproof calculation has finished and there's room to
401    /// spawn a new calculation if needed.
402    fn on_calculation_complete(&mut self) {
403        self.inflight = self.inflight.saturating_sub(1);
404        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
405
406        if let Some(input) = self.pending.pop_front() {
407            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
408            self.spawn_multiproof_task(input);
409        }
410    }
411
412    /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
413    /// multiproof, and dispatching to `spawn_multiproof` otherwise.
414    fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
415        match input {
416            PendingMultiproofTask::Storage(storage_input) => {
417                self.spawn_storage_proof(storage_input);
418            }
419            PendingMultiproofTask::Regular(multiproof_input) => {
420                self.spawn_multiproof(multiproof_input);
421            }
422        }
423    }
424
425    /// Spawns a single storage proof calculation task.
426    fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
427        let StorageMultiproofInput {
428            config,
429            source,
430            hashed_state_update,
431            hashed_address,
432            proof_targets,
433            proof_sequence_number,
434            state_root_message_sender,
435        } = storage_multiproof_input;
436
437        let storage_proof_task_handle = self.storage_proof_task_handle.clone();
438
439        self.executor.spawn_blocking(move || {
440            let storage_targets = proof_targets.len();
441
442            trace!(
443                target: "engine::root",
444                proof_sequence_number,
445                ?proof_targets,
446                storage_targets,
447                "Starting dedicated storage proof calculation",
448            );
449            let start = Instant::now();
450            let result = ParallelProof::new(
451                config.consistent_view,
452                config.nodes_sorted,
453                config.state_sorted,
454                config.prefix_sets,
455                storage_proof_task_handle.clone(),
456            )
457            .with_branch_node_masks(true)
458            .storage_proof(hashed_address, proof_targets);
459            let elapsed = start.elapsed();
460            trace!(
461                target: "engine::root",
462                proof_sequence_number,
463                ?elapsed,
464                ?source,
465                storage_targets,
466                "Storage multiproofs calculated",
467            );
468
469            match result {
470                Ok(proof) => {
471                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
472                        Box::new(ProofCalculated {
473                            sequence_number: proof_sequence_number,
474                            update: SparseTrieUpdate {
475                                state: hashed_state_update,
476                                multiproof: MultiProof::from_storage_proof(hashed_address, proof),
477                            },
478                            elapsed,
479                        }),
480                    ));
481                }
482                Err(error) => {
483                    let _ = state_root_message_sender
484                        .send(MultiProofMessage::ProofCalculationError(error.into()));
485                }
486            }
487        });
488
489        self.inflight += 1;
490        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
491    }
492
493    /// Spawns a single multiproof calculation task.
494    fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
495        let MultiproofInput {
496            config,
497            source,
498            hashed_state_update,
499            proof_targets,
500            proof_sequence_number,
501            state_root_message_sender,
502        } = multiproof_input;
503        let storage_proof_task_handle = self.storage_proof_task_handle.clone();
504
505        self.executor.spawn_blocking(move || {
506            let account_targets = proof_targets.len();
507            let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
508
509            trace!(
510                target: "engine::root",
511                proof_sequence_number,
512                ?proof_targets,
513                account_targets,
514                storage_targets,
515                "Starting multiproof calculation",
516            );
517            let start = Instant::now();
518            let result = ParallelProof::new(
519                config.consistent_view,
520                config.nodes_sorted,
521                config.state_sorted,
522                config.prefix_sets,
523                storage_proof_task_handle.clone(),
524            )
525            .with_branch_node_masks(true)
526            .multiproof(proof_targets);
527            let elapsed = start.elapsed();
528            trace!(
529                target: "engine::root",
530                proof_sequence_number,
531                ?elapsed,
532                ?source,
533                account_targets,
534                storage_targets,
535                "Multiproof calculated",
536            );
537
538            match result {
539                Ok(proof) => {
540                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
541                        Box::new(ProofCalculated {
542                            sequence_number: proof_sequence_number,
543                            update: SparseTrieUpdate {
544                                state: hashed_state_update,
545                                multiproof: proof,
546                            },
547                            elapsed,
548                        }),
549                    ));
550                }
551                Err(error) => {
552                    let _ = state_root_message_sender
553                        .send(MultiProofMessage::ProofCalculationError(error.into()));
554                }
555            }
556        });
557
558        self.inflight += 1;
559        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
560    }
561}
562
// Metrics of the multiproof task, registered under the `tree.root` scope.
//
// NOTE(review): the field doc comments below are presumably picked up by `#[derive(Metrics)]`
// as metric descriptions - confirm before rewording any of them.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of inflight multiproofs.
    pub inflight_multiproofs_histogram: Histogram,
    /// Histogram of pending multiproofs.
    pub pending_multiproofs_histogram: Histogram,

    /// Histogram of the number of prefetch proof target accounts.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of prefetch proof target storages.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of prefetch proof target chunks.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of state update proof target accounts.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of state update proof target storages.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of state update proof target chunks.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations.
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of state updates received.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of proofs processed.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of total time spent in the multiproof task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Total time spent waiting for the first state update or prefetch request.
    pub first_update_wait_time_histogram: Histogram,
    /// Total time spent waiting for the last proof result.
    pub last_proof_wait_time_histogram: Histogram,
}
606
/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
/// It is responsible for initializing a blinded sparse trie and subscribing to the
/// transaction state stream. As it receives transaction execution results, it
/// fetches the proofs for relevant accounts from the database and reveals them
/// to the tree.
/// Then it updates relevant leaves according to the result of the transaction.
/// This feeds updates to the sparse trie task.
#[derive(Debug)]
pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
    /// Task configuration.
    config: MultiProofConfig<Factory>,
    /// Receiver for state root related messages.
    rx: Receiver<MultiProofMessage>,
    /// Sender for state root related messages; clones are handed to every proof input.
    tx: Sender<MultiProofMessage>,
    /// Sender for state updates emitted by this type.
    to_sparse_trie: Sender<SparseTrieUpdate>,
    /// Proof targets that have been already fetched.
    fetched_proof_targets: MultiProofTargets,
    /// Proof sequencing handler.
    proof_sequencer: ProofSequencer,
    /// Manages calculation of multiproofs.
    multiproof_manager: MultiproofManager<Factory>,
    /// multi proof task metrics
    metrics: MultiProofTaskMetrics,
}
635
636impl<Factory> MultiProofTask<Factory>
637where
638    Factory:
639        DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
640{
641    /// Creates a new multi proof task with the unified message channel
642    pub(super) fn new(
643        config: MultiProofConfig<Factory>,
644        executor: WorkloadExecutor,
645        proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
646        to_sparse_trie: Sender<SparseTrieUpdate>,
647        max_concurrency: usize,
648    ) -> Self {
649        let (tx, rx) = channel();
650        let metrics = MultiProofTaskMetrics::default();
651
652        Self {
653            config,
654            rx,
655            tx,
656            to_sparse_trie,
657            fetched_proof_targets: Default::default(),
658            proof_sequencer: ProofSequencer::default(),
659            multiproof_manager: MultiproofManager::new(
660                executor,
661                metrics.clone(),
662                proof_task_handle,
663                max_concurrency,
664            ),
665            metrics,
666        }
667    }
668
669    /// Returns a [`Sender`] that can be used to send arbitrary [`MultiProofMessage`]s to this task.
670    pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
671        self.tx.clone()
672    }
673
674    /// Handles request for proof prefetch.
675    ///
676    /// Returns a number of proofs that were spawned.
677    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
678        let proof_targets = self.get_prefetch_proof_targets(targets);
679        self.fetched_proof_targets.extend_ref(&proof_targets);
680
681        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
682        self.metrics
683            .prefetch_proof_targets_storages_histogram
684            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
685
686        // Process proof targets in chunks.
687        let mut chunks = 0;
688        for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
689            self.multiproof_manager.spawn_or_queue(
690                MultiproofInput {
691                    config: self.config.clone(),
692                    source: None,
693                    hashed_state_update: Default::default(),
694                    proof_targets: proof_targets_chunk,
695                    proof_sequence_number: self.proof_sequencer.next_sequence(),
696                    state_root_message_sender: self.tx.clone(),
697                }
698                .into(),
699            );
700            chunks += 1;
701        }
702        self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
703
704        chunks
705    }
706
707    // Returns true if all state updates finished and all proofs processed.
708    fn is_done(
709        &self,
710        proofs_processed: u64,
711        state_update_proofs_requested: u64,
712        prefetch_proofs_requested: u64,
713        updates_finished: bool,
714    ) -> bool {
715        let all_proofs_processed =
716            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
717        let no_pending = !self.proof_sequencer.has_pending();
718        debug!(
719            target: "engine::root",
720            proofs_processed,
721            state_update_proofs_requested,
722            prefetch_proofs_requested,
723            no_pending,
724            updates_finished,
725            "Checking end condition"
726        );
727        all_proofs_processed && no_pending && updates_finished
728    }
729
730    /// Calls `get_proof_targets` with existing proof targets for prefetching.
731    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
732        // Here we want to filter out any targets that are already fetched
733        //
734        // This means we need to remove any storage slots that have already been fetched
735        let mut duplicates = 0;
736
737        // First remove all storage targets that are subsets of already fetched storage slots
738        targets.retain(|hashed_address, target_storage| {
739            let keep = self
740                .fetched_proof_targets
741                .get(hashed_address)
742                // do NOT remove if None, because that means the account has not been fetched yet
743                .is_none_or(|fetched_storage| {
744                    // remove if a subset
745                    !target_storage.is_subset(fetched_storage)
746                });
747
748            if !keep {
749                duplicates += target_storage.len();
750            }
751
752            keep
753        });
754
755        // For all non-subset remaining targets, we have to calculate the difference
756        for (hashed_address, target_storage) in targets.deref_mut() {
757            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
758                // this means the account has not been fetched yet, so we must fetch everything
759                // associated with this account
760                continue
761            };
762
763            let prev_target_storage_len = target_storage.len();
764
765            // keep only the storage slots that have not been fetched yet
766            //
767            // we already removed subsets, so this should only remove duplicates
768            target_storage.retain(|slot| !fetched_storage.contains(slot));
769
770            duplicates += prev_target_storage_len - target_storage.len();
771        }
772
773        if duplicates > 0 {
774            trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
775        }
776
777        targets
778    }
779
780    /// Handles state updates.
781    ///
782    /// Returns a number of proofs that were spawned.
783    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
784        let hashed_state_update = evm_state_to_hashed_post_state(update);
785        // Split the state update into already fetched and not fetched according to the proof
786        // targets.
787        let (fetched_state_update, not_fetched_state_update) =
788            hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
789
790        let mut state_updates = 0;
791        // If there are any accounts or storage slots that we already fetched the proofs for,
792        // send them immediately, as they don't require spawning any additional multiproofs.
793        if !fetched_state_update.is_empty() {
794            let _ = self.tx.send(MultiProofMessage::EmptyProof {
795                sequence_number: self.proof_sequencer.next_sequence(),
796                state: fetched_state_update,
797            });
798            state_updates += 1;
799        }
800
801        // Process state updates in chunks.
802        let mut chunks = 0;
803        let mut spawned_proof_targets = MultiProofTargets::default();
804        for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
805            let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
806            spawned_proof_targets.extend_ref(&proof_targets);
807
808            self.multiproof_manager.spawn_or_queue(
809                MultiproofInput {
810                    config: self.config.clone(),
811                    source: Some(source),
812                    hashed_state_update: chunk,
813                    proof_targets,
814                    proof_sequence_number: self.proof_sequencer.next_sequence(),
815                    state_root_message_sender: self.tx.clone(),
816                }
817                .into(),
818            );
819            chunks += 1;
820        }
821
822        self.metrics
823            .state_update_proof_targets_accounts_histogram
824            .record(spawned_proof_targets.len() as f64);
825        self.metrics
826            .state_update_proof_targets_storages_histogram
827            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
828        self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
829
830        self.fetched_proof_targets.extend(spawned_proof_targets);
831
832        state_updates + chunks
833    }
834
835    /// Handler for new proof calculated, aggregates all the existing sequential proofs.
836    fn on_proof(
837        &mut self,
838        sequence_number: u64,
839        update: SparseTrieUpdate,
840    ) -> Option<SparseTrieUpdate> {
841        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
842
843        ready_proofs
844            .into_iter()
845            // Merge all ready proofs and state updates
846            .reduce(|mut acc_update, update| {
847                acc_update.extend(update);
848                acc_update
849            })
850            // Return None if the resulting proof is empty
851            .filter(|proof| !proof.is_empty())
852    }
853
854    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
855    /// sparse trie, updates the sparse trie, and eventually returns the state root.
856    ///
857    /// The lifecycle is the following:
858    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
859    ///    received from the engine.
860    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
861    ///      [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are
862    ///      extracted with [`get_proof_targets`].
863    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
864    ///      so that the proofs for accounts and storage slots that were already fetched are not
865    ///      requested again.
866    /// 2. Using the proof targets, a new multiproof is calculated using
867    ///    [`MultiproofManager::spawn_or_queue`].
868    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
869    ///      sent back to this task along with the original state update.
870    ///    * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]
871    ///      message is sent back to this task along with the resulting multiproof, proof targets
872    ///      and original state update.
873    /// 3. Either [`MultiProofMessage::EmptyProof`] or [`MultiProofMessage::ProofCalculated`] is
874    ///    received.
875    ///    * The multiproof is added to the (proof sequencer)[`ProofSequencer`].
876    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
877    ///      state updates arrived (i.e. transaction order), such sequence is returned.
878    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
879    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
880    ///    trie task.
881    /// 5. Steps above are repeated until this task receives a
882    ///    [`MultiProofMessage::FinishedStateUpdates`].
883    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
884    ///      [`MultiProofMessage::ProofCalculated`] message, we check if there are any proofs are
885    ///      currently being calculated, or if there are any pending proofs in the proof sequencer
886    ///      left to be revealed by checking the pending tasks.
887    /// 6. This task exits after all pending proofs are processed.
888    pub(crate) fn run(mut self) {
889        // TODO convert those into fields
890        let mut prefetch_proofs_requested = 0;
891        let mut state_update_proofs_requested = 0;
892        let mut proofs_processed = 0;
893
894        let mut updates_finished = false;
895
896        // Timestamp before the first state update or prefetch was received
897        let start = Instant::now();
898
899        // Timestamp when the first state update or prefetch was received
900        let mut first_update_time = None;
901        // Timestamp when state updates have finished
902        let mut updates_finished_time = None;
903
904        loop {
905            trace!(target: "engine::root", "entering main channel receiving loop");
906            match self.rx.recv() {
907                Ok(message) => match message {
908                    MultiProofMessage::PrefetchProofs(targets) => {
909                        trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
910                        if first_update_time.is_none() {
911                            // record the wait time
912                            self.metrics
913                                .first_update_wait_time_histogram
914                                .record(start.elapsed().as_secs_f64());
915                            first_update_time = Some(Instant::now());
916                            debug!(target: "engine::root", "Started state root calculation");
917                        }
918
919                        let account_targets = targets.len();
920                        let storage_targets =
921                            targets.values().map(|slots| slots.len()).sum::<usize>();
922                        prefetch_proofs_requested += self.on_prefetch_proof(targets);
923                        debug!(
924                            target: "engine::root",
925                            account_targets,
926                            storage_targets,
927                            prefetch_proofs_requested,
928                            "Prefetching proofs"
929                        );
930                    }
931                    MultiProofMessage::StateUpdate(source, update) => {
932                        trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
933                        if first_update_time.is_none() {
934                            // record the wait time
935                            self.metrics
936                                .first_update_wait_time_histogram
937                                .record(start.elapsed().as_secs_f64());
938                            first_update_time = Some(Instant::now());
939                            debug!(target: "engine::root", "Started state root calculation");
940                        }
941
942                        let len = update.len();
943                        state_update_proofs_requested += self.on_state_update(source, update);
944                        debug!(
945                            target: "engine::root",
946                            ?source,
947                            len,
948                            ?state_update_proofs_requested,
949                            "Received new state update"
950                        );
951                    }
952                    MultiProofMessage::FinishedStateUpdates => {
953                        trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
954                        updates_finished = true;
955                        updates_finished_time = Some(Instant::now());
956                        if self.is_done(
957                            proofs_processed,
958                            state_update_proofs_requested,
959                            prefetch_proofs_requested,
960                            updates_finished,
961                        ) {
962                            debug!(
963                                target: "engine::root",
964                                "State updates finished and all proofs processed, ending calculation"
965                            );
966                            break
967                        }
968                    }
969                    MultiProofMessage::EmptyProof { sequence_number, state } => {
970                        trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
971
972                        proofs_processed += 1;
973
974                        if let Some(combined_update) = self.on_proof(
975                            sequence_number,
976                            SparseTrieUpdate { state, multiproof: MultiProof::default() },
977                        ) {
978                            let _ = self.to_sparse_trie.send(combined_update);
979                        }
980
981                        if self.is_done(
982                            proofs_processed,
983                            state_update_proofs_requested,
984                            prefetch_proofs_requested,
985                            updates_finished,
986                        ) {
987                            debug!(
988                                target: "engine::root",
989                                "State updates finished and all proofs processed, ending calculation"
990                            );
991                            break
992                        }
993                    }
994                    MultiProofMessage::ProofCalculated(proof_calculated) => {
995                        trace!(target: "engine::root", "processing
996        MultiProofMessage::ProofCalculated");
997
998                        // we increment proofs_processed for both state updates and prefetches,
999                        // because both are used for the root termination condition.
1000                        proofs_processed += 1;
1001
1002                        self.metrics
1003                            .proof_calculation_duration_histogram
1004                            .record(proof_calculated.elapsed);
1005
1006                        debug!(
1007                            target: "engine::root",
1008                            sequence = proof_calculated.sequence_number,
1009                            total_proofs = proofs_processed,
1010                            "Processing calculated proof"
1011                        );
1012
1013                        self.multiproof_manager.on_calculation_complete();
1014
1015                        if let Some(combined_update) =
1016                            self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
1017                        {
1018                            let _ = self.to_sparse_trie.send(combined_update);
1019                        }
1020
1021                        if self.is_done(
1022                            proofs_processed,
1023                            state_update_proofs_requested,
1024                            prefetch_proofs_requested,
1025                            updates_finished,
1026                        ) {
1027                            debug!(
1028                                target: "engine::root",
1029                                "State updates finished and all proofs processed, ending calculation");
1030                            break
1031                        }
1032                    }
1033                    MultiProofMessage::ProofCalculationError(err) => {
1034                        error!(
1035                            target: "engine::root",
1036                            ?err,
1037                            "proof calculation error"
1038                        );
1039                        return
1040                    }
1041                },
1042                Err(_) => {
1043                    // this means our internal message channel is closed, which shouldn't happen
1044                    // in normal operation since we hold both ends
1045                    error!(
1046                        target: "engine::root",
1047                        "Internal message channel closed unexpectedly"
1048                    );
1049                }
1050            }
1051        }
1052
1053        debug!(
1054            target: "engine::root",
1055            total_updates = state_update_proofs_requested,
1056            total_proofs = proofs_processed,
1057            total_time = ?first_update_time.map(|t|t.elapsed()),
1058            time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1059            "All proofs processed, ending calculation"
1060        );
1061
1062        // update total metrics on finish
1063        self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1064        self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1065        if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1066            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1067        }
1068
1069        if let Some(updates_finished_time) = updates_finished_time {
1070            self.metrics
1071                .last_proof_wait_time_histogram
1072                .record(updates_finished_time.elapsed().as_secs_f64());
1073        }
1074    }
1075}
1076
1077/// Returns accounts only with those storages that were not already fetched, and
1078/// if there are no such storages and the account itself was already fetched, the
1079/// account shouldn't be included.
1080fn get_proof_targets(
1081    state_update: &HashedPostState,
1082    fetched_proof_targets: &MultiProofTargets,
1083) -> MultiProofTargets {
1084    let mut targets = MultiProofTargets::default();
1085
1086    // first collect all new accounts (not previously fetched)
1087    for &hashed_address in state_update.accounts.keys() {
1088        if !fetched_proof_targets.contains_key(&hashed_address) {
1089            targets.insert(hashed_address, HashSet::default());
1090        }
1091    }
1092
1093    // then process storage slots for all accounts in the state update
1094    for (hashed_address, storage) in &state_update.storages {
1095        let fetched = fetched_proof_targets.get(hashed_address);
1096        let mut changed_slots = storage
1097            .storage
1098            .keys()
1099            .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
1100            .peekable();
1101
1102        // If the storage is wiped, we still need to fetch the account proof.
1103        if storage.wiped && fetched.is_none() {
1104            targets.entry(*hashed_address).or_default();
1105        }
1106
1107        if changed_slots.peek().is_some() {
1108            targets.entry(*hashed_address).or_default().extend(changed_slots);
1109        }
1110    }
1111
1112    targets
1113}
1114
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::map::B256Set;
    use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
    use reth_trie::TrieInput;
    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
    use revm_primitives::{B256, U256};
    use std::sync::Arc;

    /// Builds a [`MultiProofConfig`] over `factory` from the sorted nodes, sorted state and
    /// prefix sets of `input`.
    fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
    where
        F: DatabaseProviderFactory<Provider: BlockReader>
            + StateCommitmentProvider
            + Clone
            + 'static,
    {
        MultiProofConfig {
            consistent_view: ConsistentDbView::new(factory, None),
            nodes_sorted: Arc::new(input.nodes.clone().into_sorted()),
            state_sorted: Arc::new(input.state.clone().into_sorted()),
            prefix_sets: Arc::new(input.prefix_sets),
        }
    }

    /// Builds a [`MultiProofTask`] over `factory` with a default trie input and a default
    /// workload executor.
    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
    where
        F: DatabaseProviderFactory<Provider: BlockReader>
            + StateCommitmentProvider
            + Clone
            + 'static,
    {
        let executor = WorkloadExecutor::default();
        let config = create_state_root_config(factory, TrieInput::default());

        let proof_task = ProofTaskManager::new(
            executor.handle().clone(),
            config.consistent_view.clone(),
            ProofTaskCtx::new(
                config.nodes_sorted.clone(),
                config.state_sorted.clone(),
                config.prefix_sets.clone(),
            ),
            1,
        );

        // Only the sending half is handed to the task; the receiver lives until this function
        // returns.
        let (sender, _receiver) = channel();

        MultiProofTask::new(config, executor, proof_task.handle(), sender, 1)
    }

    /// Returns a [`SparseTrieUpdate`] wrapping a default (empty) multiproof.
    fn empty_update() -> SparseTrieUpdate {
        SparseTrieUpdate::from_multiproof(MultiProof::default())
    }

    /// Proofs delivered in sequence order are released one at a time and leave nothing pending.
    #[test]
    fn test_add_proof_in_sequence() {
        let mut sequencer = ProofSequencer::default();
        sequencer.next_sequence = 2;

        for sequence in [0, 1] {
            let ready = sequencer.add_proof(sequence, empty_update());
            assert_eq!(ready.len(), 1);
            assert!(!sequencer.has_pending());
        }
    }

    /// A proof that arrives early is held back until the preceding ones have been delivered.
    #[test]
    fn test_add_proof_out_of_order() {
        let mut sequencer = ProofSequencer::default();
        sequencer.next_sequence = 3;

        // sequence 2 arrives first and has to wait
        let ready = sequencer.add_proof(2, empty_update());
        assert_eq!(ready.len(), 0);
        assert!(sequencer.has_pending());

        // sequence 0 is released alone; sequence 2 is still waiting for 1
        let ready = sequencer.add_proof(0, empty_update());
        assert_eq!(ready.len(), 1);
        assert!(sequencer.has_pending());

        // sequence 1 closes the gap and releases 2 along with it
        let ready = sequencer.add_proof(1, empty_update());
        assert_eq!(ready.len(), 2);
        assert!(!sequencer.has_pending());
    }

    /// A gap in the sequence keeps later proofs pending.
    #[test]
    fn test_add_proof_with_gaps() {
        let mut sequencer = ProofSequencer::default();
        sequencer.next_sequence = 3;

        let ready = sequencer.add_proof(0, empty_update());
        assert_eq!(ready.len(), 1);

        // sequence 1 is missing, so 2 cannot be released
        let ready = sequencer.add_proof(2, empty_update());
        assert_eq!(ready.len(), 0);
        assert!(sequencer.has_pending());
    }

    /// Delivering the same sequence number twice releases nothing the second time.
    #[test]
    fn test_add_proof_duplicate_sequence() {
        let mut sequencer = ProofSequencer::default();

        let ready = sequencer.add_proof(0, empty_update());
        assert_eq!(ready.len(), 1);

        let ready = sequencer.add_proof(0, empty_update());
        assert_eq!(ready.len(), 0);
        assert!(!sequencer.has_pending());
    }

    /// Once the first sequence number arrives, every buffered proof is released at once.
    #[test]
    fn test_add_proof_batch_processing() {
        let mut sequencer = ProofSequencer::default();
        sequencer.next_sequence = 5;

        // buffer everything except the first sequence number, in scrambled order
        for sequence in [4, 2, 1, 3] {
            sequencer.add_proof(sequence, empty_update());
        }

        let ready = sequencer.add_proof(0, empty_update());
        assert_eq!(ready.len(), 5);
        assert!(!sequencer.has_pending());
    }

    /// Creates a state with two random accounts, the first of which also has two storage slots.
    fn create_get_proof_targets_state() -> HashedPostState {
        let account_with_storage = B256::random();
        let account_without_storage = B256::random();

        let mut state = HashedPostState::default();
        for addr in [account_with_storage, account_without_storage] {
            state.accounts.insert(addr, Some(Default::default()));
        }

        let mut storage = HashedStorage::default();
        storage.storage.insert(B256::random(), U256::ZERO);
        storage.storage.insert(B256::random(), U256::from(1));
        state.storages.insert(account_with_storage, storage);

        state
    }

    /// With nothing fetched, every account in the state becomes a target.
    #[test]
    fn test_get_proof_targets_new_account_targets() {
        let state = create_get_proof_targets_state();
        let fetched = MultiProofTargets::default();

        let targets = get_proof_targets(&state, &fetched);

        // should return all accounts as targets since nothing was fetched before
        assert_eq!(targets.len(), state.accounts.len());
        assert!(state.accounts.keys().all(|addr| targets.contains_key(addr)));
    }

    /// With nothing fetched, every updated storage slot is part of its account's target.
    #[test]
    fn test_get_proof_targets_new_storage_targets() {
        let state = create_get_proof_targets_state();
        let fetched = MultiProofTargets::default();

        let targets = get_proof_targets(&state, &fetched);

        // verify storage slots are included for accounts with storage
        for (addr, storage) in &state.storages {
            assert!(targets.contains_key(addr));
            let target_slots = &targets[addr];
            assert_eq!(target_slots.len(), storage.storage.len());
            assert!(storage.storage.keys().all(|slot| target_slots.contains(slot)));
        }
    }

    /// An already fetched account without storage updates is not requested again.
    #[test]
    fn test_get_proof_targets_filter_already_fetched_accounts() {
        let state = create_get_proof_targets_state();

        // select an account that has no storage updates
        let fetched_addr = state
            .accounts
            .keys()
            .find(|addr| !state.storages.contains_key(*addr))
            .expect("Should have an account without storage");

        // mark the account as already fetched
        let mut fetched = MultiProofTargets::default();
        fetched.insert(*fetched_addr, HashSet::default());

        let targets = get_proof_targets(&state, &fetched);

        // should not include the already fetched account since it has no storage updates
        assert!(!targets.contains_key(fetched_addr));
        // other accounts should still be included
        assert_eq!(targets.len(), state.accounts.len() - 1);
    }

    /// An already fetched storage slot is dropped from its account's target.
    #[test]
    fn test_get_proof_targets_filter_already_fetched_storage() {
        let state = create_get_proof_targets_state();

        // mark one storage slot as already fetched
        let (addr, storage) = state.storages.iter().next().unwrap();
        let fetched_slot = *storage.storage.keys().next().unwrap();
        let mut fetched = MultiProofTargets::default();
        fetched.insert(*addr, std::iter::once(fetched_slot).collect());

        let targets = get_proof_targets(&state, &fetched);

        // should not include the already fetched storage slot
        let target_slots = &targets[addr];
        assert!(!target_slots.contains(&fetched_slot));
        assert_eq!(target_slots.len(), storage.storage.len() - 1);
    }

    /// An empty state update yields no targets.
    #[test]
    fn test_get_proof_targets_empty_state() {
        let targets =
            get_proof_targets(&HashedPostState::default(), &MultiProofTargets::default());

        assert!(targets.is_empty());
    }

    /// With one slot of an account already fetched, only the other slot and the unfetched
    /// account are requested.
    #[test]
    fn test_get_proof_targets_mixed_fetched_state() {
        let addr1 = B256::random();
        let addr2 = B256::random();
        let slot1 = B256::random();
        let slot2 = B256::random();

        let mut state = HashedPostState::default();
        state.accounts.insert(addr1, Some(Default::default()));
        state.accounts.insert(addr2, Some(Default::default()));

        let mut storage = HashedStorage::default();
        storage.storage.insert(slot1, U256::ZERO);
        storage.storage.insert(slot2, U256::from(1));
        state.storages.insert(addr1, storage);

        // only the first slot of the first account has been fetched
        let mut fetched = MultiProofTargets::default();
        fetched.insert(addr1, std::iter::once(slot1).collect());

        let targets = get_proof_targets(&state, &fetched);

        assert!(targets.contains_key(&addr2));
        assert!(!targets[&addr1].contains(&slot1));
        assert!(targets[&addr1].contains(&slot2));
    }

    /// Storage updates for an account absent from `state.accounts` still produce a target.
    #[test]
    fn test_get_proof_targets_unmodified_account_with_storage() {
        let addr = B256::random();
        let slot1 = B256::random();
        let slot2 = B256::random();

        // don't add the account to state.accounts (simulating unmodified account)
        // but add storage updates for this account
        let mut storage = HashedStorage::default();
        storage.storage.insert(slot1, U256::from(1));
        storage.storage.insert(slot2, U256::from(2));

        let mut state = HashedPostState::default();
        state.storages.insert(addr, storage);

        let fetched = MultiProofTargets::default();

        assert!(!state.accounts.contains_key(&addr));
        assert!(!fetched.contains_key(&addr));

        let targets = get_proof_targets(&state, &fetched);

        // verify that we still get the storage slots for the unmodified account
        assert!(targets.contains_key(&addr));

        let target_slots = &targets[&addr];
        assert_eq!(target_slots.len(), 2);
        assert!(target_slots.contains(&slot1));
        assert!(target_slots.contains(&slot2));
    }

    /// Prefetch targets that do not overlap with fetched targets are returned unchanged.
    #[test]
    fn test_get_prefetch_proof_targets_no_duplicates() {
        let mut task = create_test_state_root_task(create_test_provider_factory());

        // populate some targets
        let addr1 = B256::random();
        let addr2 = B256::random();
        let slot1 = B256::random();
        let slot2 = B256::random();
        let mut targets = MultiProofTargets::default();
        targets.insert(addr1, std::iter::once(slot1).collect());
        targets.insert(addr2, std::iter::once(slot2).collect());

        // check that the prefetch proof targets are the same because there are no fetched proof
        // targets yet
        let prefetch_proof_targets = task.get_prefetch_proof_targets(targets.clone());
        assert_eq!(prefetch_proof_targets, targets);

        // add a different addr and slot to fetched proof targets
        let addr3 = B256::random();
        let slot3 = B256::random();
        task.fetched_proof_targets.insert(addr3, std::iter::once(slot3).collect());

        // check that the prefetch proof targets are the same because the fetched proof targets
        // don't overlap with the prefetch targets
        let prefetch_proof_targets = task.get_prefetch_proof_targets(targets.clone());
        assert_eq!(prefetch_proof_targets, targets);
    }

    /// Prefetch targets fully covered by fetched targets are removed; partially covered ones
    /// keep only their new slots.
    #[test]
    fn test_get_prefetch_proof_targets_remove_subset() {
        let mut task = create_test_state_root_task(create_test_provider_factory());

        // populate some targets
        let addr1 = B256::random();
        let addr2 = B256::random();
        let slot1 = B256::random();
        let slot2 = B256::random();
        let mut targets = MultiProofTargets::default();
        targets.insert(addr1, std::iter::once(slot1).collect());
        targets.insert(addr2, std::iter::once(slot2).collect());

        // add a subset of the first target to fetched proof targets
        task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());

        let prefetch_proof_targets = task.get_prefetch_proof_targets(targets.clone());

        // check that the prefetch proof targets do not include the subset
        assert_eq!(prefetch_proof_targets.len(), 1);
        assert!(!prefetch_proof_targets.contains_key(&addr1));
        assert!(prefetch_proof_targets.contains_key(&addr2));

        // now add one more slot to the prefetch targets
        let slot3 = B256::random();
        targets.get_mut(&addr1).unwrap().insert(slot3);

        let prefetch_proof_targets = task.get_prefetch_proof_targets(targets.clone());

        // check that the prefetch proof targets do not include the subset
        // but include the new slot
        assert_eq!(prefetch_proof_targets.len(), 2);
        assert!(prefetch_proof_targets.contains_key(&addr1));
        assert_eq!(
            *prefetch_proof_targets.get(&addr1).unwrap(),
            std::iter::once(slot3).collect::<B256Set>()
        );
        assert!(prefetch_proof_targets.contains_key(&addr2));
        assert_eq!(
            *prefetch_proof_targets.get(&addr2).unwrap(),
            std::iter::once(slot2).collect::<B256Set>()
        );
    }
}