Skip to main content

reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use alloy_evm::block::StateChangeSource;
4use alloy_primitives::{keccak256, B256};
5use crossbeam_channel::Sender as CrossbeamSender;
6use derive_more::derive::Deref;
7use metrics::{Gauge, Histogram};
8use reth_metrics::Metrics;
9use reth_revm::state::EvmState;
10use reth_trie::{HashedPostState, HashedStorage};
11use reth_trie_common::MultiProofTargetsV2;
12use std::sync::Arc;
13use tracing::trace;
14
/// Source of state changes, either from EVM execution or from a Block Access List.
// `Debug` is implemented manually (below) so the `Evm` variant prints as its inner
// `StateChangeSource` instead of as a wrapping tuple variant.
#[derive(Clone, Copy)]
pub enum Source {
    /// State changes from EVM execution.
    Evm(StateChangeSource),
    /// State changes from Block Access List (EIP-7928).
    BlockAccessList,
}
23
24impl std::fmt::Debug for Source {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Self::Evm(source) => source.fmt(f),
28            Self::BlockAccessList => f.write_str("BlockAccessList"),
29        }
30    }
31}
32
33impl From<StateChangeSource> for Source {
34    fn from(source: StateChangeSource) -> Self {
35        Self::Evm(source)
36    }
37}
38
/// The default max targets, for limiting the number of account and storage proof targets to be
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
///
/// See [`dispatch_with_chunking`], which compares a workload's target count against this kind of
/// threshold when deciding whether to split the work.
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
42
/// Messages used internally by the multi proof task.
#[derive(Debug)]
pub enum MultiProofMessage {
    /// Prefetch proof targets.
    PrefetchProofs(MultiProofTargetsV2),
    /// New state update from transaction execution, paired with its [`Source`].
    StateUpdate(Source, EvmState),
    /// State update that can be applied to the sparse trie without any new proofs.
    ///
    /// It can be the case when all accounts and storage slots from the state update were already
    /// fetched and revealed.
    EmptyProof {
        /// The index of this proof in the sequence of state updates
        sequence_number: u64,
        /// The state update that was used to calculate the proof
        state: HashedPostState,
    },
    /// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
    HashedStateUpdate(HashedPostState),
    /// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
    ///
    /// When received, the task generates a single state update from the BAL and processes it.
    /// No further messages are expected after receiving this variant.
    BlockAccessList(Arc<alloy_eip7928::BlockAccessList>),
    /// Signals state update stream end.
    ///
    /// This is triggered by block execution, indicating that no additional state updates are
    /// expected.
    FinishedStateUpdates,
}
73
/// A wrapper for the sender that signals completion when dropped.
///
/// This type is intended to be used in combination with the evm executor statehook.
/// It should be dropped once the block has been executed, after the last state update has been
/// sent; the `Drop` impl then sends [`MultiProofMessage::FinishedStateUpdates`], which triggers
/// the exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub struct StateHookSender(CrossbeamSender<MultiProofMessage>);
81
82impl StateHookSender {
83    /// Creates a new [`StateHookSender`] wrapping the given channel sender.
84    pub const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
85        Self(inner)
86    }
87}
88
impl Drop for StateHookSender {
    fn drop(&mut self) {
        // Send completion signal when the sender is dropped. The result is deliberately
        // ignored: the send only fails if the receiving task has already shut down, and
        // there is nothing useful to do about that from within `drop`.
        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
    }
}
95
96pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
97    let mut hashed_state = HashedPostState::with_capacity(update.len());
98
99    for (address, account) in update {
100        if account.is_touched() {
101            let hashed_address = keccak256(address);
102            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
103
104            let destroyed = account.is_selfdestructed();
105            let info = if destroyed { None } else { Some(account.info.into()) };
106            hashed_state.accounts.insert(hashed_address, info);
107
108            let mut changed_storage_iter = account
109                .storage
110                .into_iter()
111                .filter(|(_slot, value)| value.is_changed())
112                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
113                .peekable();
114
115            if destroyed {
116                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
117            } else if changed_storage_iter.peek().is_some() {
118                hashed_state
119                    .storages
120                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
121            }
122        }
123    }
124
125    hashed_state
126}
127
/// Metrics recorded by the multiproof task, emitted under the `tree.root` scope.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of active storage workers processing proofs.
    pub active_storage_workers_histogram: Histogram,
    /// Histogram of active account workers processing proofs.
    pub active_account_workers_histogram: Histogram,
    /// Gauge for the maximum number of storage workers in the pool.
    pub max_storage_workers: Gauge,
    /// Gauge for the maximum number of account workers in the pool.
    pub max_account_workers: Gauge,
    /// Histogram of pending storage multiproofs in the queue.
    pub pending_storage_multiproofs_histogram: Histogram,
    /// Histogram of pending account multiproofs in the queue.
    pub pending_account_multiproofs_histogram: Histogram,

    /// Histogram of the number of prefetch proof target accounts.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of prefetch proof target storages.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of prefetch proof target chunks.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of state update proof target accounts.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of state update proof target storages.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of state update proof target chunks.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of prefetch proof batch sizes (number of messages merged).
    pub prefetch_batch_size_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations.
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of durations spent revealing multiproof results into the sparse trie.
    pub sparse_trie_reveal_multiproof_duration_histogram: Histogram,
    /// Histogram of durations spent coalescing multiple proof results from the channel.
    pub sparse_trie_proof_coalesce_duration_histogram: Histogram,
    /// Histogram of durations the event loop spent blocked waiting on channels.
    pub sparse_trie_channel_wait_duration_histogram: Histogram,
    /// Histogram of durations spent processing trie updates and promoting pending accounts.
    pub sparse_trie_process_updates_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of state updates received.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of proofs processed.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of total time spent in the multiproof task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Total time spent waiting for the first state update or prefetch request.
    pub first_update_wait_time_histogram: Histogram,
    /// Total time spent waiting for the last proof result.
    pub last_proof_wait_time_histogram: Histogram,
    /// Time spent preparing the sparse trie for reuse after state root computation.
    pub into_trie_for_reuse_duration_histogram: Histogram,
    /// Time spent waiting for preserved sparse trie cache to become available.
    pub sparse_trie_cache_wait_duration_histogram: Histogram,

    /// Number of account leaf updates applied without needing a new proof (cache hits).
    pub sparse_trie_account_cache_hits: Histogram,
    /// Number of account leaf updates that required a new proof (cache misses).
    pub sparse_trie_account_cache_misses: Histogram,
    /// Number of storage leaf updates applied without needing a new proof (cache hits).
    pub sparse_trie_storage_cache_hits: Histogram,
    /// Number of storage leaf updates that required a new proof (cache misses).
    pub sparse_trie_storage_cache_misses: Histogram,

    /// Retained memory of the preserved sparse trie cache in bytes.
    pub sparse_trie_retained_memory_bytes: Gauge,
    /// Number of storage tries retained in the preserved sparse trie cache.
    pub sparse_trie_retained_storage_tries: Gauge,
}
208
/// Dispatches work items either as one unit or split into chunks.
///
/// Chunking is considered when the workload exceeds `max_targets_for_chunking`, or when more
/// than one account or storage worker is available to pick up parallel chunks; it only takes
/// effect when the workload spans more than a single `chunk_size`. Returns the number of
/// dispatched units (always at least 1).
#[allow(clippy::too_many_arguments)]
pub(crate) fn dispatch_with_chunking<T, I>(
    items: T,
    chunking_len: usize,
    chunk_size: usize,
    max_targets_for_chunking: usize,
    available_account_workers: usize,
    available_storage_workers: usize,
    chunker: impl FnOnce(T, usize) -> I,
    mut dispatch: impl FnMut(T),
) -> usize
where
    I: IntoIterator<Item = T>,
{
    let chunking_candidate = chunking_len > max_targets_for_chunking ||
        available_account_workers > 1 ||
        available_storage_workers > 1;

    // Small workloads (or single-worker setups below the forced-chunking threshold) go out as
    // a single unit.
    if !(chunking_candidate && chunking_len > chunk_size) {
        dispatch(items);
        return 1;
    }

    // Split the work and count the chunks as they are dispatched.
    chunker(items, chunk_size).into_iter().fold(0, |dispatched, chunk| {
        dispatch(chunk);
        dispatched + 1
    })
}