reth_trie_parallel/
state_root_task.rs1use crate::root::ParallelStateRootError;
4use alloy_eip7928::BlockAccessList;
5use alloy_evm::block::StateChangeSource;
6use alloy_primitives::{keccak256, B256};
7use derive_more::derive::Deref;
8use reth_trie::{updates::TrieUpdates, HashedPostState, HashedStorage, MultiProofTargetsV2};
9use revm_state::EvmState;
10use std::sync::Arc;
11use tracing::trace;
12
13#[derive(Clone, Copy)]
15pub enum Source {
16 Evm(StateChangeSource),
18 BlockAccessList,
20}
21
22impl std::fmt::Debug for Source {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 Self::Evm(source) => source.fmt(f),
26 Self::BlockAccessList => f.write_str("BlockAccessList"),
27 }
28 }
29}
30
31impl From<StateChangeSource> for Source {
32 fn from(source: StateChangeSource) -> Self {
33 Self::Evm(source)
34 }
35}
36
37#[derive(Debug)]
39pub enum StateRootMessage {
40 PrefetchProofs(MultiProofTargetsV2),
42 StateUpdate(Source, EvmState),
44 HashedStateUpdate(HashedPostState),
46 BlockAccessList(Arc<BlockAccessList>),
51 FinishedStateUpdates,
56}
57
58#[derive(Debug, Clone)]
61pub struct StateRootComputeOutcome {
62 pub state_root: B256,
64 pub trie_updates: Arc<TrieUpdates>,
66 #[cfg(feature = "trie-debug")]
69 pub debug_recorders: Vec<(Option<B256>, reth_trie_sparse::debug_recorder::TrieDebugRecorder)>,
70}
71
72#[derive(Debug)]
80pub struct StateRootHandle {
81 cached_trie_state_root: B256,
83 updates_tx: crossbeam_channel::Sender<StateRootMessage>,
85 state_root_rx:
87 Option<std::sync::mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
88}
89
90impl StateRootHandle {
91 pub const fn new(
93 cached_trie_state_root: B256,
94 updates_tx: crossbeam_channel::Sender<StateRootMessage>,
95 state_root_rx: std::sync::mpsc::Receiver<
96 Result<StateRootComputeOutcome, ParallelStateRootError>,
97 >,
98 ) -> Self {
99 Self { cached_trie_state_root, updates_tx, state_root_rx: Some(state_root_rx) }
100 }
101
102 pub const fn cached_trie_state_root(&self) -> B256 {
104 self.cached_trie_state_root
105 }
106
107 pub const fn updates_tx(&self) -> &crossbeam_channel::Sender<StateRootMessage> {
109 &self.updates_tx
110 }
111
112 pub fn state_hook(&self) -> impl alloy_evm::block::OnStateHook {
116 let sender = StateHookSender::new(self.updates_tx.clone());
117
118 move |source: StateChangeSource, state: &EvmState| {
119 let _ = sender.send(StateRootMessage::StateUpdate(source.into(), state.clone()));
120 }
121 }
122
123 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
129 self.state_root_rx
130 .take()
131 .expect("state_root already taken")
132 .recv()
133 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
134 }
135
136 pub const fn take_state_root_rx(
142 &mut self,
143 ) -> std::sync::mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
144 self.state_root_rx.take().expect("state_root already taken")
145 }
146}
147
148#[derive(Deref, Debug)]
154pub struct StateHookSender(crossbeam_channel::Sender<StateRootMessage>);
155
156impl StateHookSender {
157 pub const fn new(inner: crossbeam_channel::Sender<StateRootMessage>) -> Self {
159 Self(inner)
160 }
161}
162
163impl Drop for StateHookSender {
164 fn drop(&mut self) {
165 let _ = self.0.send(StateRootMessage::FinishedStateUpdates);
167 }
168}
169
170pub fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
172 let mut hashed_state = HashedPostState::with_capacity(update.len());
173
174 for (address, account) in update {
175 if account.is_touched() {
176 let hashed_address = keccak256(address);
177 trace!(target: "trie::parallel::sparse", ?address, ?hashed_address, "Adding account to state update");
178
179 let destroyed = account.is_selfdestructed();
180 let info = if destroyed { None } else { Some(account.info.into()) };
181 hashed_state.accounts.insert(hashed_address, info);
182
183 let mut changed_storage_iter = account
184 .storage
185 .into_iter()
186 .filter(|(_slot, value)| value.is_changed())
187 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
188 .peekable();
189
190 if destroyed {
191 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
192 } else if changed_storage_iter.peek().is_some() {
193 hashed_state
194 .storages
195 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
196 }
197 }
198 }
199
200 hashed_state
201}