reth_engine_tree/tree/payload_processor/
multiproof.rs1use alloy_evm::block::StateChangeSource;
4use alloy_primitives::{keccak256, B256};
5use crossbeam_channel::Sender as CrossbeamSender;
6use derive_more::derive::Deref;
7use metrics::{Gauge, Histogram};
8use reth_metrics::Metrics;
9use reth_revm::state::EvmState;
10use reth_trie::{HashedPostState, HashedStorage};
11use reth_trie_common::MultiProofTargetsV2;
12use std::sync::Arc;
13use tracing::trace;
14
15#[derive(Clone, Copy)]
17pub enum Source {
18 Evm(StateChangeSource),
20 BlockAccessList,
22}
23
24impl std::fmt::Debug for Source {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 Self::Evm(source) => source.fmt(f),
28 Self::BlockAccessList => f.write_str("BlockAccessList"),
29 }
30 }
31}
32
33impl From<StateChangeSource> for Source {
34 fn from(source: StateChangeSource) -> Self {
35 Self::Evm(source)
36 }
37}
38
/// Default upper bound on the number of targets before chunking applies.
///
/// NOTE(review): presumably the default for the `max_targets_for_chunking` argument of
/// `dispatch_with_chunking` — confirm at the call sites.
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
42
43#[derive(Debug)]
45pub enum MultiProofMessage {
46 PrefetchProofs(MultiProofTargetsV2),
48 StateUpdate(Source, EvmState),
50 EmptyProof {
55 sequence_number: u64,
57 state: HashedPostState,
59 },
60 HashedStateUpdate(HashedPostState),
62 BlockAccessList(Arc<alloy_eip7928::BlockAccessList>),
67 FinishedStateUpdates,
72}
73
74#[derive(Deref, Debug)]
80pub struct StateHookSender(CrossbeamSender<MultiProofMessage>);
81
82impl StateHookSender {
83 pub const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
85 Self(inner)
86 }
87}
88
89impl Drop for StateHookSender {
90 fn drop(&mut self) {
91 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
93 }
94}
95
96pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
97 let mut hashed_state = HashedPostState::with_capacity(update.len());
98
99 for (address, account) in update {
100 if account.is_touched() {
101 let hashed_address = keccak256(address);
102 trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
103
104 let destroyed = account.is_selfdestructed();
105 let info = if destroyed { None } else { Some(account.info.into()) };
106 hashed_state.accounts.insert(hashed_address, info);
107
108 let mut changed_storage_iter = account
109 .storage
110 .into_iter()
111 .filter(|(_slot, value)| value.is_changed())
112 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
113 .peekable();
114
115 if destroyed {
116 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
117 } else if changed_storage_iter.peek().is_some() {
118 hashed_state
119 .storages
120 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
121 }
122 }
123 }
124
125 hashed_state
126}
127
128#[derive(Metrics, Clone)]
129#[metrics(scope = "tree.root")]
130pub(crate) struct MultiProofTaskMetrics {
131 pub active_storage_workers_histogram: Histogram,
133 pub active_account_workers_histogram: Histogram,
135 pub max_storage_workers: Gauge,
137 pub max_account_workers: Gauge,
139 pub pending_storage_multiproofs_histogram: Histogram,
141 pub pending_account_multiproofs_histogram: Histogram,
143
144 pub prefetch_proof_targets_accounts_histogram: Histogram,
146 pub prefetch_proof_targets_storages_histogram: Histogram,
148 pub prefetch_proof_chunks_histogram: Histogram,
150
151 pub state_update_proof_targets_accounts_histogram: Histogram,
153 pub state_update_proof_targets_storages_histogram: Histogram,
155 pub state_update_proof_chunks_histogram: Histogram,
157
158 pub prefetch_batch_size_histogram: Histogram,
160
161 pub proof_calculation_duration_histogram: Histogram,
163
164 pub sparse_trie_update_duration_histogram: Histogram,
166 pub sparse_trie_reveal_multiproof_duration_histogram: Histogram,
168 pub sparse_trie_proof_coalesce_duration_histogram: Histogram,
170 pub sparse_trie_channel_wait_duration_histogram: Histogram,
172 pub sparse_trie_process_updates_duration_histogram: Histogram,
174 pub sparse_trie_final_update_duration_histogram: Histogram,
176 pub sparse_trie_total_duration_histogram: Histogram,
178
179 pub state_updates_received_histogram: Histogram,
181 pub proofs_processed_histogram: Histogram,
183 pub multiproof_task_total_duration_histogram: Histogram,
185 pub first_update_wait_time_histogram: Histogram,
187 pub last_proof_wait_time_histogram: Histogram,
189 pub into_trie_for_reuse_duration_histogram: Histogram,
191 pub sparse_trie_cache_wait_duration_histogram: Histogram,
193
194 pub sparse_trie_account_cache_hits: Histogram,
196 pub sparse_trie_account_cache_misses: Histogram,
198 pub sparse_trie_storage_cache_hits: Histogram,
200 pub sparse_trie_storage_cache_misses: Histogram,
202
203 pub sparse_trie_retained_memory_bytes: Gauge,
205 pub sparse_trie_retained_storage_tries: Gauge,
207}
208
/// Dispatches `items` either as a whole or split into chunks, and returns how many times
/// `dispatch` was invoked.
///
/// Chunking happens only when both of the following hold:
/// - `chunking_len` exceeds `max_targets_for_chunking`, or more than one account worker or more
///   than one storage worker is available; and
/// - `chunking_len` is strictly greater than `chunk_size`.
///
/// In that case `chunker(items, chunk_size)` is called once and every chunk it yields is passed
/// to `dispatch` in iteration order. Otherwise `items` is dispatched unchanged, `chunker` is never
/// called, and the result is `1`.
///
/// `chunking_len` is taken as given by the caller; it is not derived from `items` here.
// The thresholds, worker counts and callbacks are independent inputs, hence the long signature.
#[allow(clippy::too_many_arguments)]
pub(crate) fn dispatch_with_chunking<T, I>(
    items: T,
    chunking_len: usize,
    chunk_size: usize,
    max_targets_for_chunking: usize,
    available_account_workers: usize,
    available_storage_workers: usize,
    chunker: impl FnOnce(T, usize) -> I,
    mut dispatch: impl FnMut(T),
) -> usize
where
    I: IntoIterator<Item = T>,
{
    let spare_workers = available_account_workers > 1 || available_storage_workers > 1;
    let chunking_wanted = spare_workers || chunking_len > max_targets_for_chunking;

    // Nothing to gain from splitting: hand the whole batch over in one go.
    if !chunking_wanted || chunking_len <= chunk_size {
        dispatch(items);
        return 1;
    }

    chunker(items, chunk_size).into_iter().fold(0usize, |dispatched, chunk| {
        dispatch(chunk);
        dispatched + 1
    })
}