reth_engine_util/
engine_store.rs

1//! Stores engine API messages to disk for later inspection and replay.
2
3use alloy_rpc_types_engine::ForkchoiceState;
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, ExecutionPayload};
6use reth_fs_util as fs;
7use reth_payload_primitives::PayloadTypes;
8use serde::{Deserialize, Serialize};
9use std::{
10    collections::BTreeMap,
11    path::PathBuf,
12    pin::Pin,
13    task::{ready, Context, Poll},
14    time::SystemTime,
15};
16use tracing::*;
17
18/// A message from the engine API that has been stored to disk.
19#[derive(Debug, Serialize, Deserialize)]
20#[serde(rename_all = "camelCase")]
21pub enum StoredEngineApiMessage<T: PayloadTypes> {
22    /// The on-disk representation of an `engine_forkchoiceUpdated` method call.
23    ForkchoiceUpdated {
24        /// The [`ForkchoiceState`] sent in the persisted call.
25        state: ForkchoiceState,
26        /// The payload attributes sent in the persisted call, if any.
27        payload_attrs: Option<T::PayloadAttributes>,
28    },
29    /// The on-disk representation of an `engine_newPayload` method call.
30    NewPayload {
31        /// The [`PayloadTypes::ExecutionData`] sent in the persisted call.
32        #[serde(flatten)]
33        payload: T::ExecutionData,
34    },
35}
36
/// Reads and writes engine API messages as files inside a single directory.
#[derive(Debug)]
pub struct EngineMessageStore {
    /// Directory in which the engine API message files live.
    path: PathBuf,
}
43
44impl EngineMessageStore {
45    /// Creates a new [`EngineMessageStore`] at the given path.
46    ///
47    /// The path is expected to be a directory, where individual message JSON files will be stored.
48    pub const fn new(path: PathBuf) -> Self {
49        Self { path }
50    }
51
52    /// Stores the received [`BeaconEngineMessage`] to disk, appending the `received_at` time to the
53    /// path.
54    pub fn on_message<T>(
55        &self,
56        msg: &BeaconEngineMessage<T>,
57        received_at: SystemTime,
58    ) -> eyre::Result<()>
59    where
60        T: PayloadTypes,
61    {
62        fs::create_dir_all(&self.path)?; // ensure that store path had been created
63        let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
64        match msg {
65            BeaconEngineMessage::ForkchoiceUpdated {
66                state,
67                payload_attrs,
68                tx: _tx,
69                version: _version,
70            } => {
71                let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
72                fs::write(
73                    self.path.join(filename),
74                    serde_json::to_vec(&StoredEngineApiMessage::<T>::ForkchoiceUpdated {
75                        state: *state,
76                        payload_attrs: payload_attrs.clone(),
77                    })?,
78                )?;
79            }
80            BeaconEngineMessage::NewPayload { payload, tx: _tx } => {
81                let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
82                fs::write(
83                    self.path.join(filename),
84                    serde_json::to_vec(&StoredEngineApiMessage::<T>::NewPayload {
85                        payload: payload.clone(),
86                    })?,
87                )?;
88            }
89        };
90        Ok(())
91    }
92
93    /// Finds and iterates through any stored engine API message files, ordered by timestamp.
94    pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
95        let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
96        for entry in fs::read_dir(&self.path)? {
97            let entry = entry?;
98            let filename = entry.file_name();
99            if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
100                if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
101                    filenames_by_ts.entry(timestamp).or_default().push(entry.path());
102                    tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
103                } else {
104                    tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
105                }
106            } else {
107                tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
108            }
109        }
110        Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
111    }
112}
113
114/// A wrapper stream that stores Engine API messages in
115/// the specified directory.
116#[derive(Debug)]
117#[pin_project::pin_project]
118pub struct EngineStoreStream<S> {
119    /// Inner message stream.
120    #[pin]
121    stream: S,
122    /// Engine message store.
123    store: EngineMessageStore,
124}
125
126impl<S> EngineStoreStream<S> {
127    /// Create new engine store stream wrapper.
128    pub const fn new(stream: S, path: PathBuf) -> Self {
129        Self { stream, store: EngineMessageStore::new(path) }
130    }
131}
132
133impl<S, T> Stream for EngineStoreStream<S>
134where
135    S: Stream<Item = BeaconEngineMessage<T>>,
136    T: PayloadTypes,
137{
138    type Item = S::Item;
139
140    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141        let mut this = self.project();
142        let next = ready!(this.stream.poll_next_unpin(cx));
143        if let Some(msg) = &next {
144            if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
145                error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
146            }
147        }
148        Poll::Ready(next)
149    }
150}