reth_engine_tree/tree/root.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
//! State root task related functionality.
use futures::Stream;
use pin_project::pin_project;
use reth_provider::providers::ConsistentDbView;
use reth_trie::{updates::TrieUpdates, TrieInput};
use reth_trie_parallel::root::ParallelStateRootError;
use revm_primitives::{EvmState, B256};
use std::{
future::Future,
pin::Pin,
sync::{mpsc, Arc},
task::{Context, Poll},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
/// Result of the state root calculation
pub(crate) type StateRootResult = Result<(B256, TrieUpdates), ParallelStateRootError>;
/// Handle to a spawned state root task.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct StateRootHandle {
/// Channel for receiving the final result.
rx: mpsc::Receiver<StateRootResult>,
}
#[allow(dead_code)]
impl StateRootHandle {
/// Waits for the state root calculation to complete.
pub(crate) fn wait_for_result(self) -> StateRootResult {
self.rx.recv().expect("state root task was dropped without sending result")
}
}
/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
/// It is responsible of initializing a blinded sparse trie and subscribe to
/// transaction state stream. As it receives transaction execution results, it
/// fetches the proofs for relevant accounts from the database and reveal them
/// to the tree.
/// Then it updates relevant leaves according to the result of the transaction.
#[pin_project]
pub(crate) struct StateRootTask<Factory> {
/// View over the state in the database.
consistent_view: ConsistentDbView<Factory>,
/// Incoming state updates.
#[pin]
state_stream: UnboundedReceiverStream<EvmState>,
/// Latest trie input.
input: Arc<TrieInput>,
}
#[allow(dead_code)]
impl<Factory> StateRootTask<Factory>
where
Factory: Send + 'static,
{
/// Creates a new `StateRootTask`.
pub(crate) const fn new(
consistent_view: ConsistentDbView<Factory>,
input: Arc<TrieInput>,
state_stream: UnboundedReceiverStream<EvmState>,
) -> Self {
Self { consistent_view, state_stream, input }
}
/// Spawns the state root task and returns a handle to await its result.
pub(crate) fn spawn(self) -> StateRootHandle {
let (tx, rx) = mpsc::channel();
// Spawn the task that will process state updates and calculate the root
tokio::spawn(async move {
debug!(target: "engine::tree", "Starting state root task");
let result = self.await;
let _ = tx.send(result);
});
StateRootHandle { rx }
}
/// Handles state updates.
fn on_state_update(
_view: &ConsistentDbView<Factory>,
_input: &Arc<TrieInput>,
_state: EvmState,
) {
// TODO: calculate hashed state update and dispatch proof gathering for it.
}
}
impl<Factory> Future for StateRootTask<Factory>
where
Factory: Send + 'static,
{
type Output = StateRootResult;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Process all items until the stream is closed
loop {
match this.state_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(state)) => {
Self::on_state_update(this.consistent_view, this.input, state);
}
Poll::Ready(None) => {
// stream closed, return final result
return Poll::Ready(Ok((B256::default(), TrieUpdates::default())));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
// TODO:
// * keep track of proof calculation
// * keep track of intermediate root computation
// * return final state root result
}
}