1use std::{
2 fs::File,
3 ops::RangeInclusive,
4 path::{Path, PathBuf},
5};
6
7use crate::wal::{WalError, WalResult};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_exex_types::ExExNotification;
10use reth_node_api::NodePrimitives;
11use reth_tracing::tracing::debug;
12use tracing::instrument;
13
/// File extension (without the leading dot) of the files this module treats as WAL entries.
static FILE_EXTENSION: &str = "wal";
15
16#[derive(Debug, Clone)]
21pub struct Storage<N: NodePrimitives = EthPrimitives> {
22 path: PathBuf,
24 _pd: std::marker::PhantomData<N>,
25}
26
27impl<N> Storage<N>
28where
29 N: NodePrimitives,
30{
31 pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
34 reth_fs_util::create_dir_all(&path)?;
35
36 Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
37 }
38
39 fn file_path(&self, id: u32) -> PathBuf {
40 self.path.join(format!("{id}.{FILE_EXTENSION}"))
41 }
42
43 fn parse_filename(filename: &str) -> WalResult<u32> {
44 filename
45 .strip_suffix(".wal")
46 .and_then(|s| s.parse().ok())
47 .ok_or_else(|| WalError::Parse(filename.to_string()))
48 }
49
50 #[instrument(skip(self))]
56 fn remove_notification(&self, file_id: u32) -> Option<u64> {
57 let path = self.file_path(file_id);
58 let size = path.metadata().ok()?.len();
59
60 match reth_fs_util::remove_file(self.file_path(file_id)) {
61 Ok(()) => {
62 debug!(target: "exex::wal::storage", "Notification was removed from the storage");
63 Some(size)
64 }
65 Err(err) => {
66 debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
67 None
68 }
69 }
70 }
71
72 pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
76 let mut min_id = None;
77 let mut max_id = None;
78
79 for entry in reth_fs_util::read_dir(&self.path)? {
80 let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;
81
82 if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
83 let file_name = entry.file_name();
84 let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
85
86 min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id)));
87 max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id)));
88 }
89 }
90
91 Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
92 }
93
94 pub(super) fn remove_notifications(
100 &self,
101 file_ids: impl IntoIterator<Item = u32>,
102 ) -> WalResult<(usize, u64)> {
103 let mut deleted_total = 0;
104 let mut deleted_size = 0;
105
106 for id in file_ids {
107 if let Some(size) = self.remove_notification(id) {
108 deleted_total += 1;
109 deleted_size += size;
110 }
111 }
112
113 Ok((deleted_total, deleted_size))
114 }
115
116 pub(super) fn iter_notifications(
117 &self,
118 range: RangeInclusive<u32>,
119 ) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification<N>)>> + '_ {
120 range.map(move |id| {
121 let (notification, size) =
122 self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;
123
124 Ok((id, size, notification))
125 })
126 }
127
128 #[instrument(skip(self))]
130 pub(super) fn read_notification(
131 &self,
132 file_id: u32,
133 ) -> WalResult<Option<(ExExNotification<N>, u64)>> {
134 let file_path = self.file_path(file_id);
135 debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
136
137 let mut file = match File::open(&file_path) {
138 Ok(file) => file,
139 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
140 Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
141 };
142 let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();
143
144 let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
146 rmp_serde::decode::from_read(&mut file)
147 .map_err(|err| WalError::Decode(file_id, file_path, err))?;
148
149 Ok(Some((notification.into(), size)))
150 }
151
152 #[instrument(skip(self, notification))]
158 pub(super) fn write_notification(
159 &self,
160 file_id: u32,
161 notification: &ExExNotification<N>,
162 ) -> WalResult<u64> {
163 let file_path = self.file_path(file_id);
164 debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
165
166 let notification =
168 reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
169
170 reth_fs_util::atomic_write_file(&file_path, |file| {
171 rmp_serde::encode::write(file, ¬ification)
172 })?;
173
174 Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::Storage;
    use alloy_consensus::BlockHeader;
    use alloy_primitives::{
        map::{HashMap, HashSet},
        B256, U256,
    };
    use reth_exex_types::ExExNotification;
    use reth_primitives_traits::Account;
    use reth_provider::Chain;
    use reth_testing_utils::generators::{self, random_block};
    use reth_trie_common::{
        updates::{StorageTrieUpdates, TrieUpdates},
        BranchNodeCompact, HashedPostState, HashedStorage, LazyTrieData, Nibbles,
    };
    use std::{collections::BTreeMap, fs::File, sync::Arc};

    /// Decodes the checked-in `28.wal` fixture and expects a `ChainCommitted` notification
    /// with a single block whose tip contains a single transaction.
    #[test]
    fn decode_notification_wal() {
        let wal = include_bytes!("../../test-data/28.wal");
        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
            '_,
            reth_ethereum_primitives::EthPrimitives,
        > = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
        let notification: ExExNotification = notification.into();
        match notification {
            ExExNotification::ChainCommitted { new } => {
                assert_eq!(new.blocks().len(), 1);
                assert_eq!(new.tip().transaction_count(), 1);
            }
            _ => panic!("unexpected notification"),
        }
    }

    /// Decodes the checked-in `new_format.wal` fixture and compares it against the static
    /// data produced by `get_test_notification_data`.
    #[test]
    fn decode_notification_wal_new_format() {
        let wal = include_bytes!("../../test-data/new_format.wal");
        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
            '_,
            reth_ethereum_primitives::EthPrimitives,
        > = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
        let notification: ExExNotification = notification.into();

        let expected_notification = get_test_notification_data().unwrap();
        assert_eq!(
            &notification, &expected_notification,
            "Decoded notification should match expected static data"
        );
    }

    /// Writes a reorg notification to a temporary storage and reads it back unchanged.
    #[test]
    fn test_roundtrip() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        let old_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
        let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;

        let notification = ExExNotification::ChainReorged {
            new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
            old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
        };

        // Serialize to disk, then deserialize and compare with the in-memory value.
        let file_id = 0;
        storage.write_notification(file_id, &notification)?;
        let deserialized_notification = storage.read_notification(file_id)?;
        assert_eq!(
            deserialized_notification.map(|(notification, _)| notification),
            Some(notification)
        );

        Ok(())
    }

    /// Regenerates `test-data/new_format.wal` from `get_test_notification_data`.
    ///
    /// Ignored by default because it writes into the crate's `test-data` directory; run it
    /// explicitly (with `--ignored`) when the fixture needs to be refreshed.
    #[test]
    #[ignore]
    fn generate_test_wal() -> eyre::Result<()> {
        let notification = get_test_notification_data()?;

        // Encode exactly the way `Storage::write_notification` does.
        let notification_compat =
            reth_exex_types::serde_bincode_compat::ExExNotification::from(&notification);
        let encoded = rmp_serde::encode::to_vec(&notification_compat)?;

        let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
        std::fs::create_dir_all(&test_data_dir)?;

        let output_path = test_data_dir.join("new_format.wal");
        std::fs::write(&output_path, &encoded)?;

        println!("Generated WAL file at: {}", output_path.display());
        println!("File size: {} bytes", encoded.len());
        println!("✓ WAL file created successfully!");

        Ok(())
    }

    /// Builds the deterministic `ChainCommitted` notification used by the fixture tests:
    /// one default block plus hand-written trie updates and hashed post state for it.
    fn get_test_notification_data(
    ) -> eyre::Result<ExExNotification<reth_ethereum_primitives::EthPrimitives>> {
        use reth_ethereum_primitives::Block;
        use reth_primitives_traits::Block as _;

        let block = Block::default().seal_slow().try_recover()?;
        let block_number = block.header().number();

        let hashed_address = B256::from([1; 32]);
        let storage_key = B256::from([2; 32]);

        let trie_updates = TrieUpdates {
            account_nodes: HashMap::from_iter([
                (Nibbles::from_nibbles_unchecked([0x01]), BranchNodeCompact::default()),
                (Nibbles::from_nibbles_unchecked([0x02]), BranchNodeCompact::default()),
            ]),
            removed_nodes: HashSet::from_iter([Nibbles::from_nibbles_unchecked([0x03])]),
            storage_tries: HashMap::from_iter([(
                hashed_address,
                StorageTrieUpdates {
                    is_deleted: false,
                    storage_nodes: HashMap::from_iter([(
                        Nibbles::from_nibbles_unchecked([0x04]),
                        BranchNodeCompact::default(),
                    )]),
                    removed_nodes: Default::default(),
                },
            )]),
        };

        let hashed_state = HashedPostState {
            accounts: HashMap::from_iter([(
                hashed_address,
                Some(Account { nonce: 1, ..Default::default() }),
            )]),
            storages: HashMap::from_iter([(
                hashed_address,
                HashedStorage {
                    wiped: false,
                    storage: HashMap::from_iter([(storage_key, U256::from(101))]),
                },
            )]),
        };

        let trie_data = LazyTrieData::ready(
            Arc::new(hashed_state.into_sorted()),
            Arc::new(trie_updates.into_sorted()),
        );

        let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
            ExExNotification::ChainCommitted {
                new: Arc::new(Chain::new(
                    vec![block],
                    Default::default(),
                    BTreeMap::from([(block_number, trie_data)]),
                )),
            };
        Ok(notification)
    }

    /// `files_range` must only consider `.wal` files and span their min/max IDs.
    #[test]
    fn test_files_range() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        // WAL files that define the expected range.
        File::create(storage.file_path(1))?;
        File::create(storage.file_path(2))?;
        File::create(storage.file_path(3))?;

        // Non-WAL files outside that range; they must be ignored.
        File::create(temp_dir.path().join("0.tmp"))?;
        File::create(temp_dir.path().join("4.tmp"))?;

        assert_eq!(storage.files_range()?, Some(1..=3));

        Ok(())
    }
}