reth_engine_util/
engine_store.rsuse alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState};
use futures::{Stream, StreamExt};
use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
use reth_fs_util as fs;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
path::PathBuf,
pin::Pin,
task::{ready, Context, Poll},
time::SystemTime,
};
use tracing::*;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum StoredEngineApiMessage<Attributes> {
ForkchoiceUpdated {
state: ForkchoiceState,
payload_attrs: Option<Attributes>,
},
NewPayload {
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
},
}
#[derive(Debug)]
pub struct EngineMessageStore {
path: PathBuf,
}
impl EngineMessageStore {
pub const fn new(path: PathBuf) -> Self {
Self { path }
}
pub fn on_message<Engine>(
&self,
msg: &BeaconEngineMessage<Engine>,
received_at: SystemTime,
) -> eyre::Result<()>
where
Engine: EngineTypes,
{
fs::create_dir_all(&self.path)?; let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
match msg {
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx: _tx,
version: _version,
} => {
let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
fs::write(
self.path.join(filename),
serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated {
state: *state,
payload_attrs: payload_attrs.clone(),
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, sidecar, tx: _tx } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),
serde_json::to_vec(
&StoredEngineApiMessage::<Engine::PayloadAttributes>::NewPayload {
payload: payload.clone(),
sidecar: sidecar.clone(),
},
)?,
)?;
}
BeaconEngineMessage::TransitionConfigurationExchanged => (),
};
Ok(())
}
pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
for entry in fs::read_dir(&self.path)? {
let entry = entry?;
let filename = entry.file_name();
if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
filenames_by_ts.entry(timestamp).or_default().push(entry.path());
tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
} else {
tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
}
} else {
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
}
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
}
}
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineStoreStream<S> {
#[pin]
stream: S,
store: EngineMessageStore,
}
impl<S> EngineStoreStream<S> {
pub const fn new(stream: S, path: PathBuf) -> Self {
Self { stream, store: EngineMessageStore::new(path) }
}
}
impl<S, Engine> Stream for EngineStoreStream<S>
where
S: Stream<Item = BeaconEngineMessage<Engine>>,
Engine: EngineTypes,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let next = ready!(this.stream.poll_next_unpin(cx));
if let Some(msg) = &next {
if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
}
}
Poll::Ready(next)
}
}