reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{keccak256, map::HashSet, B256};
6use derive_more::derive::Deref;
7use metrics::Histogram;
8use reth_errors::ProviderError;
9use reth_metrics::Metrics;
10use reth_provider::{
11    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
12};
13use reth_revm::state::EvmState;
14use reth_trie::{
15    prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, HashedPostState,
16    HashedPostStateSorted, HashedStorage, MultiProof, MultiProofTargets, TrieInput,
17};
18use reth_trie_parallel::proof::ParallelProof;
19use std::{
20    collections::{BTreeMap, VecDeque},
21    ops::DerefMut,
22    sync::{
23        mpsc::{channel, Receiver, Sender},
24        Arc,
25    },
26    time::{Duration, Instant},
27};
28use tracing::{debug, error, trace};
29
/// Chunk size used when splitting proof targets into separate multiproof calculations.
///
/// Passed to `chunks(..)` for both prefetch targets and not-yet-fetched state updates, so that
/// each resulting chunk is handed to the multiproof manager as its own calculation.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
32
33/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
34/// state.
35#[derive(Default, Debug)]
36pub struct SparseTrieUpdate {
37    /// The state update that was used to calculate the proof
38    pub(crate) state: HashedPostState,
39    /// The calculated multiproof
40    pub(crate) multiproof: MultiProof,
41}
42
43impl SparseTrieUpdate {
44    /// Returns true if the update is empty.
45    pub(super) fn is_empty(&self) -> bool {
46        self.state.is_empty() && self.multiproof.is_empty()
47    }
48
49    /// Construct update from multiproof.
50    #[cfg(test)]
51    pub(super) fn from_multiproof(multiproof: MultiProof) -> Self {
52        Self { multiproof, ..Default::default() }
53    }
54
55    /// Extend update with contents of the other.
56    pub(super) fn extend(&mut self, other: Self) {
57        self.state.extend(other.state);
58        self.multiproof.extend(other.multiproof);
59    }
60}
61
62/// Common configuration for multi proof tasks
63#[derive(Debug, Clone)]
64pub(super) struct MultiProofConfig<Factory> {
65    /// View over the state in the database.
66    pub consistent_view: ConsistentDbView<Factory>,
67    /// The sorted collection of cached in-memory intermediate trie nodes that
68    /// can be reused for computation.
69    pub nodes_sorted: Arc<TrieUpdatesSorted>,
70    /// The sorted in-memory overlay hashed state.
71    pub state_sorted: Arc<HashedPostStateSorted>,
72    /// The collection of prefix sets for the computation. Since the prefix sets _always_
73    /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here,
74    /// if we have cached nodes for them.
75    pub prefix_sets: Arc<TriePrefixSetsMut>,
76}
77
78impl<Factory> MultiProofConfig<Factory> {
79    /// Creates a new state root config from the consistent view and the trie input.
80    pub(super) fn new_from_input(
81        consistent_view: ConsistentDbView<Factory>,
82        input: TrieInput,
83    ) -> Self {
84        Self {
85            consistent_view,
86            nodes_sorted: Arc::new(input.nodes.into_sorted()),
87            state_sorted: Arc::new(input.state.into_sorted()),
88            prefix_sets: Arc::new(input.prefix_sets),
89        }
90    }
91}
92
93/// Messages used internally by the multi proof task.
94#[derive(Debug)]
95pub(super) enum MultiProofMessage {
96    /// Prefetch proof targets
97    PrefetchProofs(MultiProofTargets),
98    /// New state update from transaction execution with its source
99    StateUpdate(StateChangeSource, EvmState),
100    /// State update that can be applied to the sparse trie without any new proofs.
101    ///
102    /// It can be the case when all accounts and storage slots from the state update were already
103    /// fetched and revealed.
104    EmptyProof {
105        /// The index of this proof in the sequence of state updates
106        sequence_number: u64,
107        /// The state update that was used to calculate the proof
108        state: HashedPostState,
109    },
110    /// Proof calculation completed for a specific state update
111    ProofCalculated(Box<ProofCalculated>),
112    /// Error during proof calculation
113    ProofCalculationError(ProviderError),
114    /// Signals state update stream end.
115    ///
116    /// This is triggered by block execution, indicating that no additional state updates are
117    /// expected.
118    FinishedStateUpdates,
119}
120
121/// Message about completion of proof calculation for a specific state update
122#[derive(Debug)]
123pub(super) struct ProofCalculated {
124    /// The index of this proof in the sequence of state updates
125    sequence_number: u64,
126    /// Sparse trie update
127    update: SparseTrieUpdate,
128    /// The time taken to calculate the proof.
129    elapsed: Duration,
130}
131
132/// Handle to track proof calculation ordering.
133#[derive(Debug, Default)]
134struct ProofSequencer {
135    /// The next proof sequence number to be produced.
136    next_sequence: u64,
137    /// The next sequence number expected to be delivered.
138    next_to_deliver: u64,
139    /// Buffer for out-of-order proofs and corresponding state updates
140    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
141}
142
143impl ProofSequencer {
144    /// Gets the next sequence number and increments the counter
145    fn next_sequence(&mut self) -> u64 {
146        let seq = self.next_sequence;
147        self.next_sequence += 1;
148        seq
149    }
150
151    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
152    /// updates if we have a continuous sequence
153    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
154        if sequence >= self.next_to_deliver {
155            self.pending_proofs.insert(sequence, update);
156        }
157
158        // return early if we don't have the next expected proof
159        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
160            return Vec::new()
161        }
162
163        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
164        let mut current_sequence = self.next_to_deliver;
165
166        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
167        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
168            consecutive_proofs.push(pending);
169            current_sequence += 1;
170
171            // if we don't have the next number, stop collecting
172            if !self.pending_proofs.contains_key(&current_sequence) {
173                break;
174            }
175        }
176
177        self.next_to_deliver += consecutive_proofs.len() as u64;
178
179        consecutive_proofs
180    }
181
182    /// Returns true if we still have pending proofs
183    pub(crate) fn has_pending(&self) -> bool {
184        !self.pending_proofs.is_empty()
185    }
186}
187
188/// A wrapper for the sender that signals completion when dropped.
189///
190/// This type is intended to be used in combination with the evm executor statehook.
191/// This should trigger once the block has been executed (after) the last state update has been
192/// sent. This triggers the exit condition of the multi proof task.
193#[derive(Deref, Debug)]
194pub(super) struct StateHookSender(Sender<MultiProofMessage>);
195
196impl StateHookSender {
197    pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
198        Self(inner)
199    }
200}
201
202impl Drop for StateHookSender {
203    fn drop(&mut self) {
204        // Send completion signal when the sender is dropped
205        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
206    }
207}
208
209pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
210    let mut hashed_state = HashedPostState::with_capacity(update.len());
211
212    for (address, account) in update {
213        if account.is_touched() {
214            let hashed_address = keccak256(address);
215            trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
216
217            let destroyed = account.is_selfdestructed();
218            let info = if destroyed { None } else { Some(account.info.into()) };
219            hashed_state.accounts.insert(hashed_address, info);
220
221            let mut changed_storage_iter = account
222                .storage
223                .into_iter()
224                .filter(|(_slot, value)| value.is_changed())
225                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
226                .peekable();
227
228            if destroyed {
229                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
230            } else if changed_storage_iter.peek().is_some() {
231                hashed_state
232                    .storages
233                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
234            }
235        }
236    }
237
238    hashed_state
239}
240
241/// Input parameters for spawning a multiproof calculation.
242#[derive(Debug)]
243struct MultiproofInput<Factory> {
244    config: MultiProofConfig<Factory>,
245    source: Option<StateChangeSource>,
246    hashed_state_update: HashedPostState,
247    proof_targets: MultiProofTargets,
248    proof_sequence_number: u64,
249    state_root_message_sender: Sender<MultiProofMessage>,
250}
251
252/// Manages concurrent multiproof calculations.
253/// Takes care of not having more calculations in flight than a given maximum
254/// concurrency, further calculation requests are queued and spawn later, after
255/// availability has been signaled.
256#[derive(Debug)]
257pub struct MultiproofManager<Factory> {
258    /// Maximum number of concurrent calculations.
259    max_concurrent: usize,
260    /// Currently running calculations.
261    inflight: usize,
262    /// Queued calculations.
263    pending: VecDeque<MultiproofInput<Factory>>,
264    /// Executor for tassks
265    executor: WorkloadExecutor,
266    /// Metrics
267    metrics: MultiProofTaskMetrics,
268}
269
270impl<Factory> MultiproofManager<Factory>
271where
272    Factory:
273        DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
274{
275    /// Creates a new [`MultiproofManager`].
276    fn new(executor: WorkloadExecutor, metrics: MultiProofTaskMetrics) -> Self {
277        let max_concurrent = executor.rayon_pool().current_num_threads();
278        Self {
279            pending: VecDeque::with_capacity(max_concurrent),
280            max_concurrent,
281            executor,
282            inflight: 0,
283            metrics,
284        }
285    }
286
287    /// Spawns a new multiproof calculation or enqueues it for later if
288    /// `max_concurrent` are already inflight.
289    fn spawn_or_queue(&mut self, input: MultiproofInput<Factory>) {
290        // If there are no proof targets, we can just send an empty multiproof back immediately
291        if input.proof_targets.is_empty() {
292            debug!(
293                sequence_number = input.proof_sequence_number,
294                "No proof targets, sending empty multiproof back immediately"
295            );
296            let _ = input.state_root_message_sender.send(MultiProofMessage::EmptyProof {
297                sequence_number: input.proof_sequence_number,
298                state: input.hashed_state_update,
299            });
300            return
301        }
302
303        if self.inflight >= self.max_concurrent {
304            self.pending.push_back(input);
305            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
306            return;
307        }
308
309        self.spawn_multiproof(input);
310    }
311
312    /// Signals that a multiproof calculation has finished and there's room to
313    /// spawn a new calculation if needed.
314    fn on_calculation_complete(&mut self) {
315        self.inflight = self.inflight.saturating_sub(1);
316        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
317
318        if let Some(input) = self.pending.pop_front() {
319            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
320            self.spawn_multiproof(input);
321        }
322    }
323
324    /// Spawns a single multiproof calculation task.
325    fn spawn_multiproof(
326        &mut self,
327        MultiproofInput {
328            config,
329            source,
330            hashed_state_update,
331            proof_targets,
332            proof_sequence_number,
333            state_root_message_sender,
334        }: MultiproofInput<Factory>,
335    ) {
336        let executor = self.executor.clone();
337
338        self.executor.spawn_blocking(move || {
339            let account_targets = proof_targets.len();
340            let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
341
342            trace!(
343                target: "engine::root",
344                proof_sequence_number,
345                ?proof_targets,
346                account_targets,
347                storage_targets,
348                "Starting multiproof calculation",
349            );
350            let start = Instant::now();
351            let result = ParallelProof::new(
352                config.consistent_view,
353                config.nodes_sorted,
354                config.state_sorted,
355                config.prefix_sets,
356                executor.handle().clone(),
357            )
358            .with_branch_node_masks(true)
359            .multiproof(proof_targets);
360            let elapsed = start.elapsed();
361            trace!(
362                target: "engine::root",
363                proof_sequence_number,
364                ?elapsed,
365                ?source,
366                account_targets,
367                storage_targets,
368                "Multiproof calculated",
369            );
370
371            match result {
372                Ok(proof) => {
373                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
374                        Box::new(ProofCalculated {
375                            sequence_number: proof_sequence_number,
376                            update: SparseTrieUpdate {
377                                state: hashed_state_update,
378                                multiproof: proof,
379                            },
380                            elapsed,
381                        }),
382                    ));
383                }
384                Err(error) => {
385                    let _ = state_root_message_sender
386                        .send(MultiProofMessage::ProofCalculationError(error.into()));
387                }
388            }
389        });
390
391        self.inflight += 1;
392        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
393    }
394}
395
396#[derive(Metrics, Clone)]
397#[metrics(scope = "tree.root")]
398pub(crate) struct MultiProofTaskMetrics {
399    /// Histogram of inflight multiproofs.
400    pub inflight_multiproofs_histogram: Histogram,
401    /// Histogram of pending multiproofs.
402    pub pending_multiproofs_histogram: Histogram,
403
404    /// Histogram of the number of prefetch proof target accounts.
405    pub prefetch_proof_targets_accounts_histogram: Histogram,
406    /// Histogram of the number of prefetch proof target storages.
407    pub prefetch_proof_targets_storages_histogram: Histogram,
408    /// Histogram of the number of prefetch proof target chunks.
409    pub prefetch_proof_chunks_histogram: Histogram,
410
411    /// Histogram of the number of state update proof target accounts.
412    pub state_update_proof_targets_accounts_histogram: Histogram,
413    /// Histogram of the number of state update proof target storages.
414    pub state_update_proof_targets_storages_histogram: Histogram,
415    /// Histogram of the number of state update proof target chunks.
416    pub state_update_proof_chunks_histogram: Histogram,
417
418    /// Histogram of proof calculation durations.
419    pub proof_calculation_duration_histogram: Histogram,
420
421    /// Histogram of sparse trie update durations.
422    pub sparse_trie_update_duration_histogram: Histogram,
423    /// Histogram of sparse trie final update durations.
424    pub sparse_trie_final_update_duration_histogram: Histogram,
425    /// Histogram of sparse trie total durations.
426    pub sparse_trie_total_duration_histogram: Histogram,
427
428    /// Histogram of state updates received.
429    pub state_updates_received_histogram: Histogram,
430    /// Histogram of proofs processed.
431    pub proofs_processed_histogram: Histogram,
432    /// Histogram of total time spent in the multiproof task.
433    pub multiproof_task_total_duration_histogram: Histogram,
434    /// Total time spent waiting for the first state update or prefetch request.
435    pub first_update_wait_time_histogram: Histogram,
436}
437
438/// Standalone task that receives a transaction state stream and updates relevant
439/// data structures to calculate state root.
440///
441/// It is responsible of  initializing a blinded sparse trie and subscribe to
442/// transaction state stream. As it receives transaction execution results, it
443/// fetches the proofs for relevant accounts from the database and reveal them
444/// to the tree.
445/// Then it updates relevant leaves according to the result of the transaction.
446/// This feeds updates to the sparse trie task.
447#[derive(Debug)]
448pub(super) struct MultiProofTask<Factory> {
449    /// Task configuration.
450    config: MultiProofConfig<Factory>,
451    /// Receiver for state root related messages.
452    rx: Receiver<MultiProofMessage>,
453    /// Sender for state root related messages.
454    tx: Sender<MultiProofMessage>,
455    /// Sender for state updates emitted by this type.
456    to_sparse_trie: Sender<SparseTrieUpdate>,
457    /// Proof targets that have been already fetched.
458    fetched_proof_targets: MultiProofTargets,
459    /// Proof sequencing handler.
460    proof_sequencer: ProofSequencer,
461    /// Manages calculation of multiproofs.
462    multiproof_manager: MultiproofManager<Factory>,
463    /// multi proof task metrics
464    metrics: MultiProofTaskMetrics,
465}
466
467impl<Factory> MultiProofTask<Factory>
468where
469    Factory:
470        DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
471{
472    /// Creates a new multi proof task with the unified message channel
473    pub(super) fn new(
474        config: MultiProofConfig<Factory>,
475        executor: WorkloadExecutor,
476        to_sparse_trie: Sender<SparseTrieUpdate>,
477    ) -> Self {
478        let (tx, rx) = channel();
479        let metrics = MultiProofTaskMetrics::default();
480        Self {
481            config,
482            rx,
483            tx,
484            to_sparse_trie,
485            fetched_proof_targets: Default::default(),
486            proof_sequencer: ProofSequencer::default(),
487            multiproof_manager: MultiproofManager::new(executor, metrics.clone()),
488            metrics,
489        }
490    }
491
492    /// Returns a [`Sender`] that can be used to send arbitrary [`MultiProofMessage`]s to this task.
493    pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
494        self.tx.clone()
495    }
496
497    /// Handles request for proof prefetch.
498    ///
499    /// Returns a number of proofs that were spawned.
500    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
501        let proof_targets = self.get_prefetch_proof_targets(targets);
502        self.fetched_proof_targets.extend_ref(&proof_targets);
503
504        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
505        self.metrics
506            .prefetch_proof_targets_storages_histogram
507            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
508
509        // Process proof targets in chunks.
510        let mut chunks = 0;
511        for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
512            self.multiproof_manager.spawn_or_queue(MultiproofInput {
513                config: self.config.clone(),
514                source: None,
515                hashed_state_update: Default::default(),
516                proof_targets: proof_targets_chunk,
517                proof_sequence_number: self.proof_sequencer.next_sequence(),
518                state_root_message_sender: self.tx.clone(),
519            });
520            chunks += 1;
521        }
522        self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
523
524        chunks
525    }
526
527    // Returns true if all state updates finished and all proofs processed.
528    fn is_done(
529        &self,
530        proofs_processed: u64,
531        state_update_proofs_requested: u64,
532        prefetch_proofs_requested: u64,
533        updates_finished: bool,
534    ) -> bool {
535        let all_proofs_processed =
536            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
537        let no_pending = !self.proof_sequencer.has_pending();
538        debug!(
539            target: "engine::root",
540            proofs_processed,
541            state_update_proofs_requested,
542            prefetch_proofs_requested,
543            no_pending,
544            updates_finished,
545            "Checking end condition"
546        );
547        all_proofs_processed && no_pending && updates_finished
548    }
549
550    /// Calls `get_proof_targets` with existing proof targets for prefetching.
551    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
552        // Here we want to filter out any targets that are already fetched
553        //
554        // This means we need to remove any storage slots that have already been fetched
555        let mut duplicates = 0;
556
557        // First remove all storage targets that are subsets of already fetched storage slots
558        targets.retain(|hashed_address, target_storage| {
559            let keep = self
560                .fetched_proof_targets
561                .get(hashed_address)
562                // do NOT remove if None, because that means the account has not been fetched yet
563                .is_none_or(|fetched_storage| {
564                    // remove if a subset
565                    !target_storage.is_subset(fetched_storage)
566                });
567
568            if !keep {
569                duplicates += target_storage.len();
570            }
571
572            keep
573        });
574
575        // For all non-subset remaining targets, we have to calculate the difference
576        for (hashed_address, target_storage) in targets.deref_mut() {
577            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
578                // this means the account has not been fetched yet, so we must fetch everything
579                // associated with this account
580                continue
581            };
582
583            let prev_target_storage_len = target_storage.len();
584
585            // keep only the storage slots that have not been fetched yet
586            //
587            // we already removed subsets, so this should only remove duplicates
588            target_storage.retain(|slot| !fetched_storage.contains(slot));
589
590            duplicates += prev_target_storage_len - target_storage.len();
591        }
592
593        if duplicates > 0 {
594            trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
595        }
596
597        targets
598    }
599
600    /// Handles state updates.
601    ///
602    /// Returns a number of proofs that were spawned.
603    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
604        let hashed_state_update = evm_state_to_hashed_post_state(update);
605        // Split the state update into already fetched and not fetched according to the proof
606        // targets.
607        let (fetched_state_update, not_fetched_state_update) =
608            hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
609
610        let mut state_updates = 0;
611        // If there are any accounts or storage slots that we already fetched the proofs for,
612        // send them immediately, as they don't require spawning any additional multiproofs.
613        if !fetched_state_update.is_empty() {
614            let _ = self.tx.send(MultiProofMessage::EmptyProof {
615                sequence_number: self.proof_sequencer.next_sequence(),
616                state: fetched_state_update,
617            });
618            state_updates += 1;
619        }
620
621        // Process state updates in chunks.
622        let mut chunks = 0;
623        let mut spawned_proof_targets = MultiProofTargets::default();
624        for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
625            let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
626            spawned_proof_targets.extend_ref(&proof_targets);
627
628            self.multiproof_manager.spawn_or_queue(MultiproofInput {
629                config: self.config.clone(),
630                source: Some(source),
631                hashed_state_update: chunk,
632                proof_targets,
633                proof_sequence_number: self.proof_sequencer.next_sequence(),
634                state_root_message_sender: self.tx.clone(),
635            });
636            chunks += 1;
637        }
638
639        self.metrics
640            .state_update_proof_targets_accounts_histogram
641            .record(spawned_proof_targets.len() as f64);
642        self.metrics
643            .state_update_proof_targets_storages_histogram
644            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
645        self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
646
647        self.fetched_proof_targets.extend(spawned_proof_targets);
648
649        state_updates + chunks
650    }
651
652    /// Handler for new proof calculated, aggregates all the existing sequential proofs.
653    fn on_proof(
654        &mut self,
655        sequence_number: u64,
656        update: SparseTrieUpdate,
657    ) -> Option<SparseTrieUpdate> {
658        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
659
660        ready_proofs
661            .into_iter()
662            // Merge all ready proofs and state updates
663            .reduce(|mut acc_update, update| {
664                acc_update.extend(update);
665                acc_update
666            })
667            // Return None if the resulting proof is empty
668            .filter(|proof| !proof.is_empty())
669    }
670
671    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
672    /// sparse trie, updates the sparse trie, and eventually returns the state root.
673    ///
674    /// The lifecycle is the following:
675    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
676    ///    received from the engine.
    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
    ///      [`evm_state_to_hashed_post_state`], and then [proof targets](MultiProofTargets) are
    ///      extracted with [`get_proof_targets`].
