// reth_engine_util/engine_store.rs
1use alloy_rpc_types_engine::ForkchoiceState;
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, EngineTypes, ExecutionPayload};
6use reth_fs_util as fs;
7use serde::{Deserialize, Serialize};
8use std::{
9 collections::BTreeMap,
10 path::PathBuf,
11 pin::Pin,
12 task::{ready, Context, Poll},
13 time::SystemTime,
14};
15use tracing::*;
16
17#[derive(Debug, Serialize, Deserialize)]
19#[serde(rename_all = "camelCase")]
20pub enum StoredEngineApiMessage<EngineT: EngineTypes> {
21 ForkchoiceUpdated {
23 state: ForkchoiceState,
25 payload_attrs: Option<EngineT::PayloadAttributes>,
27 },
28 NewPayload {
30 #[serde(flatten)]
32 payload: EngineT::ExecutionData,
33 },
34}
35
/// Writes Engine API messages to, and lists them from, a directory on disk.
#[derive(Debug)]
pub struct EngineMessageStore {
    /// Directory in which the message files are kept.
    path: PathBuf,
}
42
43impl EngineMessageStore {
44 pub const fn new(path: PathBuf) -> Self {
48 Self { path }
49 }
50
51 pub fn on_message<Engine>(
54 &self,
55 msg: &BeaconEngineMessage<Engine>,
56 received_at: SystemTime,
57 ) -> eyre::Result<()>
58 where
59 Engine: EngineTypes,
60 {
61 fs::create_dir_all(&self.path)?; let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
63 match msg {
64 BeaconEngineMessage::ForkchoiceUpdated {
65 state,
66 payload_attrs,
67 tx: _tx,
68 version: _version,
69 } => {
70 let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
71 fs::write(
72 self.path.join(filename),
73 serde_json::to_vec(&StoredEngineApiMessage::<Engine>::ForkchoiceUpdated {
74 state: *state,
75 payload_attrs: payload_attrs.clone(),
76 })?,
77 )?;
78 }
79 BeaconEngineMessage::NewPayload { payload, tx: _tx } => {
80 let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
81 fs::write(
82 self.path.join(filename),
83 serde_json::to_vec(&StoredEngineApiMessage::<Engine>::NewPayload {
84 payload: payload.clone(),
85 })?,
86 )?;
87 }
88 BeaconEngineMessage::TransitionConfigurationExchanged => (),
90 };
91 Ok(())
92 }
93
94 pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
96 let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
97 for entry in fs::read_dir(&self.path)? {
98 let entry = entry?;
99 let filename = entry.file_name();
100 if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
101 if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
102 filenames_by_ts.entry(timestamp).or_default().push(entry.path());
103 tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
104 } else {
105 tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
106 }
107 } else {
108 tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
109 }
110 }
111 Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
112 }
113}
114
115#[derive(Debug)]
118#[pin_project::pin_project]
119pub struct EngineStoreStream<S> {
120 #[pin]
122 stream: S,
123 store: EngineMessageStore,
125}
126
127impl<S> EngineStoreStream<S> {
128 pub const fn new(stream: S, path: PathBuf) -> Self {
130 Self { stream, store: EngineMessageStore::new(path) }
131 }
132}
133
134impl<S, Engine> Stream for EngineStoreStream<S>
135where
136 S: Stream<Item = BeaconEngineMessage<Engine>>,
137 Engine: EngineTypes,
138{
139 type Item = S::Item;
140
141 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142 let mut this = self.project();
143 let next = ready!(this.stream.poll_next_unpin(cx));
144 if let Some(msg) = &next {
145 if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
146 error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
147 }
148 }
149 Poll::Ready(next)
150 }
151}