Skip to main content

reth_trie_parallel/
state_root_task.rs

1//! State root task interface types shared between the engine tree and the payload builder.
2
3use crate::root::ParallelStateRootError;
4use alloy_eip7928::BlockAccessList;
5use alloy_primitives::{keccak256, B256};
6use derive_more::derive::Deref;
7use reth_trie::{updates::TrieUpdates, HashedPostState, HashedStorage, MultiProofTargetsV2};
8use revm_state::EvmState;
9use std::sync::Arc;
10use tracing::trace;
11
12/// Messages used internally by the multi proof task.
13#[derive(Debug)]
14pub enum StateRootMessage {
15    /// Prefetch proof targets
16    PrefetchProofs(MultiProofTargetsV2),
17    /// New state update from transaction execution with its source
18    StateUpdate(EvmState),
19    /// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
20    HashedStateUpdate(HashedPostState),
21    /// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
22    ///
23    /// When received, the task generates a single state update from the BAL and processes it.
24    /// No further messages are expected after receiving this variant.
25    BlockAccessList(Arc<BlockAccessList>),
26    /// Signals state update stream end.
27    ///
28    /// This is triggered by block execution, indicating that no additional state updates are
29    /// expected.
30    FinishedStateUpdates,
31}
32
33/// Outcome of the state root computation, including the state root itself with
34/// the trie updates.
35#[derive(Debug, Clone)]
36pub struct StateRootComputeOutcome {
37    /// The state root.
38    pub state_root: B256,
39    /// The trie updates.
40    pub trie_updates: Arc<TrieUpdates>,
41    /// Debug recorders taken from the sparse tries, keyed by `None` for account trie
42    /// and `Some(address)` for storage tries.
43    #[cfg(feature = "trie-debug")]
44    pub debug_recorders: Vec<(Option<B256>, reth_trie_sparse::debug_recorder::TrieDebugRecorder)>,
45}
46
47/// Handle to a background sparse trie state root computation.
48///
49/// Used by both the engine (during `newPayload`) and the payload builder (during `FCU`-triggered
50/// block building). Provides channels for streaming state updates into the pipeline and receiving
51/// the final computed state root.
52///
53/// Created by `PayloadProcessor::spawn_state_root`.
54#[derive(Debug)]
55pub struct StateRootHandle {
56    /// The state root that the cached sparse trie is anchored at (parent block's state root).
57    cached_trie_state_root: B256,
58    /// Channel for streaming state updates and proof targets into the sparse trie pipeline.
59    updates_tx: crossbeam_channel::Sender<StateRootMessage>,
60    /// Receiver for the final state root result.
61    state_root_rx:
62        Option<std::sync::mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
63    /// Receiver for the hashed post state.
64    hashed_state_rx: Option<std::sync::mpsc::Receiver<HashedPostState>>,
65}
66
67impl StateRootHandle {
68    /// Creates a new [`StateRootHandle`].
69    pub const fn new(
70        cached_trie_state_root: B256,
71        updates_tx: crossbeam_channel::Sender<StateRootMessage>,
72        state_root_rx: std::sync::mpsc::Receiver<
73            Result<StateRootComputeOutcome, ParallelStateRootError>,
74        >,
75        hashed_state_rx: std::sync::mpsc::Receiver<HashedPostState>,
76    ) -> Self {
77        Self {
78            cached_trie_state_root,
79            updates_tx,
80            state_root_rx: Some(state_root_rx),
81            hashed_state_rx: Some(hashed_state_rx),
82        }
83    }
84
85    /// Returns the state root that the cached sparse trie is anchored at.
86    pub const fn cached_trie_state_root(&self) -> B256 {
87        self.cached_trie_state_root
88    }
89
90    /// Returns a reference to the updates sender channel.
91    pub const fn updates_tx(&self) -> &crossbeam_channel::Sender<StateRootMessage> {
92        &self.updates_tx
93    }
94
95    /// Returns a state hook that streams state updates to the background state root task.
96    ///
97    /// The hook must be dropped after execution completes to signal the end of state updates.
98    pub fn state_hook(&self) -> impl alloy_evm::block::OnStateHook {
99        let sender = StateHookSender::new(self.updates_tx.clone());
100
101        move |state: &EvmState| {
102            let _ = sender.send(StateRootMessage::StateUpdate(state.clone()));
103        }
104    }
105
106    /// Awaits the state root computation result.
107    ///
108    /// # Panics
109    ///
110    /// If called more than once.
111    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
112        self.state_root_rx
113            .take()
114            .expect("state_root already taken")
115            .recv()
116            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
117    }
118
119    /// Takes the state root receiver for use with custom waiting logic (e.g., timeouts).
120    ///
121    /// # Panics
122    ///
123    /// If called more than once.
124    pub const fn take_state_root_rx(
125        &mut self,
126    ) -> std::sync::mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
127        self.state_root_rx.take().expect("state_root already taken")
128    }
129
130    /// Takes the hashed state receiver
131    ///
132    /// # Panics
133    ///
134    /// If called more than once.
135    pub const fn take_hashed_state_rx(&mut self) -> std::sync::mpsc::Receiver<HashedPostState> {
136        self.hashed_state_rx.take().expect("hashed_state already taken")
137    }
138}
139
140/// A wrapper for the sender that signals completion when dropped.
141///
142/// This type is intended to be used in combination with the evm executor statehook.
143/// This should trigger once the block has been executed (after) the last state update has been
144/// sent. This triggers the exit condition of the multi proof task.
145#[derive(Deref, Debug)]
146pub struct StateHookSender(crossbeam_channel::Sender<StateRootMessage>);
147
148impl StateHookSender {
149    /// Creates a new [`StateHookSender`] wrapping the given channel sender.
150    pub const fn new(inner: crossbeam_channel::Sender<StateRootMessage>) -> Self {
151        Self(inner)
152    }
153}
154
155impl Drop for StateHookSender {
156    fn drop(&mut self) {
157        // Send completion signal when the sender is dropped
158        let _ = self.0.send(StateRootMessage::FinishedStateUpdates);
159    }
160}
161
162/// Converts [`EvmState`] to [`HashedPostState`] by keccak256-hashing addresses and storage slots.
163pub fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
164    let mut hashed_state = HashedPostState::with_capacity(update.len());
165
166    for (address, account) in update {
167        if account.is_touched() {
168            let hashed_address = keccak256(address);
169            trace!(target: "trie::parallel::sparse", ?address, ?hashed_address, "Adding account to state update");
170
171            let destroyed = account.is_selfdestructed();
172            if account.info != account.original_info() {
173                let info = if destroyed { None } else { Some(account.info.into()) };
174                hashed_state.accounts.insert(hashed_address, info);
175            }
176
177            let mut changed_storage_iter = account
178                .storage
179                .into_iter()
180                .filter(|(_slot, value)| value.is_changed())
181                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
182                .peekable();
183
184            if destroyed {
185                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
186            } else if changed_storage_iter.peek().is_some() {
187                hashed_state
188                    .storages
189                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
190            }
191        }
192    }
193
194    hashed_state
195}