reth_trie_parallel/
state_root_task.rs1use crate::root::ParallelStateRootError;
4use alloy_eip7928::BlockAccessList;
5use alloy_primitives::{keccak256, B256};
6use derive_more::derive::Deref;
7use reth_trie::{updates::TrieUpdates, HashedPostState, HashedStorage, MultiProofTargetsV2};
8use revm_state::EvmState;
9use std::sync::Arc;
10use tracing::trace;
11
12#[derive(Debug)]
14pub enum StateRootMessage {
15 PrefetchProofs(MultiProofTargetsV2),
17 StateUpdate(EvmState),
19 HashedStateUpdate(HashedPostState),
21 BlockAccessList(Arc<BlockAccessList>),
26 FinishedStateUpdates,
31}
32
33#[derive(Debug, Clone)]
36pub struct StateRootComputeOutcome {
37 pub state_root: B256,
39 pub trie_updates: Arc<TrieUpdates>,
41 #[cfg(feature = "trie-debug")]
44 pub debug_recorders: Vec<(Option<B256>, reth_trie_sparse::debug_recorder::TrieDebugRecorder)>,
45}
46
47#[derive(Debug)]
55pub struct StateRootHandle {
56 cached_trie_state_root: B256,
58 updates_tx: crossbeam_channel::Sender<StateRootMessage>,
60 state_root_rx:
62 Option<std::sync::mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
63 hashed_state_rx: Option<std::sync::mpsc::Receiver<HashedPostState>>,
65}
66
67impl StateRootHandle {
68 pub const fn new(
70 cached_trie_state_root: B256,
71 updates_tx: crossbeam_channel::Sender<StateRootMessage>,
72 state_root_rx: std::sync::mpsc::Receiver<
73 Result<StateRootComputeOutcome, ParallelStateRootError>,
74 >,
75 hashed_state_rx: std::sync::mpsc::Receiver<HashedPostState>,
76 ) -> Self {
77 Self {
78 cached_trie_state_root,
79 updates_tx,
80 state_root_rx: Some(state_root_rx),
81 hashed_state_rx: Some(hashed_state_rx),
82 }
83 }
84
85 pub const fn cached_trie_state_root(&self) -> B256 {
87 self.cached_trie_state_root
88 }
89
90 pub const fn updates_tx(&self) -> &crossbeam_channel::Sender<StateRootMessage> {
92 &self.updates_tx
93 }
94
95 pub fn state_hook(&self) -> impl alloy_evm::block::OnStateHook {
99 let sender = StateHookSender::new(self.updates_tx.clone());
100
101 move |state: &EvmState| {
102 let _ = sender.send(StateRootMessage::StateUpdate(state.clone()));
103 }
104 }
105
106 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
112 self.state_root_rx
113 .take()
114 .expect("state_root already taken")
115 .recv()
116 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
117 }
118
119 pub const fn take_state_root_rx(
125 &mut self,
126 ) -> std::sync::mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
127 self.state_root_rx.take().expect("state_root already taken")
128 }
129
130 pub const fn take_hashed_state_rx(&mut self) -> std::sync::mpsc::Receiver<HashedPostState> {
136 self.hashed_state_rx.take().expect("hashed_state already taken")
137 }
138}
139
140#[derive(Deref, Debug)]
146pub struct StateHookSender(crossbeam_channel::Sender<StateRootMessage>);
147
148impl StateHookSender {
149 pub const fn new(inner: crossbeam_channel::Sender<StateRootMessage>) -> Self {
151 Self(inner)
152 }
153}
154
155impl Drop for StateHookSender {
156 fn drop(&mut self) {
157 let _ = self.0.send(StateRootMessage::FinishedStateUpdates);
159 }
160}
161
162pub fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
164 let mut hashed_state = HashedPostState::with_capacity(update.len());
165
166 for (address, account) in update {
167 if account.is_touched() {
168 let hashed_address = keccak256(address);
169 trace!(target: "trie::parallel::sparse", ?address, ?hashed_address, "Adding account to state update");
170
171 let destroyed = account.is_selfdestructed();
172 if account.info != account.original_info() {
173 let info = if destroyed { None } else { Some(account.info.into()) };
174 hashed_state.accounts.insert(hashed_address, info);
175 }
176
177 let mut changed_storage_iter = account
178 .storage
179 .into_iter()
180 .filter(|(_slot, value)| value.is_changed())
181 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
182 .peekable();
183
184 if destroyed {
185 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
186 } else if changed_storage_iter.peek().is_some() {
187 hashed_state
188 .storages
189 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
190 }
191 }
192 }
193
194 hashed_state
195}