reth_exex/wal/
storage.rs
1use std::{
2 fs::File,
3 ops::RangeInclusive,
4 path::{Path, PathBuf},
5};
6
7use crate::wal::{WalError, WalResult};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_exex_types::ExExNotification;
10use reth_node_api::NodePrimitives;
11use reth_tracing::tracing::debug;
12use tracing::instrument;
13
/// Extension (without the leading dot) carried by every notification file in the WAL directory.
///
/// Used both to build file names and to tell WAL files apart from other directory entries.
static FILE_EXTENSION: &str = "wal";
15
16#[derive(Debug, Clone)]
21pub struct Storage<N: NodePrimitives = EthPrimitives> {
22 path: PathBuf,
24 _pd: std::marker::PhantomData<N>,
25}
26
27impl<N> Storage<N>
28where
29 N: NodePrimitives,
30{
31 pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
34 reth_fs_util::create_dir_all(&path)?;
35
36 Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
37 }
38
39 fn file_path(&self, id: u32) -> PathBuf {
40 self.path.join(format!("{id}.{FILE_EXTENSION}"))
41 }
42
43 fn parse_filename(filename: &str) -> WalResult<u32> {
44 filename
45 .strip_suffix(".wal")
46 .and_then(|s| s.parse().ok())
47 .ok_or_else(|| WalError::Parse(filename.to_string()))
48 }
49
50 #[instrument(skip(self))]
56 fn remove_notification(&self, file_id: u32) -> Option<u64> {
57 let path = self.file_path(file_id);
58 let size = path.metadata().ok()?.len();
59
60 match reth_fs_util::remove_file(self.file_path(file_id)) {
61 Ok(()) => {
62 debug!(target: "exex::wal::storage", "Notification was removed from the storage");
63 Some(size)
64 }
65 Err(err) => {
66 debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
67 None
68 }
69 }
70 }
71
72 pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
76 let mut min_id = None;
77 let mut max_id = None;
78
79 for entry in reth_fs_util::read_dir(&self.path)? {
80 let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;
81
82 if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
83 let file_name = entry.file_name();
84 let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
85
86 min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id)));
87 max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id)));
88 }
89 }
90
91 Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
92 }
93
94 pub(super) fn remove_notifications(
100 &self,
101 file_ids: impl IntoIterator<Item = u32>,
102 ) -> WalResult<(usize, u64)> {
103 let mut deleted_total = 0;
104 let mut deleted_size = 0;
105
106 for id in file_ids {
107 if let Some(size) = self.remove_notification(id) {
108 deleted_total += 1;
109 deleted_size += size;
110 }
111 }
112
113 Ok((deleted_total, deleted_size))
114 }
115
116 pub(super) fn iter_notifications(
117 &self,
118 range: RangeInclusive<u32>,
119 ) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification<N>)>> + '_ {
120 range.map(move |id| {
121 let (notification, size) =
122 self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;
123
124 Ok((id, size, notification))
125 })
126 }
127
128 #[instrument(skip(self))]
130 pub(super) fn read_notification(
131 &self,
132 file_id: u32,
133 ) -> WalResult<Option<(ExExNotification<N>, u64)>> {
134 let file_path = self.file_path(file_id);
135 debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
136
137 let mut file = match File::open(&file_path) {
138 Ok(file) => file,
139 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
140 Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
141 };
142 let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();
143
144 let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
146 rmp_serde::decode::from_read(&mut file)
147 .map_err(|err| WalError::Decode(file_id, file_path, err))?;
148
149 Ok(Some((notification.into(), size)))
150 }
151
152 #[instrument(skip(self, notification))]
158 pub(super) fn write_notification(
159 &self,
160 file_id: u32,
161 notification: &ExExNotification<N>,
162 ) -> WalResult<u64> {
163 let file_path = self.file_path(file_id);
164 debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
165
166 let notification =
168 reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
169
170 reth_fs_util::atomic_write_file(&file_path, |file| {
171 rmp_serde::encode::write(file, ¬ification)
172 })?;
173
174 Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::Storage;
    use reth_exex_types::ExExNotification;
    use reth_provider::Chain;
    use reth_testing_utils::generators::{self, random_block};
    use std::{fs::File, sync::Arc};

    /// Decodes a checked-in WAL file and checks that it yields a `ChainCommitted` notification
    /// with a single block containing a single transaction.
    #[test]
    fn decode_notification_wal() {
        let wal = include_bytes!("../../test-data/28.wal");
        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
            '_,
            reth_ethereum_primitives::EthPrimitives,
        > = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
        let notification: ExExNotification = notification.into();
        match notification {
            ExExNotification::ChainCommitted { new } => {
                assert_eq!(new.blocks().len(), 1);
                assert_eq!(new.tip().transaction_count(), 1);
            }
            _ => panic!("unexpected notification"),
        }
    }

    /// Writes a reorg notification to the storage and checks that reading it back yields an
    /// equal value.
    #[test]
    fn test_roundtrip() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        let old_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
        let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;

        let notification = ExExNotification::ChainReorged {
            new: Arc::new(Chain::new(vec![new_block], Default::default(), None)),
            old: Arc::new(Chain::new(vec![old_block], Default::default(), None)),
        };

        // Round trip: serialize to disk, then deserialize from the same file.
        let file_id = 0;
        storage.write_notification(file_id, &notification)?;
        let deserialized_notification = storage.read_notification(file_id)?;
        assert_eq!(
            deserialized_notification.map(|(notification, _)| notification),
            Some(notification)
        );

        Ok(())
    }

    /// Checks that `files_range` spans exactly the WAL files and ignores other directory entries.
    #[test]
    fn test_files_range() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        // WAL files that define the expected range.
        File::create(storage.file_path(1))?;
        File::create(storage.file_path(2))?;
        File::create(storage.file_path(3))?;

        // Non-WAL files whose IDs lie outside the range; they must be ignored.
        File::create(temp_dir.path().join("0.tmp"))?;
        File::create(temp_dir.path().join("4.tmp"))?;

        assert_eq!(storage.files_range()?, Some(1..=3));

        Ok(())
    }
}