reth_engine_tree/tree/payload_processor/
multiproof.rs1use alloy_evm::block::StateChangeSource;
4use alloy_primitives::{keccak256, B256};
5use crossbeam_channel::Sender as CrossbeamSender;
6use derive_more::derive::Deref;
7use metrics::{Gauge, Histogram};
8use reth_metrics::Metrics;
9use reth_revm::state::EvmState;
10use reth_trie::{HashedPostState, HashedStorage};
11use reth_trie_common::MultiProofTargetsV2;
12use std::sync::Arc;
13use tracing::trace;
14
15#[derive(Clone, Copy)]
17pub enum Source {
18 Evm(StateChangeSource),
20 BlockAccessList,
22}
23
24impl std::fmt::Debug for Source {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 Self::Evm(source) => source.fmt(f),
28 Self::BlockAccessList => f.write_str("BlockAccessList"),
29 }
30 }
31}
32
33impl From<StateChangeSource> for Source {
34 fn from(source: StateChangeSource) -> Self {
35 Self::Evm(source)
36 }
37}
38
39pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
42
43#[derive(Debug)]
45pub enum MultiProofMessage {
46 PrefetchProofs(MultiProofTargetsV2),
48 StateUpdate(Source, EvmState),
50 HashedStateUpdate(HashedPostState),
52 BlockAccessList(Arc<alloy_eip7928::BlockAccessList>),
57 FinishedStateUpdates,
62}
63
64#[derive(Deref, Debug)]
70pub struct StateHookSender(CrossbeamSender<MultiProofMessage>);
71
72impl StateHookSender {
73 pub const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
75 Self(inner)
76 }
77}
78
79impl Drop for StateHookSender {
80 fn drop(&mut self) {
81 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
83 }
84}
85
86pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
87 let mut hashed_state = HashedPostState::with_capacity(update.len());
88
89 for (address, account) in update {
90 if account.is_touched() {
91 let hashed_address = keccak256(address);
92 trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
93
94 let destroyed = account.is_selfdestructed();
95 let info = if destroyed { None } else { Some(account.info.into()) };
96 hashed_state.accounts.insert(hashed_address, info);
97
98 let mut changed_storage_iter = account
99 .storage
100 .into_iter()
101 .filter(|(_slot, value)| value.is_changed())
102 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
103 .peekable();
104
105 if destroyed {
106 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
107 } else if changed_storage_iter.peek().is_some() {
108 hashed_state
109 .storages
110 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
111 }
112 }
113 }
114
115 hashed_state
116}
117
118#[derive(Metrics, Clone)]
119#[metrics(scope = "tree.root")]
120pub(crate) struct MultiProofTaskMetrics {
121 pub sparse_trie_reveal_multiproof_duration_histogram: Histogram,
123 pub sparse_trie_proof_coalesce_duration_histogram: Histogram,
125 pub sparse_trie_channel_wait_duration_histogram: Histogram,
127 pub sparse_trie_process_updates_duration_histogram: Histogram,
129 pub sparse_trie_final_update_duration_histogram: Histogram,
131 pub sparse_trie_total_duration_histogram: Histogram,
133 pub into_trie_for_reuse_duration_histogram: Histogram,
135 pub sparse_trie_cache_wait_duration_histogram: Histogram,
137 pub sparse_trie_idle_time_seconds: Histogram,
140 pub hashing_task_idle_time_seconds: Histogram,
143
144 pub sparse_trie_account_cache_hits: Histogram,
146 pub sparse_trie_account_cache_misses: Histogram,
148 pub sparse_trie_storage_cache_hits: Histogram,
150 pub sparse_trie_storage_cache_misses: Histogram,
152
153 pub sparse_trie_retained_memory_bytes: Gauge,
155 pub sparse_trie_retained_storage_tries: Gauge,
157}
158
159#[allow(clippy::too_many_arguments)]
162pub(crate) fn dispatch_with_chunking<T, I>(
163 items: T,
164 chunking_len: usize,
165 chunk_size: usize,
166 max_targets_for_chunking: usize,
167 available_account_workers: usize,
168 available_storage_workers: usize,
169 chunker: impl FnOnce(T, usize) -> I,
170 mut dispatch: impl FnMut(T),
171) -> usize
172where
173 I: IntoIterator<Item = T>,
174{
175 let should_chunk = chunking_len > max_targets_for_chunking ||
176 available_account_workers > 1 ||
177 available_storage_workers > 1;
178
179 if should_chunk && chunking_len > chunk_size {
180 let mut num_chunks = 0usize;
181 for chunk in chunker(items, chunk_size) {
182 dispatch(chunk);
183 num_chunks += 1;
184 }
185 return num_chunks;
186 }
187
188 dispatch(items);
189 1
190}