//! Multiproof task related functionality.

use alloy_evm::block::StateChangeSource;
use alloy_primitives::{keccak256, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use derive_more::derive::Deref;
use metrics::{Gauge, Histogram};
use reth_metrics::Metrics;
use reth_revm::state::EvmState;
use reth_trie::{HashedPostState, HashedStorage};
use reth_trie_common::MultiProofTargetsV2;
use std::sync::Arc;
use tracing::trace;

/// Source of state changes, either from EVM execution or from a Block Access List.
#[derive(Clone, Copy)]
pub enum Source {
    /// State changes produced by executing transactions in the EVM.
    Evm(StateChangeSource),
    /// State changes taken from a Block Access List (EIP-7928).
    BlockAccessList,
}

24impl std::fmt::Debug for Source {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Self::Evm(source) => source.fmt(f),
28            Self::BlockAccessList => f.write_str("BlockAccessList"),
29        }
30    }
31}
32
33impl From<StateChangeSource> for Source {
34    fn from(source: StateChangeSource) -> Self {
35        Self::Evm(source)
36    }
37}
38
/// The default max targets, for limiting the number of account and storage proof targets to be
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability
/// (see [`dispatch_with_chunking`]).
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;

/// Messages used internally by the multi proof task.
#[derive(Debug)]
pub enum MultiProofMessage {
    /// Prefetch proof targets.
    PrefetchProofs(MultiProofTargetsV2),
    /// New state update from transaction execution, together with its [`Source`].
    StateUpdate(Source, EvmState),
    /// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
    HashedStateUpdate(HashedPostState),
    /// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
    ///
    /// When received, the task generates a single state update from the BAL and processes it.
    /// No further messages are expected after receiving this variant.
    BlockAccessList(Arc<alloy_eip7928::BlockAccessList>),
    /// Signals state update stream end.
    ///
    /// This is triggered by block execution, indicating that no additional state updates are
    /// expected. It is also sent automatically when a [`StateHookSender`] is dropped.
    FinishedStateUpdates,
}

/// A wrapper for the sender that signals completion when dropped.
///
/// This type is intended to be used in combination with the evm executor statehook.
/// Dropping it should happen once the block has been executed, after the last state update has
/// been sent; this triggers the exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub struct StateHookSender(CrossbeamSender<MultiProofMessage>);

72impl StateHookSender {
73    /// Creates a new [`StateHookSender`] wrapping the given channel sender.
74    pub const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
75        Self(inner)
76    }
77}
78
79impl Drop for StateHookSender {
80    fn drop(&mut self) {
81        // Send completion signal when the sender is dropped
82        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
83    }
84}
85
86pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
87    let mut hashed_state = HashedPostState::with_capacity(update.len());
88
89    for (address, account) in update {
90        if account.is_touched() {
91            let hashed_address = keccak256(address);
92            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
93
94            let destroyed = account.is_selfdestructed();
95            let info = if destroyed { None } else { Some(account.info.into()) };
96            hashed_state.accounts.insert(hashed_address, info);
97
98            let mut changed_storage_iter = account
99                .storage
100                .into_iter()
101                .filter(|(_slot, value)| value.is_changed())
102                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
103                .peekable();
104
105            if destroyed {
106                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
107            } else if changed_storage_iter.peek().is_some() {
108                hashed_state
109                    .storages
110                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
111            }
112        }
113    }
114
115    hashed_state
116}
117
/// Metrics for the multi proof task, recorded under the `tree.root` scope.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of durations spent revealing multiproof results into the sparse trie.
    pub sparse_trie_reveal_multiproof_duration_histogram: Histogram,
    /// Histogram of durations spent coalescing multiple proof results from the channel.
    pub sparse_trie_proof_coalesce_duration_histogram: Histogram,
    /// Histogram of durations the event loop spent blocked waiting on channels.
    pub sparse_trie_channel_wait_duration_histogram: Histogram,
    /// Histogram of durations spent processing trie updates and promoting pending accounts.
    pub sparse_trie_process_updates_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,
    /// Time spent preparing the sparse trie for reuse after state root computation.
    pub into_trie_for_reuse_duration_histogram: Histogram,
    /// Time spent waiting for preserved sparse trie cache to become available.
    pub sparse_trie_cache_wait_duration_histogram: Histogram,
    /// Histogram for sparse trie task idle time in seconds (waiting for updates or proof
    /// results). Excludes the final wait after the channel is closed.
    pub sparse_trie_idle_time_seconds: Histogram,
    /// Histogram for hashing task idle time in seconds (waiting for messages from execution).
    /// Excludes the final wait after the channel is closed.
    pub hashing_task_idle_time_seconds: Histogram,

    /// Number of account leaf updates applied without needing a new proof (cache hits).
    pub sparse_trie_account_cache_hits: Histogram,
    /// Number of account leaf updates that required a new proof (cache misses).
    pub sparse_trie_account_cache_misses: Histogram,
    /// Number of storage leaf updates applied without needing a new proof (cache hits).
    pub sparse_trie_storage_cache_hits: Histogram,
    /// Number of storage leaf updates that required a new proof (cache misses).
    pub sparse_trie_storage_cache_misses: Histogram,

    /// Retained memory of the preserved sparse trie cache in bytes.
    pub sparse_trie_retained_memory_bytes: Gauge,
    /// Number of storage tries retained in the preserved sparse trie cache.
    pub sparse_trie_retained_storage_tries: Gauge,
}

/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
///
/// Chunking happens only when both conditions hold:
/// - the target count exceeds `max_targets_for_chunking`, or more than one account or storage
///   worker is available, and
/// - `chunking_len` is strictly greater than `chunk_size` (otherwise one unit suffices).
///
/// Returns the number of dispatched units (`1` when no chunking occurred).
#[allow(clippy::too_many_arguments)]
pub(crate) fn dispatch_with_chunking<T, I>(
    items: T,
    chunking_len: usize,
    chunk_size: usize,
    max_targets_for_chunking: usize,
    available_account_workers: usize,
    available_storage_workers: usize,
    chunker: impl FnOnce(T, usize) -> I,
    mut dispatch: impl FnMut(T),
) -> usize
where
    I: IntoIterator<Item = T>,
{
    let chunking_enabled = chunking_len > max_targets_for_chunking
        || available_account_workers > 1
        || available_storage_workers > 1;

    // Small enough, or no spare parallelism to exploit: send everything as one unit.
    if !chunking_enabled || chunking_len <= chunk_size {
        dispatch(items);
        return 1;
    }

    let mut dispatched = 0usize;
    for piece in chunker(items, chunk_size) {
        dispatch(piece);
        dispatched += 1;
    }
    dispatched
}