reth_engine_util/
engine_store.rs
1use alloy_rpc_types_engine::ForkchoiceState;
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, ExecutionPayload};
6use reth_fs_util as fs;
7use reth_payload_primitives::PayloadTypes;
8use serde::{Deserialize, Serialize};
9use std::{
10 collections::BTreeMap,
11 path::PathBuf,
12 pin::Pin,
13 task::{ready, Context, Poll},
14 time::SystemTime,
15};
16use tracing::*;
17
18#[derive(Debug, Serialize, Deserialize)]
20#[serde(rename_all = "camelCase")]
21pub enum StoredEngineApiMessage<T: PayloadTypes> {
22 ForkchoiceUpdated {
24 state: ForkchoiceState,
26 payload_attrs: Option<T::PayloadAttributes>,
28 },
29 NewPayload {
31 #[serde(flatten)]
33 payload: T::ExecutionData,
34 },
35}
36
37#[derive(Debug)]
39pub struct EngineMessageStore {
40 path: PathBuf,
42}
43
44impl EngineMessageStore {
45 pub const fn new(path: PathBuf) -> Self {
49 Self { path }
50 }
51
52 pub fn on_message<T>(
55 &self,
56 msg: &BeaconEngineMessage<T>,
57 received_at: SystemTime,
58 ) -> eyre::Result<()>
59 where
60 T: PayloadTypes,
61 {
62 fs::create_dir_all(&self.path)?; let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
64 match msg {
65 BeaconEngineMessage::ForkchoiceUpdated {
66 state,
67 payload_attrs,
68 tx: _tx,
69 version: _version,
70 } => {
71 let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
72 fs::write(
73 self.path.join(filename),
74 serde_json::to_vec(&StoredEngineApiMessage::<T>::ForkchoiceUpdated {
75 state: *state,
76 payload_attrs: payload_attrs.clone(),
77 })?,
78 )?;
79 }
80 BeaconEngineMessage::NewPayload { payload, tx: _tx } => {
81 let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
82 fs::write(
83 self.path.join(filename),
84 serde_json::to_vec(&StoredEngineApiMessage::<T>::NewPayload {
85 payload: payload.clone(),
86 })?,
87 )?;
88 }
89 };
90 Ok(())
91 }
92
93 pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
95 let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
96 for entry in fs::read_dir(&self.path)? {
97 let entry = entry?;
98 let filename = entry.file_name();
99 if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
100 if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
101 filenames_by_ts.entry(timestamp).or_default().push(entry.path());
102 tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
103 } else {
104 tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
105 }
106 } else {
107 tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
108 }
109 }
110 Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
111 }
112}
113
114#[derive(Debug)]
117#[pin_project::pin_project]
118pub struct EngineStoreStream<S> {
119 #[pin]
121 stream: S,
122 store: EngineMessageStore,
124}
125
126impl<S> EngineStoreStream<S> {
127 pub const fn new(stream: S, path: PathBuf) -> Self {
129 Self { stream, store: EngineMessageStore::new(path) }
130 }
131}
132
133impl<S, T> Stream for EngineStoreStream<S>
134where
135 S: Stream<Item = BeaconEngineMessage<T>>,
136 T: PayloadTypes,
137{
138 type Item = S::Item;
139
140 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141 let mut this = self.project();
142 let next = ready!(this.stream.poll_next_unpin(cx));
143 if let Some(msg) = &next {
144 if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
145 error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
146 }
147 }
148 Poll::Ready(next)
149 }
150}