reth_exex/wal/
storage.rs

1use std::{
2    fs::File,
3    ops::RangeInclusive,
4    path::{Path, PathBuf},
5};
6
7use crate::wal::{WalError, WalResult};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_exex_types::ExExNotification;
10use reth_node_api::NodePrimitives;
11use reth_tracing::tracing::debug;
12use tracing::instrument;
13
/// File extension (without the leading dot) of the files that hold WAL notifications.
///
/// File names in the storage directory have the form `{id}.{FILE_EXTENSION}`.
static FILE_EXTENSION: &str = "wal";
15
/// The underlying WAL storage backed by a directory of files.
///
/// Each notification is represented by a single file that contains a MessagePack-encoded
/// notification.
#[derive(Debug, Clone)]
pub struct Storage<N: NodePrimitives = EthPrimitives> {
    /// The path to the directory that holds the WAL files (one file per notification).
    path: PathBuf,
    /// Zero-sized marker that ties the storage to the node primitives type `N`.
    _pd: std::marker::PhantomData<N>,
}
26
27impl<N> Storage<N>
28where
29    N: NodePrimitives,
30{
31    /// Creates a new instance of [`Storage`] backed by the file at the given path and creates
32    /// it doesn't exist.
33    pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
34        reth_fs_util::create_dir_all(&path)?;
35
36        Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
37    }
38
39    fn file_path(&self, id: u32) -> PathBuf {
40        self.path.join(format!("{id}.{FILE_EXTENSION}"))
41    }
42
43    fn parse_filename(filename: &str) -> WalResult<u32> {
44        filename
45            .strip_suffix(".wal")
46            .and_then(|s| s.parse().ok())
47            .ok_or_else(|| WalError::Parse(filename.to_string()))
48    }
49
50    /// Removes notification for the given file ID from the storage.
51    ///
52    /// # Returns
53    ///
54    /// The size of the file that was removed in bytes, if any.
55    #[instrument(skip(self))]
56    fn remove_notification(&self, file_id: u32) -> Option<u64> {
57        let path = self.file_path(file_id);
58        let size = path.metadata().ok()?.len();
59
60        match reth_fs_util::remove_file(self.file_path(file_id)) {
61            Ok(()) => {
62                debug!(target: "exex::wal::storage", "Notification was removed from the storage");
63                Some(size)
64            }
65            Err(err) => {
66                debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
67                None
68            }
69        }
70    }
71
72    /// Returns the range of file IDs in the storage.
73    ///
74    /// If there are no files in the storage, returns `None`.
75    pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
76        let mut min_id = None;
77        let mut max_id = None;
78
79        for entry in reth_fs_util::read_dir(&self.path)? {
80            let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;
81
82            if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
83                let file_name = entry.file_name();
84                let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
85
86                min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id)));
87                max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id)));
88            }
89        }
90
91        Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
92    }
93
94    /// Removes notifications from the storage according to the given list of file IDs.
95    ///
96    /// # Returns
97    ///
98    /// Number of removed notifications and the total size of the removed files in bytes.
99    pub(super) fn remove_notifications(
100        &self,
101        file_ids: impl IntoIterator<Item = u32>,
102    ) -> WalResult<(usize, u64)> {
103        let mut deleted_total = 0;
104        let mut deleted_size = 0;
105
106        for id in file_ids {
107            if let Some(size) = self.remove_notification(id) {
108                deleted_total += 1;
109                deleted_size += size;
110            }
111        }
112
113        Ok((deleted_total, deleted_size))
114    }
115
116    pub(super) fn iter_notifications(
117        &self,
118        range: RangeInclusive<u32>,
119    ) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification<N>)>> + '_ {
120        range.map(move |id| {
121            let (notification, size) =
122                self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;
123
124            Ok((id, size, notification))
125        })
126    }
127
128    /// Reads the notification from the file with the given ID.
129    #[instrument(skip(self))]
130    pub(super) fn read_notification(
131        &self,
132        file_id: u32,
133    ) -> WalResult<Option<(ExExNotification<N>, u64)>> {
134        let file_path = self.file_path(file_id);
135        debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
136
137        let mut file = match File::open(&file_path) {
138            Ok(file) => file,
139            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
140            Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
141        };
142        let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();
143
144        // Deserialize using the bincode- and msgpack-compatible serde wrapper
145        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
146            rmp_serde::decode::from_read(&mut file)
147                .map_err(|err| WalError::Decode(file_id, file_path, err))?;
148
149        Ok(Some((notification.into(), size)))
150    }
151
152    /// Writes the notification to the file with the given ID.
153    ///
154    /// # Returns
155    ///
156    /// The size of the file that was written in bytes.
157    #[instrument(skip(self, notification))]
158    pub(super) fn write_notification(
159        &self,
160        file_id: u32,
161        notification: &ExExNotification<N>,
162    ) -> WalResult<u64> {
163        let file_path = self.file_path(file_id);
164        debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
165
166        // Serialize using the bincode- and msgpack-compatible serde wrapper
167        let notification =
168            reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
169
170        reth_fs_util::atomic_write_file(&file_path, |file| {
171            rmp_serde::encode::write(file, &notification)
172        })?;
173
174        Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::Storage;
    use reth_exex_types::ExExNotification;
    use reth_provider::Chain;
    use reth_testing_utils::generators::{self, random_block};
    use std::{fs::File, sync::Arc};

    /// Decodes a WAL file fixture containing a single committed block with one transaction.
    ///
    /// Regression test for <https://github.com/paradigmxyz/reth/issues/15012>.
    #[test]
    fn decode_notification_wal() {
        let raw = include_bytes!("../../test-data/28.wal");

        let compat: reth_exex_types::serde_bincode_compat::ExExNotification<
            '_,
            reth_ethereum_primitives::EthPrimitives,
        > = rmp_serde::decode::from_slice(raw.as_slice()).unwrap();
        let decoded: ExExNotification = compat.into();

        let ExExNotification::ChainCommitted { new } = decoded else {
            panic!("unexpected notification")
        };
        assert_eq!(new.blocks().len(), 1);
        assert_eq!(new.tip().transaction_count(), 1);
    }

    /// A notification written to the storage must be read back unchanged.
    #[test]
    fn test_roundtrip() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        // Blocks are generated in the same order as they are consumed below.
        let old_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
        let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;

        let new_chain = Arc::new(Chain::new(vec![new_block], Default::default(), None));
        let old_chain = Arc::new(Chain::new(vec![old_block], Default::default(), None));
        let notification = ExExNotification::ChainReorged { new: new_chain, old: old_chain };

        // Serialize to disk, then deserialize from disk again.
        let file_id = 0;
        storage.write_notification(file_id, &notification)?;
        let roundtripped = storage.read_notification(file_id)?.map(|(decoded, _size)| decoded);
        assert_eq!(roundtripped, Some(notification));

        Ok(())
    }

    /// Only files with the WAL extension contribute to the reported range of file IDs.
    #[test]
    fn test_files_range() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        // WAL files with IDs 1, 2 and 3.
        for id in 1..=3 {
            File::create(storage.file_path(id))?;
        }

        // Files with a different extension, which must not widen the range.
        for name in ["0.tmp", "4.tmp"] {
            File::create(temp_dir.path().join(name))?;
        }

        let range = storage.files_range()?;
        assert_eq!(range, Some(1..=3));

        Ok(())
    }
}