680    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
681    ///      so that the proofs for accounts and storage slots that were already fetched are not
682    ///      requested again.
683    /// 2. Using the proof targets, a new multiproof is calculated using
684    ///    [`MultiproofManager::spawn_or_queue`].
685    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
686    ///      sent back to this task along with the original state update.
687    ///    * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]
688    ///      message is sent back to this task along with the resulting multiproof, proof targets
689    ///      and original state update.
690    /// 3. Either [`MultiProofMessage::EmptyProof`] or [`MultiProofMessage::ProofCalculated`] is
691    ///    received.
    ///    * The multiproof is added to the [proof sequencer](ProofSequencer).
693    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
694    ///      state updates arrived (i.e. transaction order), such sequence is returned.
695    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
696    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
697    ///    trie task.
698    /// 5. Steps above are repeated until this task receives a
699    ///    [`MultiProofMessage::FinishedStateUpdates`].
    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
    ///      [`MultiProofMessage::ProofCalculated`] message, we check whether any proofs are
    ///      still being calculated, or whether any pending proofs are left in the proof sequencer
    ///      to be revealed, by checking the pending tasks.
704    /// 6. This task exits after all pending proofs are processed.
705    pub(crate) fn run(mut self) {
706        // TODO convert those into fields
707        let mut prefetch_proofs_requested = 0;
708        let mut state_update_proofs_requested = 0;
709        let mut proofs_processed = 0;
710
711        let mut updates_finished = false;
712
713        // Timestamp before the first state update or prefetch was received
714        let start = Instant::now();
715
716        // Timestamp when the first state update or prefetch was received
717        let mut first_update_time = None;
718        // Timestamp when the last state update was received
719        let mut last_update_time = None;
720
721        loop {
722            trace!(target: "engine::root", "entering main channel receiving loop");
723            match self.rx.recv() {
724                Ok(message) => match message {
725                    MultiProofMessage::PrefetchProofs(targets) => {
726                        trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
727                        if first_update_time.is_none() {
728                            // record the wait time
729                            self.metrics
730                                .first_update_wait_time_histogram
731                                .record(start.elapsed().as_secs_f64());
732                            first_update_time = Some(Instant::now());
733                            debug!(target: "engine::root", "Started state root calculation");
734                        }
735
736                        let account_targets = targets.len();
737                        let storage_targets =
738                            targets.values().map(|slots| slots.len()).sum::<usize>();
739                        prefetch_proofs_requested += self.on_prefetch_proof(targets);
740                        debug!(
741                            target: "engine::root",
742                            account_targets,
743                            storage_targets,
744                            prefetch_proofs_requested,
745                            "Prefetching proofs"
746                        );
747                    }
748                    MultiProofMessage::StateUpdate(source, update) => {
749                        trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
750                        if first_update_time.is_none() {
751                            // record the wait time
752                            self.metrics
753                                .first_update_wait_time_histogram
754                                .record(start.elapsed().as_secs_f64());
755                            first_update_time = Some(Instant::now());
756                            debug!(target: "engine::root", "Started state root calculation");
757                        }
758                        last_update_time = Some(Instant::now());
759
760                        let len = update.len();
761                        state_update_proofs_requested += self.on_state_update(source, update);
762                        debug!(
763                            target: "engine::root",
764                            ?source,
765                            len,
766                            ?state_update_proofs_requested,
767                            "Received new state update"
768                        );
769                    }
770                    MultiProofMessage::FinishedStateUpdates => {
771                        trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
772                        updates_finished = true;
773                        if self.is_done(
774                            proofs_processed,
775                            state_update_proofs_requested,
776                            prefetch_proofs_requested,
777                            updates_finished,
778                        ) {
779                            debug!(
780                                target: "engine::root",
781                                "State updates finished and all proofs processed, ending calculation"
782                            );
783                            break
784                        }
785                    }
786                    MultiProofMessage::EmptyProof { sequence_number, state } => {
787                        trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
788
789                        proofs_processed += 1;
790
791                        if let Some(combined_update) = self.on_proof(
792                            sequence_number,
793                            SparseTrieUpdate { state, multiproof: MultiProof::default() },
794                        ) {
795                            let _ = self.to_sparse_trie.send(combined_update);
796                        }
797
798                        if self.is_done(
799                            proofs_processed,
800                            state_update_proofs_requested,
801                            prefetch_proofs_requested,
802                            updates_finished,
803                        ) {
804                            debug!(
805                                target: "engine::root",
806                                "State updates finished and all proofs processed, ending calculation"
807                            );
808                            break
809                        }
810                    }
811                    MultiProofMessage::ProofCalculated(proof_calculated) => {
812                        trace!(target: "engine::root", "processing
813        MultiProofMessage::ProofCalculated");
814
815                        // we increment proofs_processed for both state updates and prefetches,
816                        // because both are used for the root termination condition.
817                        proofs_processed += 1;
818
819                        self.metrics
820                            .proof_calculation_duration_histogram
821                            .record(proof_calculated.elapsed);
822
823                        debug!(
824                            target: "engine::root",
825                            sequence = proof_calculated.sequence_number,
826                            total_proofs = proofs_processed,
827                            "Processing calculated proof"
828                        );
829
830                        self.multiproof_manager.on_calculation_complete();
831
832                        if let Some(combined_update) =
833                            self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
834                        {
835                            let _ = self.to_sparse_trie.send(combined_update);
836                        }
837
838                        if self.is_done(
839                            proofs_processed,
840                            state_update_proofs_requested,
841                            prefetch_proofs_requested,
842                            updates_finished,
843                        ) {
844                            debug!(
845                                target: "engine::root",
846                                "State updates finished and all proofs processed, ending calculation");
847                            break
848                        }
849                    }
850                    MultiProofMessage::ProofCalculationError(err) => {
851                        error!(
852                            target: "engine::root",
853                            ?err,
854                            "proof calculation error"
855                        );
856                        return
857                    }
858                },
859                Err(_) => {
860                    // this means our internal message channel is closed, which shouldn't happen
861                    // in normal operation since we hold both ends
862                    error!(
863                        target: "engine::root",
864                        "Internal message channel closed unexpectedly"
865                    );
866                }
867            }
868        }
869
870        debug!(
871            target: "engine::root",
872            total_updates = state_update_proofs_requested,
873            total_proofs = proofs_processed,
874            total_time = ?first_update_time.map(|t|t.elapsed()),
875            time_from_last_update = ?last_update_time.map(|t|t.elapsed()),
876            "All proofs processed, ending calculation"
877        );
878
879        // update total metrics on finish
880        self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
881        self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
882        if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
883            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
884        }
885    }
886}
887
888/// Returns accounts only with those storages that were not already fetched, and
889/// if there are no such storages and the account itself was already fetched, the
890/// account shouldn't be included.
891fn get_proof_targets(
892    state_update: &HashedPostState,
893    fetched_proof_targets: &MultiProofTargets,
894) -> MultiProofTargets {
895    let mut targets = MultiProofTargets::default();
896
897    // first collect all new accounts (not previously fetched)
898    for &hashed_address in state_update.accounts.keys() {
899        if !fetched_proof_targets.contains_key(&hashed_address) {
900            targets.insert(hashed_address, HashSet::default());
901        }
902    }
903
904    // then process storage slots for all accounts in the state update
905    for (hashed_address, storage) in &state_update.storages {
906        let fetched = fetched_proof_targets.get(hashed_address);
907        let mut changed_slots = storage
908            .storage
909            .keys()
910            .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
911            .peekable();
912
913        if changed_slots.peek().is_some() {
914            targets.entry(*hashed_address).or_default().extend(changed_slots);
915        }
916    }
917
918    targets
919}
920
921#[cfg(test)]
922mod tests {
923    use super::*;
924    use alloy_primitives::map::B256Set;
925    use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
926    use reth_trie::TrieInput;
927    use revm_primitives::{B256, U256};
928    use std::sync::Arc;
929
930    fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
931    where
932        F: DatabaseProviderFactory<Provider: BlockReader>
933            + StateCommitmentProvider
934            + Clone
935            + 'static,
936    {
937        let consistent_view = ConsistentDbView::new(factory, None);
938        let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
939        let state_sorted = Arc::new(input.state.clone().into_sorted());
940        let prefix_sets = Arc::new(input.prefix_sets);
941
942        MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
943    }
944
945    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
946    where
947        F: DatabaseProviderFactory<Provider: BlockReader>
948            + StateCommitmentProvider
949            + Clone
950            + 'static,
951    {
952        let executor = WorkloadExecutor::with_num_cpu_threads(2);
953        let config = create_state_root_config(factory, TrieInput::default());
954        let channel = channel();
955
956        MultiProofTask::new(config, executor, channel.0)
957    }
958
959    #[test]
960    fn test_add_proof_in_sequence() {
961        let mut sequencer = ProofSequencer::default();
962        let proof1 = MultiProof::default();
963        let proof2 = MultiProof::default();
964        sequencer.next_sequence = 2;
965
966        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
967        assert_eq!(ready.len(), 1);
968        assert!(!sequencer.has_pending());
969
970        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2));
971        assert_eq!(ready.len(), 1);
972        assert!(!sequencer.has_pending());
973    }
974
975    #[test]
976    fn test_add_proof_out_of_order() {
977        let mut sequencer = ProofSequencer::default();
978        let proof1 = MultiProof::default();
979        let proof2 = MultiProof::default();
980        let proof3 = MultiProof::default();
981        sequencer.next_sequence = 3;
982
983        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3));
984        assert_eq!(ready.len(), 0);
985        assert!(sequencer.has_pending());
986
987        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
988        assert_eq!(ready.len(), 1);
989        assert!(sequencer.has_pending());
990
991        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2));
992        assert_eq!(ready.len(), 2);
993        assert!(!sequencer.has_pending());
994    }
995
996    #[test]
997    fn test_add_proof_with_gaps() {
998        let mut sequencer = ProofSequencer::default();
999        let proof1 = MultiProof::default();
1000        let proof3 = MultiProof::default();
1001        sequencer.next_sequence = 3;
1002
1003        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1004        assert_eq!(ready.len(), 1);
1005
1006        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3));
1007        assert_eq!(ready.len(), 0);
1008        assert!(sequencer.has_pending());
1009    }
1010
1011    #[test]
1012    fn test_add_proof_duplicate_sequence() {
1013        let mut sequencer = ProofSequencer::default();
1014        let proof1 = MultiProof::default();
1015        let proof2 = MultiProof::default();
1016
1017        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1));
1018        assert_eq!(ready.len(), 1);
1019
1020        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2));
1021        assert_eq!(ready.len(), 0);
1022        assert!(!sequencer.has_pending());
1023    }
1024
1025    #[test]
1026    fn test_add_proof_batch_processing() {
1027        let mut sequencer = ProofSequencer::default();
1028        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1029        sequencer.next_sequence = 5;
1030
1031        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()));
1032        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()));
1033        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()));
1034        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()));
1035
1036        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()));
1037        assert_eq!(ready.len(), 5);
1038        assert!(!sequencer.has_pending());
1039    }
1040
1041    fn create_get_proof_targets_state() -> HashedPostState {
1042        let mut state = HashedPostState::default();
1043
1044        let addr1 = B256::random();
1045        let addr2 = B256::random();
1046        state.accounts.insert(addr1, Some(Default::default()));
1047        state.accounts.insert(addr2, Some(Default::default()));
1048
1049        let mut storage = HashedStorage::default();
1050        let slot1 = B256::random();
1051        let slot2 = B256::random();
1052        storage.storage.insert(slot1, U256::ZERO);
1053        storage.storage.insert(slot2, U256::from(1));
1054        state.storages.insert(addr1, storage);
1055
1056        state
1057    }
1058
1059    #[test]
1060    fn test_get_proof_targets_new_account_targets() {
1061        let state = create_get_proof_targets_state();
1062        let fetched = MultiProofTargets::default();
1063
1064        let targets = get_proof_targets(&state, &fetched);
1065
1066        // should return all accounts as targets since nothing was fetched before
1067        assert_eq!(targets.len(), state.accounts.len());
1068        for addr in state.accounts.keys() {
1069            assert!(targets.contains_key(addr));
1070        }
1071    }
1072
1073    #[test]
1074    fn test_get_proof_targets_new_storage_targets() {
1075        let state = create_get_proof_targets_state();
1076        let fetched = MultiProofTargets::default();
1077
1078        let targets = get_proof_targets(&state, &fetched);
1079
1080        // verify storage slots are included for accounts with storage
1081        for (addr, storage) in &state.storages {
1082            assert!(targets.contains_key(addr));
1083            let target_slots = &targets[addr];
1084            assert_eq!(target_slots.len(), storage.storage.len());
1085            for slot in storage.storage.keys() {
1086                assert!(target_slots.contains(slot));
1087            }
1088        }
1089    }
1090
1091    #[test]
1092    fn test_get_proof_targets_filter_already_fetched_accounts() {
1093        let state = create_get_proof_targets_state();
1094        let mut fetched = MultiProofTargets::default();
1095
1096        // select an account that has no storage updates
1097        let fetched_addr = state
1098            .accounts
1099            .keys()
1100            .find(|&&addr| !state.storages.contains_key(&addr))
1101            .expect("Should have an account without storage");
1102
1103        // mark the account as already fetched
1104        fetched.insert(*fetched_addr, HashSet::default());
1105
1106        let targets = get_proof_targets(&state, &fetched);
1107
1108        // should not include the already fetched account since it has no storage updates
1109        assert!(!targets.contains_key(fetched_addr));
1110        // other accounts should still be included
1111        assert_eq!(targets.len(), state.accounts.len() - 1);
1112    }
1113
1114    #[test]
1115    fn test_get_proof_targets_filter_already_fetched_storage() {
1116        let state = create_get_proof_targets_state();
1117        let mut fetched = MultiProofTargets::default();
1118
1119        // mark one storage slot as already fetched
1120        let (addr, storage) = state.storages.iter().next().unwrap();
1121        let mut fetched_slots = HashSet::default();
1122        let fetched_slot = *storage.storage.keys().next().unwrap();
1123        fetched_slots.insert(fetched_slot);
1124        fetched.insert(*addr, fetched_slots);
1125
1126        let targets = get_proof_targets(&state, &fetched);
1127
1128        // should not include the already fetched storage slot
1129        let target_slots = &targets[addr];
1130        assert!(!target_slots.contains(&fetched_slot));
1131        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1132    }
1133
1134    #[test]
1135    fn test_get_proof_targets_empty_state() {
1136        let state = HashedPostState::default();
1137        let fetched = MultiProofTargets::default();
1138
1139        let targets = get_proof_targets(&state, &fetched);
1140
1141        assert!(targets.is_empty());
1142    }
1143
1144    #[test]
1145    fn test_get_proof_targets_mixed_fetched_state() {
1146        let mut state = HashedPostState::default();
1147        let mut fetched = MultiProofTargets::default();
1148
1149        let addr1 = B256::random();
1150        let addr2 = B256::random();
1151        let slot1 = B256::random();
1152        let slot2 = B256::random();
1153
1154        state.accounts.insert(addr1, Some(Default::default()));
1155        state.accounts.insert(addr2, Some(Default::default()));
1156
1157        let mut storage = HashedStorage::default();
1158        storage.storage.insert(slot1, U256::ZERO);
1159        storage.storage.insert(slot2, U256::from(1));
1160        state.storages.insert(addr1, storage);
1161
1162        let mut fetched_slots = HashSet::default();
1163        fetched_slots.insert(slot1);
1164        fetched.insert(addr1, fetched_slots);
1165
1166        let targets = get_proof_targets(&state, &fetched);
1167
1168        assert!(targets.contains_key(&addr2));
1169        assert!(!targets[&addr1].contains(&slot1));
1170        assert!(targets[&addr1].contains(&slot2));
1171    }
1172
1173    #[test]
1174    fn test_get_proof_targets_unmodified_account_with_storage() {
1175        let mut state = HashedPostState::default();
1176        let fetched = MultiProofTargets::default();
1177
1178        let addr = B256::random();
1179        let slot1 = B256::random();
1180        let slot2 = B256::random();
1181
1182        // don't add the account to state.accounts (simulating unmodified account)
1183        // but add storage updates for this account
1184        let mut storage = HashedStorage::default();
1185        storage.storage.insert(slot1, U256::from(1));
1186        storage.storage.insert(slot2, U256::from(2));
1187        state.storages.insert(addr, storage);
1188
1189        assert!(!state.accounts.contains_key(&addr));
1190        assert!(!fetched.contains_key(&addr));
1191
1192        let targets = get_proof_targets(&state, &fetched);
1193
1194        // verify that we still get the storage slots for the unmodified account
1195        assert!(targets.contains_key(&addr));
1196
1197        let target_slots = &targets[&addr];
1198        assert_eq!(target_slots.len(), 2);
1199        assert!(target_slots.contains(&slot1));
1200        assert!(target_slots.contains(&slot2));
1201    }
1202
1203    #[test]
1204    fn test_get_prefetch_proof_targets_no_duplicates() {
1205        let test_provider_factory = create_test_provider_factory();
1206        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1207
1208        // populate some targets
1209        let mut targets = MultiProofTargets::default();
1210        let addr1 = B256::random();
1211        let addr2 = B256::random();
1212        let slot1 = B256::random();
1213        let slot2 = B256::random();
1214        targets.insert(addr1, vec![slot1].into_iter().collect());
1215        targets.insert(addr2, vec![slot2].into_iter().collect());
1216
1217        let prefetch_proof_targets =
1218            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1219
1220        // check that the prefetch proof targets are the same because there are no fetched proof
1221        // targets yet
1222        assert_eq!(prefetch_proof_targets, targets);
1223
1224        // add a different addr and slot to fetched proof targets
1225        let addr3 = B256::random();
1226        let slot3 = B256::random();
1227        test_state_root_task.fetched_proof_targets.insert(addr3, vec![slot3].into_iter().collect());
1228
1229        let prefetch_proof_targets =
1230            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1231
1232        // check that the prefetch proof targets are the same because the fetched proof targets
1233        // don't overlap with the prefetch targets
1234        assert_eq!(prefetch_proof_targets, targets);
1235    }
1236
1237    #[test]
1238    fn test_get_prefetch_proof_targets_remove_subset() {
1239        let test_provider_factory = create_test_provider_factory();
1240        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1241
1242        // populate some targe
1243        let mut targets = MultiProofTargets::default();
1244        let addr1 = B256::random();
1245        let addr2 = B256::random();
1246        let slot1 = B256::random();
1247        let slot2 = B256::random();
1248        targets.insert(addr1, vec![slot1].into_iter().collect());
1249        targets.insert(addr2, vec![slot2].into_iter().collect());
1250
1251        // add a subset of the first target to fetched proof targets
1252        test_state_root_task.fetched_proof_targets.insert(addr1, vec![slot1].into_iter().collect());
1253
1254        let prefetch_proof_targets =
1255            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1256
1257        // check that the prefetch proof targets do not include the subset
1258        assert_eq!(prefetch_proof_targets.len(), 1);
1259        assert!(!prefetch_proof_targets.contains_key(&addr1));
1260        assert!(prefetch_proof_targets.contains_key(&addr2));
1261
1262        // now add one more slot to the prefetch targets
1263        let slot3 = B256::random();
1264        targets.get_mut(&addr1).unwrap().insert(slot3);
1265
1266        let prefetch_proof_targets =
1267            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1268
1269        // check that the prefetch proof targets do not include the subset
1270        // but include the new slot
1271        assert_eq!(prefetch_proof_targets.len(), 2);
1272        assert!(prefetch_proof_targets.contains_key(&addr1));
1273        assert_eq!(
1274            *prefetch_proof_targets.get(&addr1).unwrap(),
1275            vec![slot3].into_iter().collect::<B256Set>()
1276        );
1277        assert!(prefetch_proof_targets.contains_key(&addr2));
1278        assert_eq!(
1279            *prefetch_proof_targets.get(&addr2).unwrap(),
1280            vec![slot2].into_iter().collect::<B256Set>()
1281        );
1282    }
1283}